You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/16 17:03:43 UTC

svn commit: r487821 - in /incubator/qpid/trunk/qpid/java/client/src: main/java/org/apache/qpid/client/ test/java/org/apache/qpid/test/unit/topic/

Author: rgreig
Date: Sat Dec 16 08:03:42 2006
New Revision: 487821

URL: http://svn.apache.org/viewvc?view=rev&rev=487821
Log:
QPID-205 : Do not allow subscription to temporary topics created on a different session.

Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=487821&r1=487820&r2=487821
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sat Dec 16 08:03:42 2006
@@ -885,6 +885,8 @@
                                                  final String selector,
                                                  final FieldTable rawSelector) throws JMSException
     {
+        checkTemporaryDestination(destination);
+
         return (org.apache.qpid.jms.MessageConsumer) new FailoverSupport()
         {
             public Object operation() throws JMSException
@@ -929,6 +931,26 @@
         }.execute(_connection);
     }
 
+    private void checkTemporaryDestination(Destination destination)
+            throws JMSException
+    {
+        if((destination instanceof TemporaryDestination))
+        {
+            _logger.debug("destination is temporary");
+            final TemporaryDestination tempDest = (TemporaryDestination) destination;
+            if(tempDest.getSession() != this)
+            {
+                _logger.debug("destination is on different session");
+                throw new JMSException("Cannot consume from a temporary destination created onanother session");
+            }
+            if(tempDest.isDeleted())
+            {
+                _logger.debug("destination is deleted");
+                throw new JMSException("Cannot consume from a deleted destination");
+            }
+        }
+    }
+
 
     public boolean hasConsumer(Destination destination)
     {
@@ -1497,11 +1519,15 @@
     /*
      * I could have combined the last 3 methods, but this way it improves readability
      */
-    private void checkValidTopic(Topic topic) throws InvalidDestinationException
+    private void checkValidTopic(Topic topic) throws JMSException
     {
         if (topic == null)
         {
             throw new javax.jms.InvalidDestinationException("Invalid Topic");
+        }
+        if((topic instanceof TemporaryDestination) && ((TemporaryDestination)topic).getSession() != this)
+        {
+            throw new JMSException("Cannot create a subscription on a temporary topic created in another session");
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?view=diff&rev=487821&r1=487820&r2=487821
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Sat Dec 16 08:03:42 2006
@@ -26,10 +26,12 @@
 /**
  * AMQ implementation of a TemporaryQueue.
  */
-final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue
+final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
 {
 
+
     private final AMQSession _session;
+    private boolean _deleted;
 
     /**
      * Create a new instance of an AMQTemporaryQueue
@@ -49,10 +51,20 @@
         {
             throw new JMSException("Temporary Queue has consumers so cannot be deleted");
         }
+        _deleted = true;
 
         // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
         // by the server when there are no more subscriptions to that queue.  This is probably not
         // quite right for JMSCompliance.
     }
 
+    public AMQSession getSession()
+    {
+        return _session;
+    }
+
+    public boolean isDeleted()
+    {
+        return _deleted;
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?view=diff&rev=487821&r1=487820&r2=487821
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Sat Dec 16 08:03:42 2006
@@ -26,10 +26,11 @@
 /**
  * AMQ implementation of TemporaryTopic.
  */
-class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic
+class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDestination
 {
 
     private final AMQSession _session;
+    private boolean _deleted;
     /**
      * Create new temporary topic.
      */
@@ -49,9 +50,20 @@
             throw new JMSException("Temporary Topic has consumers so cannot be deleted");
         }
 
+        _deleted = true;
         // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
         // by the server when there are no more subscriptions to that queue.  This is probably not
         // quite right for JMSCompliance.
+    }
+
+    public AMQSession getSession()
+    {
+        return _session;
+    }
+
+    public boolean isDeleted()
+    {
+        return _deleted;
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=487821&r1=487820&r2=487821
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Sat Dec 16 08:03:42 2006
@@ -142,7 +142,7 @@
 
     public void setDisableMessageID(boolean b) throws JMSException
     {
-    	checkPreConditions();
+        checkPreConditions();
         checkNotClosed();
         // IGNORED
     }
@@ -156,7 +156,7 @@
 
     public void setDisableMessageTimestamp(boolean b) throws JMSException
     {
-    	checkPreConditions();
+        checkPreConditions();
         _disableTimestamps = b;
     }
 
@@ -168,11 +168,11 @@
 
     public void setDeliveryMode(int i) throws JMSException
     {
-    	checkPreConditions();
+        checkPreConditions();
         if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
         {
             throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
-                                   " is illegal");
+                    " is illegal");
         }
         _deliveryMode = i;
     }
@@ -185,7 +185,7 @@
 
     public void setPriority(int i) throws JMSException
     {
-    	checkPreConditions();
+        checkPreConditions();
         if (i < 0 || i > 9)
         {
             throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
@@ -201,7 +201,7 @@
 
     public void setTimeToLive(long l) throws JMSException
     {
-    	checkPreConditions();
+        checkPreConditions();
         if (l < 0)
         {
             throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
@@ -229,8 +229,8 @@
 
     public void send(Message message) throws JMSException
     {
-    	checkPreConditions();
-    	checkInitialDestination();
+        checkPreConditions();
+        checkInitialDestination();
 
 
         synchronized (_connection.getFailoverMutex())
@@ -242,8 +242,8 @@
 
     public void send(Message message, int deliveryMode) throws JMSException
     {
-    	checkPreConditions();
-    	checkInitialDestination();
+        checkPreConditions();
+        checkInitialDestination();
 
         synchronized (_connection.getFailoverMutex())
         {
@@ -254,8 +254,8 @@
 
     public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
     {
-    	checkPreConditions();
-    	checkInitialDestination();
+        checkPreConditions();
+        checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
             sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
@@ -266,8 +266,8 @@
     public void send(Message message, int deliveryMode, int priority,
                      long timeToLive) throws JMSException
     {
-    	checkPreConditions();
-    	checkInitialDestination();
+        checkPreConditions();
+        checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
             sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory,
@@ -277,8 +277,8 @@
 
     public void send(Destination destination, Message message) throws JMSException
     {
-    	checkPreConditions();
-    	checkDestination(destination);
+        checkPreConditions();
+        checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
@@ -291,8 +291,8 @@
                      int priority, long timeToLive)
             throws JMSException
     {
-    	checkPreConditions();
-    	checkDestination(destination);
+        checkPreConditions();
+        checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
@@ -319,8 +319,8 @@
                      int priority, long timeToLive, boolean mandatory, boolean immediate)
             throws JMSException
     {
-    	checkPreConditions();
-    	checkDestination(destination);
+        checkPreConditions();
+        checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
@@ -334,8 +334,8 @@
                      boolean immediate, boolean waitUntilSent)
             throws JMSException
     {
-    	checkPreConditions();
-    	checkDestination(destination);
+        checkPreConditions();
+        checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
@@ -347,7 +347,7 @@
 
     private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
     {
-        if(message instanceof AbstractJMSMessage)
+        if (message instanceof AbstractJMSMessage)
         {
             return (AbstractJMSMessage) message;
         }
@@ -355,7 +355,7 @@
         {
             AbstractJMSMessage newMessage;
 
-            if(message instanceof BytesMessage)
+            if (message instanceof BytesMessage)
             {
                 BytesMessage bytesMessage = (BytesMessage) message;
                 bytesMessage.reset();
@@ -363,41 +363,40 @@
                 JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage();
 
 
-
                 byte[] buf = new byte[1024];
 
                 int len;
 
-                while((len = bytesMessage.readBytes(buf)) != -1)
+                while ((len = bytesMessage.readBytes(buf)) != -1)
                 {
-                    nativeMsg.writeBytes(buf,0,len);
+                    nativeMsg.writeBytes(buf, 0, len);
                 }
 
                 newMessage = nativeMsg;
             }
-            else if(message instanceof MapMessage)
+            else if (message instanceof MapMessage)
             {
                 MapMessage origMessage = (MapMessage) message;
                 MapMessage nativeMessage = _session.createMapMessage();
 
                 Enumeration mapNames = origMessage.getMapNames();
-                while(mapNames.hasMoreElements())
+                while (mapNames.hasMoreElements())
                 {
                     String name = (String) mapNames.nextElement();
                     nativeMessage.setObject(name, origMessage.getObject(name));
                 }
                 newMessage = (AbstractJMSMessage) nativeMessage;
             }
-            else if(message instanceof ObjectMessage)
+            else if (message instanceof ObjectMessage)
             {
                 ObjectMessage origMessage = (ObjectMessage) message;
                 ObjectMessage nativeMessage = _session.createObjectMessage();
 
                 nativeMessage.setObject(origMessage.getObject());
-                
+
                 newMessage = (AbstractJMSMessage) nativeMessage;
             }
-            else if(message instanceof TextMessage)
+            else if (message instanceof TextMessage)
             {
                 TextMessage origMessage = (TextMessage) message;
                 TextMessage nativeMessage = _session.createTextMessage();
@@ -406,7 +405,7 @@
 
                 newMessage = (AbstractJMSMessage) nativeMessage;
             }
-            else if(message instanceof StreamMessage)
+            else if (message instanceof StreamMessage)
             {
                 StreamMessage origMessage = (StreamMessage) message;
                 StreamMessage nativeMessage = _session.createStreamMessage();
@@ -415,7 +414,7 @@
                 try
                 {
                     origMessage.reset();
-                    while(true)
+                    while (true)
                     {
                         nativeMessage.writeObject(origMessage.readObject());
                     }
@@ -433,10 +432,10 @@
             }
 
             Enumeration propertyNames = message.getPropertyNames();
-            while(propertyNames.hasMoreElements())
+            while (propertyNames.hasMoreElements())
             {
                 String propertyName = String.valueOf(propertyNames.nextElement());
-                if(!propertyName.startsWith("JMSX_"))
+                if (!propertyName.startsWith("JMSX_"))
                 {
                     Object value = message.getObjectProperty(propertyName);
                     newMessage.setObjectProperty(propertyName, value);
@@ -445,28 +444,26 @@
 
             newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
 
-            
+
             int priority = message.getJMSPriority();
-            if(priority < 0)
+            if (priority < 0)
             {
                 priority = 0;
             }
-            else if(priority > 9)
+            else if (priority > 9)
             {
                 priority = 9;
             }
 
             newMessage.setJMSPriority(priority);
-            if(message.getJMSReplyTo() != null)
+            if (message.getJMSReplyTo() != null)
             {
                 newMessage.setJMSReplyTo(message.getJMSReplyTo());
             }
             newMessage.setJMSType(message.getJMSType());
 
 
-            
-
-            if(newMessage != null)
+            if (newMessage != null)
             {
                 return newMessage;
             }
@@ -478,15 +475,14 @@
     }
 
 
-
     private void validateDestination(Destination destination) throws JMSException
     {
         if (!(destination instanceof AMQDestination))
         {
             throw new JMSException("Unsupported destination class: " +
-                                   (destination != null ? destination.getClass() : null));
+                    (destination != null ? destination.getClass() : null));
         }
-        declareDestination((AMQDestination)destination);
+        declareDestination((AMQDestination) destination);
     }
 
     protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
@@ -497,6 +493,7 @@
 
     /**
      * The caller of this method must hold the failover mutex.
+     *
      * @param destination
      * @param origMessage
      * @param deliveryMode
@@ -509,6 +506,7 @@
     protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
                             long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
     {
+        checkTemporaryDestination(destination);
 
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
         AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
@@ -568,7 +566,7 @@
         _protocolHandler.writeFrame(compositeFrame, wait);
 
 
-        if(message != origMessage)
+        if (message != origMessage)
         {
             _logger.warn("Updating original message");
             origMessage.setJMSPriority(message.getJMSPriority());
@@ -579,9 +577,29 @@
         }
     }
 
+    private void checkTemporaryDestination(AMQDestination destination) throws JMSException
+    {
+        if(destination instanceof TemporaryDestination)
+        {
+            _logger.debug("destination is temporary destination");
+            TemporaryDestination tempDest = (TemporaryDestination) destination;
+            if(tempDest.getSession().isClosed())
+            {
+                _logger.debug("session is closed");
+                throw new JMSException("Session for temporary destination has been closed");
+            }
+            if(tempDest.isDeleted())
+            {
+                _logger.debug("destination is deleted");
+                throw new JMSException("Cannot send to a deleted temporary destination");
+            }
+        }
+    }
+
     /**
      * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
      * maximum frame size.
+     *
      * @param payload
      * @return the array of content bodies
      */
@@ -611,8 +629,8 @@
             for (int i = 0; i < bodies.length; i++)
             {
                 bodies[i] = new ContentBody();
-                payload.position((int)framePayloadMax * i);
-                int length = (remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining;
+                payload.position((int) framePayloadMax * i);
+                int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
                 payload.limit(payload.position() + length);
                 bodies[i].payload = payload.slice();
                 remaining -= length;
@@ -633,32 +651,42 @@
         _encoding = encoding;
     }
 
-	private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
-		checkNotClosed();
+    private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
+    {
+        checkNotClosed();
+
+        if (_session == null || _session.isClosed())
+        {
+            throw new javax.jms.IllegalStateException("Invalid Session");
+        }
+    }
+
+    private void checkInitialDestination()
+    {
+        if (_destination == null)
+        {
+            throw new UnsupportedOperationException("Destination is null");
+        }
+    }
 
-		if(_session == null || _session.isClosed()){
-			throw new javax.jms.IllegalStateException("Invalid Session");
-		}
-	}
-
-	private void checkInitialDestination(){
-		if(_destination == null){
-			throw new UnsupportedOperationException("Destination is null");
-		}
-	}
-
-	private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
-		if (_destination != null && suppliedDestination != null){
-			throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
-		}
-
-		if (suppliedDestination == null){
-			throw new InvalidDestinationException("Supplied Destination was invalid");
-		}
-	}
-
-
-	public AMQSession getSession() {
-		return _session;
-	}
+    private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
+    {
+        if (_destination != null && suppliedDestination != null)
+        {
+            throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+        }
+
+        if (suppliedDestination == null)
+        {
+            throw new InvalidDestinationException("Supplied Destination was invalid");
+        }
+
+
+    }
+
+
+    public AMQSession getSession()
+    {
+        return _session;
+    }
 }

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java?view=auto&rev=487821
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TemporaryDestination.java Sat Dec 16 08:03:42 2006
@@ -0,0 +1,17 @@
+package org.apache.qpid.client;
+
+import javax.jms.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Provides support for covenience interface implemented by both AMQTemporaryTopic and AMQTemporaryQueue
+ * so that operations related to their "temporary-ness" can be abstracted out.
+ */
+interface TemporaryDestination extends Destination
+{
+
+    public void delete() throws JMSException;
+    public AMQSession getSession();
+    public boolean isDeleted();
+
+}

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?view=diff&rev=487821&r1=487820&r2=487821
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Sat Dec 16 08:03:42 2006
@@ -235,6 +235,19 @@
             fail("Unexpected Exception: " + je.getMessage());
         }
 
+        TopicSession session2 = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        try
+        {
+            MessageConsumer consumer2 = session2.createConsumer(topic);
+            fail("Expected a JMSException when subscribing to a temporary topic created on adifferent session");
+        }
+        catch (JMSException je)
+        {
+            ; // pass
+        }
+
+
+
         conn.close();
     }