You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/02/12 16:15:26 UTC

svn commit: r506477 - in /incubator/qpid/branches/qpid.0-9/python: qpid/client.py tests/message.py

Author: gsim
Date: Mon Feb 12 07:15:25 2007
New Revision: 506477

URL: http://svn.apache.org/viewvc?view=rev&rev=506477
Log:
* qpid/client.py - altered handling of transfer for references, 
                   add the transfer to queue as before and require 
                   client to look up the reference themselves. this 
                   lets application access the other transfer fields
* tests/message.py - added some more tests for references


Modified:
    incubator/qpid/branches/qpid.0-9/python/qpid/client.py
    incubator/qpid/branches/qpid.0-9/python/tests/message.py

Modified: incubator/qpid/branches/qpid.0-9/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/qpid/client.py?view=diff&rev=506477&r1=506476&r2=506477
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/qpid/client.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/qpid/client.py Mon Feb 12 07:15:25 2007
@@ -111,10 +111,7 @@
     self.client.started.set()
 
   def message_transfer(self, ch, msg):
-    if isinstance(msg.body, ReferenceId):
-      self.client.queue(msg.destination).put(ch.references.get(msg.body.id))
-    else:
-      self.client.queue(msg.destination).put(msg)
+    self.client.queue(msg.destination).put(msg)
 
   def message_open(self, ch, msg):
     ch.references.open(msg.reference)

Modified: incubator/qpid/branches/qpid.0-9/python/tests/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/python/tests/message.py?view=diff&rev=506477&r1=506476&r2=506477
==============================================================================
--- incubator/qpid/branches/qpid.0-9/python/tests/message.py (original)
+++ incubator/qpid/branches/qpid.0-9/python/tests/message.py Mon Feb 12 07:15:25 2007
@@ -445,15 +445,7 @@
         #first, wait for the ok for the transfer
         ack.get_response(timeout=1)
         
-        msg = queue.get(timeout=1)
-        if isinstance(msg, Reference):
-            #should we force broker to deliver as reference by frame
-            #size limit? or test that separately? for compliance, 
-            #allowing either seems best for now...            
-            data = msg.get_complete()
-        else:
-            data = msg.body
-        self.assertEquals("abcdefghijkl", data)
+        self.assertDataEquals(channel, queue.get(timeout=1), "abcdefghijkl")
 
 
     def test_reference_large(self):
@@ -482,8 +474,8 @@
         queue = other.queue("c1")
         
         msg = queue.get(timeout=1)
-        self.assertTrue(isinstance(msg, Reference))
-        self.assertEquals(data, msg.get_complete())
+        self.assertTrue(isinstance(msg.body, ReferenceId))
+        self.assertEquals(data, ch2.references.get(msg.body.id).get_complete())
 
     def test_reference_completion(self):
         """
@@ -514,13 +506,126 @@
         #first, wait for the ok for the transfer
         ack.get_response(timeout=1)
         
+        self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
+
+    def test_reference_multi_transfer(self):
+        """
+        Test that multiple transfer requests for the same reference are
+        correctly handled.
+        """
+        channel = self.channel
+        #declare and consume from two queues
+        channel.queue_declare(queue="q-one", exclusive=True)
+        channel.queue_declare(queue="q-two", exclusive=True)
+        channel.message_consume(queue="q-one", destination="q-one")
+        channel.message_consume(queue="q-two", destination="q-two")
+        queue1 = self.client.queue("q-one")
+        queue2 = self.client.queue("q-two")
+
+        #transfer a single ref to both queues (in separate commands)
+        channel.message_open(reference="my-ref")
+        channel.synchronous = False
+        ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
+        channel.message_append(reference="my-ref", bytes="my data")
+        ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
+        channel.synchronous = True
+        channel.message_close(reference="my-ref")
+
+        #check that both queues have the message
+        self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
+        self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
+        self.assertEmpty(queue1)
+        self.assertEmpty(queue2)
+
+        #transfer a single ref to the same queue twice (in separate commands)
+        channel.message_open(reference="my-ref")
+        channel.synchronous = False
+        ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
+        channel.message_append(reference="my-ref", bytes="second message")
+        ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
+        channel.synchronous = True
+        channel.message_close(reference="my-ref")
+
+        msg1 = queue1.get(timeout=1)
+        msg2 = queue1.get(timeout=1)
+        #order is undefined
+        if msg1.message_id == "abc":
+            self.assertEquals(msg2.message_id, "xyz")
+        else:
+            self.assertEquals(msg1.message_id, "xyz")
+            self.assertEquals(msg2.message_id, "abc")
+            
+        #would be legal for the incoming messages to be transfered
+        #inline or by reference in any combination
+        
+        if isinstance(msg1.body, ReferenceId):            
+            self.assertEquals("second message", channel.references.get(msg1.body.id).get_complete())
+            if isinstance(msg2.body, ReferenceId):
+                if msg1.body != msg2.body:
+                    self.assertEquals("second message", channel.references.get(msg2.body.id).get_complete())
+                #else ok, as same ref as msg1    
+        else:
+            self.assertEquals("second message", msg1.body)
+            if isinstance(msg2.body, ReferenceId):
+                self.assertEquals("second message", channel.references.get(msg2.body.id).get_complete())
+            else:
+                self.assertEquals("second message", msg2.body)                
+
+        self.assertEmpty(queue1)
+
+    def test_reference_unopened_on_append_error(self):
+        channel = self.channel
+        try:
+            channel.message_append(reference="unopened")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_reference_unopened_on_close_error(self):
+        channel = self.channel
+        try:
+            channel.message_close(reference="unopened")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_reference_unopened_on_transfer_error(self):
+        channel = self.channel
+        try:
+            channel.message_transfer(body=ReferenceId("unopened"))
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_reference_already_opened_error(self):
+        channel = self.channel
+        channel.message_open(reference="a")
+        try:
+            channel.message_open(reference="a")            
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_empty_reference(self):
+        channel = self.channel
+        channel.queue_declare(queue="ref_queue", exclusive=True)
+        channel.message_consume(queue="ref_queue", destination="c1")
+        queue = self.client.queue("c1")
+
+        refId = "myref"
+        channel.message_open(reference=refId)
+        channel.synchronous = False
+        ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
+        channel.synchronous = True
+        channel.message_close(reference=refId)
+
+        #first, wait for the ok for the transfer
+        ack.get_response(timeout=1)
+
         msg = queue.get(timeout=1)
-        if isinstance(msg, Reference):
-            #should we force broker to deliver as reference by frame
-            #size limit? or test that separately? for compliance, 
-            #allowing either seems best for now...            
-            data = msg.get_complete()
+        self.assertEquals(msg.message_id, "empty-msg")
+        self.assertDataEquals(channel, msg, "")
+        
+        
+    def assertDataEquals(self, channel, msg, expected):
+        if isinstance(msg.body, ReferenceId):
+            data = channel.references.get(msg.body.id).get_complete()
         else:
             data = msg.body
-        self.assertEquals("abcdefghijkl", data)
-
+        self.assertEquals(expected, data)