You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/09/25 14:51:10 UTC

svn commit: r579229 - in /incubator/qpid/branches/M2.1: java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java python/tests/basic.py

Author: ritchiem
Date: Tue Sep 25 05:51:09 2007
New Revision: 579229

URL: http://svn.apache.org/viewvc?rev=579229&view=rev
Log:
QPID-610 : Fix for Get NO_ACK leak. The Java Client doesn't use get so augmented the python test_get to send persistent messages and used debugger to verify that messages were correctly removed. Verified that prior to this commit they would remain in the store. We need a management exchange to fully validate this with a python tests.
NOTE: The setting of "delivery mode" property on M2.1 is not the same as on trunk where _ is use such as "delivery_mode".
There is also no error that you have sent an incorrect property.

Modified:
    incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
    incubator/qpid/branches/M2.1/python/tests/basic.py

Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=579229&r1=579228&r2=579229&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Sep 25 05:51:09 2007
@@ -330,6 +330,11 @@
                                                                             deliveryTag, _queue.getMessageCount());
                     _totalMessageSize.addAndGet(-msg.getSize());
                 }
+
+                if (!acks)
+                {
+                   msg.decrementReference(channel.getStoreContext());
+                }
             }
             finally
             {

Modified: incubator/qpid/branches/M2.1/python/tests/basic.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/python/tests/basic.py?rev=579229&r1=579228&r2=579229&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/python/tests/basic.py (original)
+++ incubator/qpid/branches/M2.1/python/tests/basic.py Tue Sep 25 05:51:09 2007
@@ -339,9 +339,11 @@
         channel = self.channel
         channel.queue_declare(queue="test-get", exclusive=True)
         
-        #publish some messages (no_ack=True)
+        #publish some messages (no_ack=True) with persistent messaging
         for i in range(1, 11):
-            channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+            msg=Content("Message %d" % i)
+            msg["delivery mode"] = 2
+            channel.basic_publish(routing_key="test-get",content=msg )
 
         #use basic_get to read back the messages, and check that we get an empty at the end
         for i in range(1, 11):
@@ -354,18 +356,53 @@
         self.assertEqual(reply.method.klass.name, "basic")
         self.assertEqual(reply.method.name, "get-empty")
 
-        #repeat for no_ack=False
+
+        #publish some messages (no_ack=True) transient messaging
         for i in range(11, 21):
             channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
 
+        #use basic_get to read back the messages, and check that we get an empty at the end
         for i in range(11, 21):
+            reply = channel.basic_get(no_ack=True)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get-ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get-empty")
+
+        #repeat for no_ack=False
+
+        #publish some messages (no_ack=False) with persistent messaging
+        for i in range(21, 31):
+            msg=Content("Message %d" % i)
+            msg["delivery mode"] = 2
+            channel.basic_publish(routing_key="test-get",content=msg )
+
+        #use basic_get to read back the messages, and check that we get an empty at the end
+        for i in range(21, 31):
+            reply = channel.basic_get(no_ack=False)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get-ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get-empty")
+
+        #public some messages (no_ack=False) with transient messaging 
+        for i in range(31, 41):
+            channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+        for i in range(31, 41):
             reply = channel.basic_get(no_ack=False)
             self.assertEqual(reply.method.klass.name, "basic")
             self.assertEqual(reply.method.name, "get-ok")
             self.assertEqual("Message %d" % i, reply.content.body)
-            if(i == 13):
+            if(i == 33):
                 channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
-            if(i in [15, 17, 19]):
+            if(i in [35, 37, 39]):
                 channel.basic_ack(delivery_tag=reply.delivery_tag)
 
         reply = channel.basic_get(no_ack=True)
@@ -375,8 +412,8 @@
         #recover(requeue=True)
         channel.basic_recover(requeue=True)
         
-        #get the unacked messages again (14, 16, 18, 20)
-        for i in [14, 16, 18, 20]:
+        #get the unacked messages again (34, 36, 38, 40)
+        for i in [34, 36, 38, 40]:
             reply = channel.basic_get(no_ack=False)
             self.assertEqual(reply.method.klass.name, "basic")
             self.assertEqual(reply.method.name, "get-ok")