You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/02/08 11:14:51 UTC
svn commit: r504849 -
/incubator/qpid/branches/qpid.0-9/python/tests/message.py
Author: gsim
Date: Thu Feb 8 02:14:50 2007
New Revision: 504849
URL: http://svn.apache.org/viewvc?view=rev&rev=504849
Log:
Fixes to qos and get tests. Added test for correct completion of references.
Modified:
incubator/qpid/branches/qpid.0-9/python/tests/message.py
Modified: incubator/qpid/branches/qpid.0-9/python/tests/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/tests/message.py?view=diff&rev=504849&r1=504848&r2=504849
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/tests/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/tests/message.py Thu Feb 8 02:14:50 2007
@@ -267,7 +267,7 @@
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.add(msg)
+ msgs.append(msg)
try:
extra = queue.get(timeout=1)
self.fail("Got unexpected 6th message in original queue: " + extra.body)
@@ -277,16 +277,16 @@
#todo: once batching is implemented, send a single response for all messages
for msg in msgs:
msg.ok()
- msgs.clear()
+ del msgs
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.add(msg)
+ msgs.append(msg)
for msg in msgs:
msg.ok()
- msgs.clear()
+ del msgs
try:
extra = queue.get(timeout=1)
@@ -317,7 +317,8 @@
for i in range(1, 6):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.add(msg)
+ print "Got Message %d" % i
+ msgs.append(msg)
try:
extra = queue.get(timeout=1)
@@ -327,16 +328,16 @@
#ack messages and check that the next set arrive ok:
for msg in msgs:
msg.ok()
- msgs.clear()
+ del msgs
for i in range(6, 11):
msg = queue.get(timeout=1)
self.assertEqual("Message %d" % i, msg.body)
- msgs.add(msg)
+ msgs.append(msg)
for msg in msgs:
msg.ok()
- msgs.clear()
+ del msgs
try:
extra = queue.get(timeout=1)
@@ -363,12 +364,13 @@
#use message_get to read back the messages, and check that we get an empty at the end
for i in range(1, 11):
- reply = channel.message_get(no_ack=True)
+ tag = "queue %d" % i
+ reply = channel.message_get(no_ack=True, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
- self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, reply.body)
+ self.assertEqual(reply.method.name, "ok")
+ self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
- reply = channel.message_get(no_ack=True)
+ reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-empty")
@@ -377,10 +379,11 @@
channel.message_transfer(routing_key="test-get", body="Message %d" % i)
for i in range(11, 21):
- reply = channel.message_get(no_ack=False)
+ tag = "queue %d" % i
+ reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, reply.body)
+ self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
reply.ok()
#todo: when batching is available, test ack multiple
@@ -389,7 +392,7 @@
#if(i in [15, 17, 19]):
# channel.message_ack(delivery_tag=reply.delivery_tag)
- reply = channel.message_get(no_ack=True)
+ reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-empty")
@@ -398,20 +401,21 @@
#get the unacked messages again (14, 16, 18, 20)
for i in [14, 16, 18, 20]:
- reply = channel.message_get(no_ack=False)
+ tag = "queue %d" % i
+ reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "ok")
- self.assertEqual("Message %d" % i, reply.body)
+ self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
reply.ok()
#channel.message_ack(delivery_tag=reply.delivery_tag)
- reply = channel.message_get(no_ack=True)
+ reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-empty")
channel.message_recover(requeue=True)
- reply = channel.message_get(no_ack=True)
+ reply = channel.message_get(no_ack=True, queue="test-get")
self.assertEqual(reply.method.klass.name, "message")
self.assertEqual(reply.method.name, "get-empty")
@@ -477,3 +481,43 @@
msg = queue.get(timeout=1)
self.assertTrue(isinstance(msg, Reference))
self.assertEquals(data, msg.get_complete())
+
+ def test_reference_completion(self):
+ """
+ Test that reference transfers are not deemed complete until the
+ reference is closed (and so are not acked or routed until that point)
+ """
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.message_append(reference=refId, bytes="abcd")
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", body=ReferenceId(refId))
+ channel.synchronous = True
+
+ try:
+ msg = queue.get(timeout=1)
+ self.fail("Got unexpected message on queue: " + msg)
+ except Empty: None
+
+ self.assertTrue(not ack.is_complete())
+
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
+ msg = queue.get(timeout=1)
+ if isinstance(msg, Reference):
+ #should we force broker to deliver as reference by frame
+ #size limit? or test that separately? for compliance,
+ #allowing either seems best for now...
+ data = msg.get_complete()
+ else:
+ data = msg.body
+ self.assertEquals("abcdefghijkl", data)
+