You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/09/25 14:51:10 UTC
svn commit: r579229 - in /incubator/qpid/branches/M2.1:
java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
python/tests/basic.py
Author: ritchiem
Date: Tue Sep 25 05:51:09 2007
New Revision: 579229
URL: http://svn.apache.org/viewvc?rev=579229&view=rev
Log:
QPID-610 : Fix for Get NO_ACK leak. The Java Client doesn't use get so augmented the python test_get to send persistent messages and used debugger to verify that messages were correctly removed. Verified that prior to this commit they would remain in the store. We need a management exchange to fully validate this with a python tests.
NOTE: The setting of "delivery mode" property on M2.1 is not the same as on trunk where _ is use such as "delivery_mode".
There is also no error that you have sent an incorrect property.
Modified:
incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/M2.1/python/tests/basic.py
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=579229&r1=579228&r2=579229&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Sep 25 05:51:09 2007
@@ -330,6 +330,11 @@
deliveryTag, _queue.getMessageCount());
_totalMessageSize.addAndGet(-msg.getSize());
}
+
+ if (!acks)
+ {
+ msg.decrementReference(channel.getStoreContext());
+ }
}
finally
{
Modified: incubator/qpid/branches/M2.1/python/tests/basic.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/python/tests/basic.py?rev=579229&r1=579228&r2=579229&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/python/tests/basic.py (original)
+++ incubator/qpid/branches/M2.1/python/tests/basic.py Tue Sep 25 05:51:09 2007
@@ -339,9 +339,11 @@
channel = self.channel
channel.queue_declare(queue="test-get", exclusive=True)
- #publish some messages (no_ack=True)
+ #publish some messages (no_ack=True) with persistent messaging
for i in range(1, 11):
- channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+ msg=Content("Message %d" % i)
+ msg["delivery mode"] = 2
+ channel.basic_publish(routing_key="test-get",content=msg )
#use basic_get to read back the messages, and check that we get an empty at the end
for i in range(1, 11):
@@ -354,18 +356,53 @@
self.assertEqual(reply.method.klass.name, "basic")
self.assertEqual(reply.method.name, "get-empty")
- #repeat for no_ack=False
+
+ #publish some messages (no_ack=True) transient messaging
for i in range(11, 21):
channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+ #use basic_get to read back the messages, and check that we get an empty at the end
for i in range(11, 21):
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-empty")
+
+ #repeat for no_ack=False
+
+ #publish some messages (no_ack=False) with persistent messaging
+ for i in range(21, 31):
+ msg=Content("Message %d" % i)
+ msg["delivery mode"] = 2
+ channel.basic_publish(routing_key="test-get",content=msg )
+
+ #use basic_get to read back the messages, and check that we get an empty at the end
+ for i in range(21, 31):
+ reply = channel.basic_get(no_ack=False)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get-empty")
+
+ #public some messages (no_ack=False) with transient messaging
+ for i in range(31, 41):
+ channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+ for i in range(31, 41):
reply = channel.basic_get(no_ack=False)
self.assertEqual(reply.method.klass.name, "basic")
self.assertEqual(reply.method.name, "get-ok")
self.assertEqual("Message %d" % i, reply.content.body)
- if(i == 13):
+ if(i == 33):
channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
- if(i in [15, 17, 19]):
+ if(i in [35, 37, 39]):
channel.basic_ack(delivery_tag=reply.delivery_tag)
reply = channel.basic_get(no_ack=True)
@@ -375,8 +412,8 @@
#recover(requeue=True)
channel.basic_recover(requeue=True)
- #get the unacked messages again (14, 16, 18, 20)
- for i in [14, 16, 18, 20]:
+ #get the unacked messages again (34, 36, 38, 40)
+ for i in [34, 36, 38, 40]:
reply = channel.basic_get(no_ack=False)
self.assertEqual(reply.method.klass.name, "basic")
self.assertEqual(reply.method.name, "get-ok")