You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/02/12 16:15:26 UTC
svn commit: r506477 - in /incubator/qpid/branches/qpid.0-9/python:
qpid/client.py tests/message.py
Author: gsim
Date: Mon Feb 12 07:15:25 2007
New Revision: 506477
URL: http://svn.apache.org/viewvc?view=rev&rev=506477
Log:
* qpid/client.py - altered handling of transfer for references,
add the transfer to queue as before and require
client to look up the reference themselves. this
lets application access the other transfer fields
* tests/message.py - added some more tests for references
Modified:
incubator/qpid/branches/qpid.0-9/python/qpid/client.py
incubator/qpid/branches/qpid.0-9/python/tests/message.py
Modified: incubator/qpid/branches/qpid.0-9/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/client.py?view=diff&rev=506477&r1=506476&r2=506477
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/client.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/client.py Mon Feb 12 07:15:25 2007
@@ -111,10 +111,7 @@
self.client.started.set()
def message_transfer(self, ch, msg):
- if isinstance(msg.body, ReferenceId):
- self.client.queue(msg.destination).put(ch.references.get(msg.body.id))
- else:
- self.client.queue(msg.destination).put(msg)
+ self.client.queue(msg.destination).put(msg)
def message_open(self, ch, msg):
ch.references.open(msg.reference)
Modified: incubator/qpid/branches/qpid.0-9/python/tests/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/tests/message.py?view=diff&rev=506477&r1=506476&r2=506477
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/tests/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/tests/message.py Mon Feb 12 07:15:25 2007
@@ -445,15 +445,7 @@
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
- msg = queue.get(timeout=1)
- if isinstance(msg, Reference):
- #should we force broker to deliver as reference by frame
- #size limit? or test that separately? for compliance,
- #allowing either seems best for now...
- data = msg.get_complete()
- else:
- data = msg.body
- self.assertEquals("abcdefghijkl", data)
+ self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl")
def test_reference_large(self):
@@ -482,8 +474,8 @@
queue = other.queue("c1")
msg = queue.get(timeout=1)
- self.assertTrue(isinstance(msg, Reference))
- self.assertEquals(data, msg.get_complete())
+ self.assertTrue(isinstance(msg.body, ReferenceId))
+ self.assertEquals(data, ch2.references.get(msg.body.id).get_complete())
def test_reference_completion(self):
"""
@@ -514,13 +506,126 @@
#first, wait for the ok for the transfer
ack.get_response(timeout=1)
+ self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
+
+ def test_reference_multi_transfer(self):
+ """
+ Test that multiple transfer requests for the same reference are
+ correctly handled.
+ """
+ channel = self.channel
+ #declare and consume from two queues
+ channel.queue_declare(queue="q-one", exclusive=True)
+ channel.queue_declare(queue="q-two", exclusive=True)
+ channel.message_consume(queue="q-one", destination="q-one")
+ channel.message_consume(queue="q-two", destination="q-two")
+ queue1 = self.client.queue("q-one")
+ queue2 = self.client.queue("q-two")
+
+ #transfer a single ref to both queues (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="my data")
+ ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ #check that both queues have the message
+ self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
+ self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
+ self.assertEmpty(queue1)
+ self.assertEmpty(queue2)
+
+ #transfer a single ref to the same queue twice (in separate commands)
+ channel.message_open(reference="my-ref")
+ channel.synchronous = False
+ ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
+ channel.message_append(reference="my-ref", bytes="second message")
+ ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
+ channel.synchronous = True
+ channel.message_close(reference="my-ref")
+
+ msg1 = queue1.get(timeout=1)
+ msg2 = queue1.get(timeout=1)
+ #order is undefined
+ if msg1.message_id == "abc":
+ self.assertEquals(msg2.message_id, "xyz")
+ else:
+ self.assertEquals(msg1.message_id, "xyz")
+ self.assertEquals(msg2.message_id, "abc")
+
+ #would be legal for the incoming messages to be transfered
+ #inline or by reference in any combination
+
+ if isinstance(msg1.body, ReferenceId):
+ self.assertEquals("second message", channel.references.get(msg1.body.id).get_complete())
+ if isinstance(msg2.body, ReferenceId):
+ if msg1.body != msg2.body:
+ self.assertEquals("second message", channel.references.get(msg2.body.id).get_complete())
+ #else ok, as same ref as msg1
+ else:
+ self.assertEquals("second message", msg1.body)
+ if isinstance(msg2.body, ReferenceId):
+ self.assertEquals("second message", channel.references.get(msg2.body.id).get_complete())
+ else:
+ self.assertEquals("second message", msg2.body)
+
+ self.assertEmpty(queue1)
+
+ def test_reference_unopened_on_append_error(self):
+ channel = self.channel
+ try:
+ channel.message_append(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_close_error(self):
+ channel = self.channel
+ try:
+ channel.message_close(reference="unopened")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_unopened_on_transfer_error(self):
+ channel = self.channel
+ try:
+ channel.message_transfer(body=ReferenceId("unopened"))
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_reference_already_opened_error(self):
+ channel = self.channel
+ channel.message_open(reference="a")
+ try:
+ channel.message_open(reference="a")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def test_empty_reference(self):
+ channel = self.channel
+ channel.queue_declare(queue="ref_queue", exclusive=True)
+ channel.message_consume(queue="ref_queue", destination="c1")
+ queue = self.client.queue("c1")
+
+ refId = "myref"
+ channel.message_open(reference=refId)
+ channel.synchronous = False
+ ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
+ channel.synchronous = True
+ channel.message_close(reference=refId)
+
+ #first, wait for the ok for the transfer
+ ack.get_response(timeout=1)
+
msg = queue.get(timeout=1)
- if isinstance(msg, Reference):
- #should we force broker to deliver as reference by frame
- #size limit? or test that separately? for compliance,
- #allowing either seems best for now...
- data = msg.get_complete()
+ self.assertEquals(msg.message_id, "empty-msg")
+ self.assertDataEquals(channel, msg, "")
+
+
+ def assertDataEquals(self, channel, msg, expected):
+ if isinstance(msg.body, ReferenceId):
+ data = channel.references.get(msg.body.id).get_complete()
else:
data = msg.body
- self.assertEquals("abcdefghijkl", data)
-
+ self.assertEquals(expected, data)