You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2011/03/03 05:49:50 UTC

svn commit: r1076516 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ systests/src/main/java/org/apache/qpid/test/client/destination/

Author: rajith
Date: Thu Mar  3 04:49:50 2011
New Revision: 1076516

URL: http://svn.apache.org/viewvc?rev=1076516&view=rev
Log:
QPID-3106
Instead of checking whether the destination is an instance of AMQQueue, the code now checks whether it is an instance of both AMQDestination and javax.jms.Queue, to cover the AMQAnyDestination case. The same check is done for topics. Added test cases for QueueReceivers, TopicSubscribers and DurableTopicSubscribers using the new addressing scheme.

Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=1076516&r1=1076515&r2=1076516&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Thu Mar  3 04:49:50 2011
@@ -1043,7 +1043,7 @@ public abstract class AMQSession<C exten
             throws JMSException
     {
         checkNotClosed();
-        AMQTopic origTopic = checkValidTopic(topic, true);
+        Topic origTopic = checkValidTopic(topic, true);
         AMQTopic dest = AMQTopic.createDurableTopic(origTopic, name, _connection);
         
         String messageSelector = ((selector == null) || (selector.trim().length() == 0)) ? null : selector;
@@ -1307,8 +1307,8 @@ public abstract class AMQSession<C exten
     public QueueReceiver createQueueReceiver(Destination destination) throws JMSException
     {
         checkValidDestination(destination);
-        AMQQueue dest = (AMQQueue) destination;
-        C consumer = (C) createConsumer(destination);
+        Queue dest = validateQueue(destination);
+        C consumer = (C) createConsumer(dest);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1326,8 +1326,8 @@ public abstract class AMQSession<C exten
     public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException
     {
         checkValidDestination(destination);
-        AMQQueue dest = (AMQQueue) destination;
-        C consumer = (C) createConsumer(destination, messageSelector);
+        Queue dest = validateQueue(destination);
+        C consumer = (C) createConsumer(dest, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
@@ -1344,7 +1344,7 @@ public abstract class AMQSession<C exten
     public QueueReceiver createReceiver(Queue queue) throws JMSException
     {
         checkNotClosed();
-        AMQQueue dest = (AMQQueue) queue;
+        Queue dest = validateQueue(queue);
         C consumer = (C) createConsumer(dest);
 
         return new QueueReceiverAdaptor(dest, consumer);
@@ -1363,11 +1363,23 @@ public abstract class AMQSession<C exten
     public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
     {
         checkNotClosed();
-        AMQQueue dest = (AMQQueue) queue;
+        Queue dest = validateQueue(queue);
         C consumer = (C) createConsumer(dest, messageSelector);
 
         return new QueueReceiverAdaptor(dest, consumer);
     }
+    
+    private Queue validateQueue(Destination dest) throws InvalidDestinationException
+    {
+        if (dest instanceof AMQDestination && dest instanceof javax.jms.Queue)
+        {
+            return (Queue)dest;
+        }
+        else
+        {
+            throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Queue");
+        }
+    }
 
     public QueueSender createSender(Queue queue) throws JMSException
     {
@@ -1408,7 +1420,7 @@ public abstract class AMQSession<C exten
     public TopicSubscriber createSubscriber(Topic topic) throws JMSException
     {
         checkNotClosed();
-        AMQTopic dest = checkValidTopic(topic);
+        Topic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
         return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest));
@@ -1428,7 +1440,7 @@ public abstract class AMQSession<C exten
     public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
     {
         checkNotClosed();
-        AMQTopic dest = checkValidTopic(topic);
+        Topic dest = checkValidTopic(topic);
 
         // AMQTopic dest = new AMQTopic(topic.getTopicName());
         return new TopicSubscriberAdaptor(dest, (C) createExclusiveConsumer(dest, messageSelector, noLocal));
@@ -2395,7 +2407,7 @@ public abstract class AMQSession<C exten
     /*
      * I could have combined the last 3 methods, but this way it improves readability
      */
-    protected AMQTopic checkValidTopic(Topic topic, boolean durable) throws JMSException
+    protected Topic checkValidTopic(Topic topic, boolean durable) throws JMSException
     {
         if (topic == null)
         {
@@ -2414,17 +2426,17 @@ public abstract class AMQSession<C exten
                 ("Cannot create a durable subscription with a temporary topic: " + topic);
         }
 
-        if (!(topic instanceof AMQTopic))
+        if (!(topic instanceof AMQDestination && topic instanceof javax.jms.Topic))
         {
             throw new javax.jms.InvalidDestinationException(
                     "Cannot create a subscription on topic created for another JMS Provider, class of topic provided is: "
                     + topic.getClass().getName());
         }
 
-        return (AMQTopic) topic;
+        return topic;
     }
 
-    protected AMQTopic checkValidTopic(Topic topic) throws JMSException
+    protected Topic checkValidTopic(Topic topic) throws JMSException
     {
         return checkValidTopic(topic, false);
     }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?rev=1076516&r1=1076515&r2=1076516&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Thu Mar  3 04:49:50 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
 
 import java.net.URISyntaxException;
 
+import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
 import javax.jms.Topic;
 
@@ -95,39 +96,47 @@ public class AMQTopic extends AMQDestina
         super(exchangeName, exchangeClass, routingKey, isExclusive, isAutoDelete, queueName, isDurable,bindingKeys);
     }
 
-    public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+    public static AMQTopic createDurableTopic(Topic topic, String subscriptionName, AMQConnection connection)
             throws JMSException
     {
-        if (topic.getDestSyntax() == DestSyntax.ADDR)
+        if (topic instanceof AMQDestination && topic instanceof javax.jms.Topic)
         {
-            try
+            AMQDestination qpidTopic = (AMQDestination)topic;
+            if (qpidTopic.getDestSyntax() == DestSyntax.ADDR)
             {
-                AMQTopic t = new AMQTopic(topic.getAddress());
-                AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
-                // link is never null if dest was created using an address string.
-                t.getLink().setName(queueName.asString());               
-                t.getSourceNode().setAutoDelete(false);
-                t.getSourceNode().setDurable(true);
-                
-                // The legacy fields are also populated just in case.
-                t.setQueueName(queueName);
-                t.setAutoDelete(false);
-                t.setDurable(true);
-                return t;
+                try
+                {
+                    AMQTopic t = new AMQTopic(qpidTopic.getAddress());
+                    AMQShortString queueName = getDurableTopicQueueName(subscriptionName, connection);
+                    // link is never null if dest was created using an address string.
+                    t.getLink().setName(queueName.asString());               
+                    t.getSourceNode().setAutoDelete(false);
+                    t.getSourceNode().setDurable(true);
+                    
+                    // The legacy fields are also populated just in case.
+                    t.setQueueName(queueName);
+                    t.setAutoDelete(false);
+                    t.setDurable(true);
+                    return t;
+                }
+                catch(Exception e)
+                {
+                    JMSException ex = new JMSException("Error creating durable topic");
+                    ex.initCause(e);
+                    ex.setLinkedException(e);
+                    throw ex;
+                }
             }
-            catch(Exception e)
+            else
             {
-                JMSException ex = new JMSException("Error creating durable topic");
-                ex.initCause(e);
-                ex.setLinkedException(e);
-                throw ex;
+                return new AMQTopic(qpidTopic.getExchangeName(), qpidTopic.getRoutingKey(), false,
+                                getDurableTopicQueueName(subscriptionName, connection),
+                                true);
             }
         }
         else
         {
-            return new AMQTopic(topic.getExchangeName(), topic.getRoutingKey(), false,
-                            getDurableTopicQueueName(subscriptionName, connection),
-                            true);
+            throw new InvalidDestinationException("The destination object used is not from this provider or of type javax.jms.Topic");
         }
     }
 

Modified: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java?rev=1076516&r1=1076515&r2=1076516&view=diff
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java (original)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java Thu Mar  3 04:49:50 2011
@@ -31,12 +31,18 @@ import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
 import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
 import javax.naming.Context;
 
 import org.apache.qpid.client.AMQAnyDestination;
+import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQSession_0_10;
 import org.apache.qpid.client.messaging.address.Node.ExchangeNode;
@@ -796,4 +802,46 @@ public class AddressBasedDestinationTest
         {            
         }
     }
+    
+    public void testQueueReceiversAndTopicSubscriber() throws Exception
+    {
+        Queue queue = new AMQAnyDestination("ADDR:my-queue; {create: always}");
+        Topic topic = new AMQAnyDestination("ADDR:amq.topic/test");
+        
+        QueueSession qSession = ((AMQConnection)_connection).createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+        QueueReceiver receiver = qSession.createReceiver(queue);
+        
+        TopicSession tSession = ((AMQConnection)_connection).createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TopicSubscriber sub = tSession.createSubscriber(topic);
+        
+        Session ssn = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer prod1 = ssn.createProducer(ssn.createQueue("ADDR:my-queue"));
+        prod1.send(ssn.createTextMessage("test1"));
+        
+        MessageProducer prod2 = ssn.createProducer(ssn.createTopic("ADDR:amq.topic/test"));
+        prod2.send(ssn.createTextMessage("test2"));
+        
+        Message msg1 = receiver.receive();
+        assertNotNull(msg1);
+        assertEquals("test1",((TextMessage)msg1).getText());
+        
+        Message msg2 = sub.receive();
+        assertNotNull(msg2);
+        assertEquals("test2",((TextMessage)msg2).getText());  
+    }
+    
+    public void testDurableSubscriber() throws Exception
+    {
+        Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE);        
+        Topic topic = ssn.createTopic("news.us");
+        
+        MessageConsumer cons = ssn.createDurableSubscriber(topic, "my-sub");
+        MessageProducer prod = ssn.createProducer(topic);
+        
+        Message m = ssn.createTextMessage("A");
+        prod.send(m);
+        Message msg = cons.receive(1000);
+        assertNotNull(msg);
+        assertEquals("A",((TextMessage)msg).getText());
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org