You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/08/19 12:03:13 UTC

svn commit: r687010 - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Author: aidan
Date: Tue Aug 19 03:03:07 2008
New Revision: 687010

URL: http://svn.apache.org/viewvc?rev=687010&view=rev
Log:
QPID-1202: Rebind durable subscriptions if the arguments have changed

TopicExchange: take field arguments into account when determining if topic binding already exists when binding, but not for regular isBound().

DurableSubscriptionTest: add test case for QPID-1202

Modified:
    incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
    incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java

Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java?rev=687010&r1=687009&r2=687010&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java Tue Aug 19 03:03:07 2008
@@ -115,11 +115,13 @@
     {
         private final AMQShortString _bindingKey;
         private final AMQQueue _queue;
+        private final FieldTable _args;
 
-        public Binding(AMQShortString bindingKey, AMQQueue queue)
+        public Binding(AMQShortString bindingKey, AMQQueue queue, FieldTable args)
         {
             _bindingKey = bindingKey;
             _queue = queue;
+            _args = args;
         }
 
         public AMQShortString getBindingKey()
@@ -134,7 +136,7 @@
 
         public int hashCode()
         {
-            return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 + _queue.hashCode();
+            return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 +_queue.hashCode();
         }
 
         public boolean equals(Object o)
@@ -382,7 +384,7 @@
             routingKey = rKey;
         }
 
-        Binding binding = new Binding(rKey, queue);
+        Binding binding = new Binding(rKey, queue, args);
 
         if(_bindings.containsKey(binding))
         {
@@ -544,14 +546,29 @@
 
     public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
     {
-        return isBound(routingKey, queue);
+        Binding binding = new Binding(routingKey, queue, arguments);
+        if (arguments == null)
+        {
+            return _bindings.containsKey(binding);
+        }
+        else
+        {
+            FieldTable o = _bindings.get(binding);
+            if (o != null)
+            {
+                return o.equals(arguments);
+            }
+            else
+            {
+                return false;
+            }
+            
+        }
     }
 
     public boolean isBound(AMQShortString routingKey, AMQQueue queue)
     {
-        Binding binding = new Binding(routingKey, queue);
-
-        return _bindings.containsKey(binding);
+        return isBound(routingKey, null, queue);
     }
 
     public boolean isBound(AMQShortString routingKey)
@@ -590,7 +607,7 @@
         assert queue != null;
         assert rKey != null;
 
-        Binding binding = new Binding(rKey, queue);
+        Binding binding = new Binding(rKey, queue, args);
 
 
         if (!_bindings.containsKey(binding))

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java?rev=687010&r1=687009&r2=687010&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/test/unit/topic/DurableSubscriptionTest.java Tue Aug 19 03:03:07 2008
@@ -348,6 +348,66 @@
         session.unsubscribe("testDurableWithInvalidDestinationsub");
     }
     
+    /**
+     * Tests QPID-1202
+     * Creates a durable subscription with a selector, then changes that selector on resubscription
+     * @throws Exception 
+     */
+    public void testResubscribeWithChangedSelector() throws Exception
+    {
+        Connection conn = getConnection();
+        conn.start();
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        AMQTopic topic = new AMQTopic((AMQConnection) conn, "testResubscribeWithChangedSelector");
+        MessageProducer producer = session.createProducer(topic);
+        
+        // Create durable subscriber that matches A
+        TopicSubscriber subA = session.createDurableSubscriber(topic, 
+                "testResubscribeWithChangedSelector",
+                "Match = True", false);
+
+        // Send 1 matching message and 1 non-matching message
+        sendMatchingAndNonMatchingMessage(session, producer);
+
+        Message rMsg = subA.receive(1000);
+        assertNotNull(rMsg);
+        assertEquals("Content was wrong", 
+                     "testResubscribeWithChangedSelector1",
+                     ((TextMessage) rMsg).getText());
+        
+        rMsg = subA.receive(250);
+        assertNull(rMsg);
+        
+        // Disconnect subscriber
+        subA.close();
+        
+        // Reconnect with new selector that matches B
+        TopicSubscriber subB = session.createDurableSubscriber(topic, 
+                "testResubscribeWithChangedSelector","Match = False", false);
+        
+        
+        // Check messages are recieved properly
+        sendMatchingAndNonMatchingMessage(session, producer);
+        rMsg = subB.receive(1000);
+        assertNotNull(rMsg);
+        assertEquals("Content was wrong", 
+                     "testResubscribeWithChangedSelector2",
+                     ((TextMessage) rMsg).getText());
+        
+        rMsg = subB.receive(250);
+        assertNull(rMsg);
+    }
+
+    private void sendMatchingAndNonMatchingMessage(Session session, MessageProducer producer) throws JMSException
+    {
+        TextMessage msg = session.createTextMessage("testResubscribeWithChangedSelector1");
+        msg.setBooleanProperty("Match", true);
+        producer.send(msg);
+        msg = session.createTextMessage("testResubscribeWithChangedSelector2");
+        msg.setBooleanProperty("Match", false);
+        producer.send(msg);
+    }
+    
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(DurableSubscriptionTest.class);