You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/03/19 21:34:33 UTC

svn commit: r520094 - in /incubator/qpid/trunk/qpid: ./ python/ python/qpid/ python/tests/

Author: aconway
Date: Mon Mar 19 13:34:32 2007
New Revision: 520094

URL: http://svn.apache.org/viewvc?view=rev&rev=520094
Log:
Merged revisions 507491-507559,507561-507601,507603-507621,507623-507671,507673-507959,507961-507992,507994-508097,508099-508149,508151-508155,508157-508232,508234-508378,508380-508390,508392-508459,508461-508704,508707-509615,509617-509737,509739-509753,509756-509833,509835-510106,510108-510160,510162-510179,510181-510552,510554-510704,510706-510911,510913-510985,510987-511003,511005-514750,514752-515720,515722-516156,516158-516458,516461-516484,516486-516488,516490-517823,517825,517827,517829,517831-517832,517834-517848,517850,517852-517854,517856-517858,517860-517877,517879-517886,517888-517891,517893-517903,517905,517907-517928,517930,517932-518197,518201-518206,518208-518230,518232,518235,518237,518239-518240,518243-518245,518247-518255,518257,518259-518260,518262,518264,518266-518292,518294-518707 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9

........
  r507491 | gsim | 2007-02-14 06:39:26 -0500 (Wed, 14 Feb 2007) | 3 lines
  
  Expanded the use of batched acks to a few other places in tests.
........
  r508377 | gsim | 2007-02-16 07:03:37 -0500 (Fri, 16 Feb 2007) | 4 lines
  
  Updated failing list for java (some of these result in test suite blocking)
  Added better error handling when connection closes without close method
........
  r508396 | gsim | 2007-02-16 08:54:54 -0500 (Fri, 16 Feb 2007) | 3 lines
  
  Fix: use message_resume not channel_resume
........
  r509611 | gsim | 2007-02-20 10:39:14 -0500 (Tue, 20 Feb 2007) | 3 lines
  
  Fixed bug where response id rather than request id was being used to get listener for response.
........
  r509617 | gsim | 2007-02-20 10:52:31 -0500 (Tue, 20 Feb 2007) | 3 lines
  
  Updated list of failing tests for java broker on this branch.
........
  r510096 | gsim | 2007-02-21 11:49:27 -0500 (Wed, 21 Feb 2007) | 5 lines
  
  Fixed bug in references where map wasn't qualified in close
  Attach reference to transfer, as it will be deleted on close
  Altered tests to get the reference from the message on the queue rather than looking it up from the channel, as it has already been deleted from the channel by then
........
  r510114 | astitcher | 2007-02-21 12:37:36 -0500 (Wed, 21 Feb 2007) | 3 lines
  
   r1224@fuschia:  andrew | 2007-02-21 17:20:59 +0000
   Updated expected cpp broker test failures
........
  r510128 | gsim | 2007-02-21 13:06:02 -0500 (Wed, 21 Feb 2007) | 3 lines
  
  Ensure socket is closed in tearDown
........
  r510913 | gsim | 2007-02-23 06:37:08 -0500 (Fri, 23 Feb 2007) | 3 lines
  
  Revised list of failing tests for java broker on this branch
........
  r515363 | aconway | 2007-03-06 18:35:08 -0500 (Tue, 06 Mar 2007) | 6 lines
  
  * python/qpid/peer.py (Channel.__init__): use reliable framing if
    version >= 0-8.
  * python/qpid/spec.py (Spec.__init__): Remove unused parameter.
  * python/qpid/testlib.py (TestRunner._parseargs): Add --errata option,
    set default errata only if --spec is not present.
........
  r518707 | aconway | 2007-03-15 13:49:44 -0400 (Thu, 15 Mar 2007) | 6 lines
  
  
  * python/qpid/peer.py (Peer.close): Close delegate *before* channels.
    Otherwise we get a race: closing a channel can wake a client thread,
    which may see client.closed as still false. Was causing bogus exceptions
    in some tests.
........

Modified:
    incubator/qpid/trunk/qpid/   (props changed)
    incubator/qpid/trunk/qpid/python/cpp_failing.txt
    incubator/qpid/trunk/qpid/python/java_failing.txt
    incubator/qpid/trunk/qpid/python/qpid/client.py
    incubator/qpid/trunk/qpid/python/qpid/connection.py
    incubator/qpid/trunk/qpid/python/qpid/peer.py
    incubator/qpid/trunk/qpid/python/qpid/reference.py
    incubator/qpid/trunk/qpid/python/qpid/spec.py
    incubator/qpid/trunk/qpid/python/qpid/testlib.py
    incubator/qpid/trunk/qpid/python/tests/message.py
    incubator/qpid/trunk/qpid/python/tests/tx.py

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
    svk:merge = 8427bd24-ae5a-4eba-a324-d2fc9c9c6c77:/local/qpid.0-9.ams:1224

Propchange: incubator/qpid/trunk/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: incubator/qpid/trunk/qpid/python/cpp_failing.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/cpp_failing.txt?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/cpp_failing.txt (original)
+++ incubator/qpid/trunk/qpid/python/cpp_failing.txt Mon Mar 19 13:34:32 2007
@@ -0,0 +1,4 @@
+tests.message.MessageTests.test_checkpoint
+tests.message.MessageTests.test_reference_large
+tests.message.MessageTests.test_reject
+tests.basic.BasicTests.test_get

Modified: incubator/qpid/trunk/qpid/python/java_failing.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/java_failing.txt?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/java_failing.txt (original)
+++ incubator/qpid/trunk/qpid/python/java_failing.txt Mon Mar 19 13:34:32 2007
@@ -1,4 +1,4 @@
-tests.basic.BasicTests.test_qos_prefetch_count 
+ntests.basic.BasicTests.test_qos_prefetch_count 
 tests.basic.BasicTests.test_ack
 tests.basic.BasicTests.test_cancel
 tests.basic.BasicTests.test_consume_exclusive
@@ -8,11 +8,11 @@
 tests.basic.BasicTests.test_get
 tests.basic.BasicTests.test_qos_prefetch_size
 tests.basic.BasicTests.test_recover_requeue
+
 tests.exchange.RecommendedTypesRuleTests.testTopic
 tests.exchange.RequiredInstancesRuleTests.testAmqTopic
 
-tests.message.MessageTests.test_qos_prefetch_count 
-tests.message.MessageTests.test_ack
-tests.message.MessageTests.test_get
-tests.message.MessageTests.test_qos_prefetch_size
-tests.message.MessageTests.test_recover_requeue
+tests.message.MessageTests.test_checkpoint
+tests.message.MessageTests.test_reject
+
+tests.broker.BrokerTests.test_ping_pong

Modified: incubator/qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/client.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/client.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/client.py Mon Mar 19 13:34:32 2007
@@ -76,7 +76,8 @@
     self.locale = locale
     self.tune_params = tune_params
 
-    self.conn = Connection(connect(self.host, self.port), self.spec)
+    self.socket = connect(self.host, self.port)
+    self.conn = Connection(self.socket, self.spec)
     self.peer = Peer(self.conn, ClientDelegate(self), self.opened)
 
     self.conn.init()
@@ -90,6 +91,9 @@
   def opened(self, ch):
     ch.references = References()
 
+  def close(self):
+    self.socket.close()
+
 class ClientDelegate(Delegate):
 
   def __init__(self, client):
@@ -112,9 +116,8 @@
 
   def message_transfer(self, ch, msg):
     if isinstance(msg.body, ReferenceId):
-      self.client.queue(msg.destination).put(ch.references.get(msg.body.id))
-    else:
-      self.client.queue(msg.destination).put(msg)
+      msg.reference = ch.references.get(msg.body.id)
+    self.client.queue(msg.destination).put(msg)
 
   def message_open(self, ch, msg):
     ch.references.open(msg.reference)

Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Mon Mar 19 13:34:32 2007
@@ -53,6 +53,9 @@
   def flush(self):
     pass
 
+  def close(self):
+    self.sock.shutdown(socket.SHUT_RDWR)
+
 def connect(host, port):
   sock = socket.socket()
   sock.connect((host, port))

Modified: incubator/qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/peer.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/peer.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/peer.py Mon Mar 19 13:34:32 2007
@@ -97,9 +97,12 @@
       self.fatal()
 
   def close(self, reason):
+    # We must close the delegate first because closing channels
+    # may wake up waiting threads and we don't want them to see
+    # the delegate as open.
+    self.delegate.close(reason)
     for ch in self.channels.values():
       ch.close(reason)
-    self.delegate.close(reason)
 
   def writer(self):
     try:
@@ -144,7 +147,7 @@
     self.write(frame, content)
 
   def receive(self, channel, frame):
-    listener = self.outstanding.pop(frame.id)
+    listener = self.outstanding.pop(frame.request_id)
     listener(channel, frame)
 
 class Responder:
@@ -178,8 +181,8 @@
     self.requester = Requester(self.write)
     self.responder = Responder(self.write)
 
-    # XXX: better switch
-    self.reliable = False
+    # Use reliable framing if version == 0-9.
+    self.reliable = (spec.major == 0 and spec.minor == 9)
     self.synchronous = True
 
   def close(self, reason):

Modified: incubator/qpid/trunk/qpid/python/qpid/reference.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/reference.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/reference.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/reference.py Mon Mar 19 13:34:32 2007
@@ -111,7 +111,7 @@
         self.get(id).close()
         self.lock.acquire()
         try:
-            del map[id]
+            self.map.pop(id)
         finally:
             self.lock.release()
         

Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Mon Mar 19 13:34:32 2007
@@ -79,12 +79,11 @@
 
   PRINT=["major", "minor", "file"]
 
-  def __init__(self, major, minor, file, errata):
+  def __init__(self, major, minor, file):
     Metadata.__init__(self)
     self.major = major
     self.minor = minor
     self.file = file
-    self.errata = errata
     self.constants = SpecContainer()
     self.classes = SpecContainer()
     # methods indexed by classname_methname
@@ -275,7 +274,7 @@
 def load(specfile, *errata):
   doc = xmlutil.parse(specfile)
   spec_root = doc["amqp"][0]
-  spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile, errata)
+  spec = Spec(int(spec_root["@major"]), int(spec_root["@minor"]), specfile)
 
   for root in [spec_root] + map(lambda x: xmlutil.parse(x)["amqp"][0], errata):
     # constants

Modified: incubator/qpid/trunk/qpid/python/qpid/testlib.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/testlib.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/testlib.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/testlib.py Mon Mar 19 13:34:32 2007
@@ -26,6 +26,7 @@
 import Queue
 from getopt import getopt, GetoptError
 from qpid.content import Content
+from qpid.message import Message
 
 def findmodules(root):
     """Find potential python modules under directory root"""
@@ -56,15 +57,16 @@
 run-tests [options] [test*]
 The name of a test is package.module.ClassName.testMethod
 Options:
-  -?/-h/--help         : this message
+  -?/-h/--help             : this message
   -s/--spec <spec.xml> : URL of AMQP XML specification or one of these abbreviations:
                            0-8 - use the default 0-8 specification.
                            0-9 - use the default 0-9 specification.
+  -e/--errata <errata.xml> : file containing amqp XML errata
   -b/--broker [<user>[/<password>]@]<host>[:<port>] : broker to connect to
-  -v/--verbose         : verbose - lists tests as they are run.
-  -d/--debug           : enable debug logging.
-  -i/--ignore <test>   : ignore the named test.
-  -I/--ignore-file     : file containing patterns to ignore.
+  -v/--verbose             : verbose - lists tests as they are run.
+  -d/--debug               : enable debug logging.
+  -i/--ignore <test>       : ignore the named test.
+  -I/--ignore-file         : file containing patterns to ignore.
   """
         sys.exit(1)
 
@@ -103,24 +105,27 @@
         for opt, value in opts:
             if opt in ("-?", "-h", "--help"): self._die()
             if opt in ("-s", "--spec"): self.specfile = value
+            if opt in ("-e", "--errata"): self.errata.append(value)
             if opt in ("-b", "--broker"): self.setBroker(value)
             if opt in ("-v", "--verbose"): self.verbose = 2
             if opt in ("-d", "--debug"): logging.basicConfig(level=logging.DEBUG)
             if opt in ("-i", "--ignore"): self.ignore.append(value)
             if opt in ("-I", "--ignore-file"): self.ignoreFile(value)
+	# Abbreviations for default settings.
         if (self.specfile == "0-8"):
-          self.specfile = "../specs/amqp.0-8.xml"
+       	    self.specfile = "../specs/amqp.0-8.xml"
         if (self.specfile == "0-9"):
-          self.specfile = "../specs/amqp.0-9.xml"
-          self.errata = ["../specs/amqp-errata.0-9.xml"]
+            self.specfile = "../specs/amqp.0-9.xml"
+            self.errata.append("../specs/amqp-errata.0-9.xml")
+	if (self.specfile == None):
+	    self._die("No XML specification provided")
         print "Using specification from:", self.specfile
         self.spec = qpid.spec.load(self.specfile, *self.errata)
         if len(self.tests) == 0:
             if self.use08spec():
-                testdir="tests_0-8"
+                self.tests=findmodules("tests_0-8")
             else:
-                testdir="tests"
-            self.tests=findmodules(testdir)
+                self.tests=findmodules("tests")
 
     def testSuite(self):
         class IgnoringTestSuite(unittest.TestSuite):
@@ -137,7 +142,11 @@
         self._parseargs(args)
         runner = unittest.TextTestRunner(descriptions=False,
                                          verbosity=self.verbose)
-        result = runner.run(self.testSuite())
+        try:
+            result = runner.run(self.testSuite())
+        except:
+            print "Unhandled error in test:", sys.exc_info()
+            
         if (self.ignore):
             print "======================================="
             print "NOTE: the following tests were ignored:"
@@ -181,10 +190,18 @@
         self.channel.channel_open()
 
     def tearDown(self):
-        for ch, q in self.queues:
-            ch.queue_delete(queue=q)
-        for ch, ex in self.exchanges:
-            ch.exchange_delete(exchange=ex)
+        try:
+            for ch, q in self.queues:
+                ch.queue_delete(queue=q)
+            for ch, ex in self.exchanges:
+                ch.exchange_delete(exchange=ex)
+        except:
+            print "Error on tearDown:", sys.exc_info()
+
+        if not self.client.closed:    
+            self.client.channel(0).connection_close(reply_code=200)
+        else:    
+            self.client.close()
 
     def connect(self, *args, **keys):
         """Create a new connction, return the Client object"""
@@ -261,13 +278,15 @@
         """
         self.assertPublishGet(self.consume(queue), exchange, routing_key, properties)
 
-    def assertChannelException(self, expectedCode, message): 
+    def assertChannelException(self, expectedCode, message):
+        if not isinstance(message, Message): self.fail("expected channel_close method")
         self.assertEqual("channel", message.method.klass.name)
         self.assertEqual("close", message.method.name)
         self.assertEqual(expectedCode, message.reply_code)
 
 
     def assertConnectionException(self, expectedCode, message): 
+        if not isinstance(message, Message): self.fail("expected connection_close method")
         self.assertEqual("connection", message.method.klass.name)
         self.assertEqual("close", message.method.name)
         self.assertEqual(expectedCode, message.reply_code)

Modified: incubator/qpid/trunk/qpid/python/tests/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/message.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/message.py Mon Mar 19 13:34:32 2007
@@ -171,8 +171,7 @@
         self.assertEqual("Four", msg4.body)
         self.assertEqual("Five", msg5.body)
 
-        msg1.ok()
-        msg2.ok()
+        msg1.ok(batchoffset=1)#One and Two
         msg4.ok()
 
         channel.message_recover(requeue=False)
@@ -216,9 +215,8 @@
         self.assertEqual("Four", msg4.body)
         self.assertEqual("Five", msg5.body)
 
-        msg1.ok()  #One
-        msg2.ok()  #Two
-        msg4.ok()  #Two
+        msg1.ok(batchoffset=1)  #One and Two
+        msg4.ok()  #Four
 
         channel.message_cancel(destination="consumer_tag")
         channel.message_consume(queue="test-requeue", destination="consumer_tag")
@@ -263,11 +261,9 @@
             channel.message_transfer(routing_key="test-prefetch-count", body="Message %d" % i)
 
         #only 5 messages should have been delivered:
-        msgs = []
         for i in range(1, 6):
             msg = queue.get(timeout=1)
             self.assertEqual("Message %d" % i, msg.body)
-            msgs.append(msg)
         try:
             extra = queue.get(timeout=1)
             self.fail("Got unexpected 6th message in original queue: " + extra.body)
@@ -275,18 +271,13 @@
 
         #ack messages and check that the next set arrive ok:
         #todo: once batching is implmented, send a single response for all messages
-        for msg in msgs:
-            msg.ok()
-        del msgs    
+        msg.ok(batchoffset=-4)#1-5
 
         for i in range(6, 11):
             msg = queue.get(timeout=1)
             self.assertEqual("Message %d" % i, msg.body)
-            msgs.append(msg)
 
-        for msg in msgs:
-            msg.ok()
-        del msgs    
+        msg.ok(batchoffset=-4)#6-10
 
         try:
             extra = queue.get(timeout=1)
@@ -313,12 +304,9 @@
             channel.message_transfer(routing_key="test-prefetch-size", body="Message %d" % i)
 
         #only 5 messages should have been delivered (i.e. 45 bytes worth):
-        msgs = []
         for i in range(1, 6):
             msg = queue.get(timeout=1)
             self.assertEqual("Message %d" % i, msg.body)
-            print "Got Message %d" % i
-            msgs.append(msg)
 
         try:
             extra = queue.get(timeout=1)
@@ -326,18 +314,13 @@
         except Empty: None
 
         #ack messages and check that the next set arrive ok:
-        for msg in msgs:
-            msg.ok()
-        del msgs    
+        msg.ok(batchoffset=-4)#1-5
 
         for i in range(6, 11):
             msg = queue.get(timeout=1)
             self.assertEqual("Message %d" % i, msg.body)
-            msgs.append(msg)
 
-        for msg in msgs:
-            msg.ok()
-        del msgs    
+        msg.ok(batchoffset=-4)#6-10
 
         try:
             extra = queue.get(timeout=1)
@@ -383,14 +366,13 @@
             reply = channel.message_get(no_ack=False, queue="test-get", destination=tag)
             self.assertEqual(reply.method.klass.name, "message")
             self.assertEqual(reply.method.name, "ok")
-            self.assertEqual("Message %d" % i, self.client.queue(tag).get(timeout=1).body)
-            reply.ok()
-
-            #todo: when batching is available, test ack multiple
-            #if(i == 13):
-            #    channel.message_ack(delivery_tag=reply.delivery_tag, multiple=True)
-            #if(i in [15, 17, 19]):
-            #    channel.message_ack(delivery_tag=reply.delivery_tag)
+            msg = self.client.queue(tag).get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.body)
+            
+            if (i==13):
+              msg.ok(batchoffset=-2)#11, 12 & 13
+            if(i in [15, 17, 19]):
+              msg.ok()
 
         reply = channel.message_get(no_ack=True, queue="test-get")
         self.assertEqual(reply.method.klass.name, "message")
@@ -479,8 +461,9 @@
         queue = other.queue("c1")
         
         msg = queue.get(timeout=1)
-        self.assertTrue(isinstance(msg, Reference))
-        self.assertEquals(data, msg.get_complete())
+        self.assertTrue(isinstance(msg.body, ReferenceId))
+        self.assertTrue(msg.reference)
+        self.assertEquals(data, msg.reference.get_complete())
 
     def test_reference_completion(self):
         """
@@ -511,12 +494,165 @@
         #first, wait for the ok for the transfer
         ack.get_response(timeout=1)
         
+        self.assertDataEquals(channel, queue.get(timeout=1), "abcd")
+
+    def test_reference_multi_transfer(self):
+        """
+        Test that multiple transfer requests for the same reference are
+        correctly handled.
+        """
+        channel = self.channel
+        #declare and consume from two queues
+        channel.queue_declare(queue="q-one", exclusive=True)
+        channel.queue_declare(queue="q-two", exclusive=True)
+        channel.message_consume(queue="q-one", destination="q-one")
+        channel.message_consume(queue="q-two", destination="q-two")
+        queue1 = self.client.queue("q-one")
+        queue2 = self.client.queue("q-two")
+
+        #transfer a single ref to both queues (in separate commands)
+        channel.message_open(reference="my-ref")
+        channel.synchronous = False
+        ack1 = channel.message_transfer(routing_key="q-one", body=ReferenceId("my-ref"))
+        channel.message_append(reference="my-ref", bytes="my data")
+        ack2 = channel.message_transfer(routing_key="q-two", body=ReferenceId("my-ref"))
+        channel.synchronous = True
+        channel.message_close(reference="my-ref")
+
+        #check that both queues have the message
+        self.assertDataEquals(channel, queue1.get(timeout=1), "my data")
+        self.assertDataEquals(channel, queue2.get(timeout=1), "my data")
+        self.assertEmpty(queue1)
+        self.assertEmpty(queue2)
+
+        #transfer a single ref to the same queue twice (in separate commands)
+        channel.message_open(reference="my-ref")
+        channel.synchronous = False
+        ack1 = channel.message_transfer(routing_key="q-one", message_id="abc", body=ReferenceId("my-ref"))
+        channel.message_append(reference="my-ref", bytes="second message")
+        ack2 = channel.message_transfer(routing_key="q-one", message_id="xyz", body=ReferenceId("my-ref"))
+        channel.synchronous = True
+        channel.message_close(reference="my-ref")
+
+        msg1 = queue1.get(timeout=1)
+        msg2 = queue1.get(timeout=1)
+        #order is undefined
+        if msg1.message_id == "abc":
+            self.assertEquals(msg2.message_id, "xyz")
+        else:
+            self.assertEquals(msg1.message_id, "xyz")
+            self.assertEquals(msg2.message_id, "abc")
+            
+        #would be legal for the incoming messages to be transfered
+        #inline or by reference in any combination
+        
+        if isinstance(msg1.body, ReferenceId):            
+            self.assertEquals("second message", msg1.reference.get_complete())
+            if isinstance(msg2.body, ReferenceId):
+                if msg1.body != msg2.body:
+                    self.assertEquals("second message", msg2.reference.get_complete())
+                #else ok, as same ref as msg1    
+        else:
+            self.assertEquals("second message", msg1.body)
+            if isinstance(msg2.body, ReferenceId):
+                self.assertEquals("second message", msg2.reference.get_complete())
+            else:
+                self.assertEquals("second message", msg2.body)                
+
+        self.assertEmpty(queue1)
+
+    def test_reference_unopened_on_append_error(self):
+        channel = self.channel
+        try:
+            channel.message_append(reference="unopened")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_reference_unopened_on_close_error(self):
+        channel = self.channel
+        try:
+            channel.message_close(reference="unopened")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_reference_unopened_on_transfer_error(self):
+        channel = self.channel
+        try:
+            channel.message_transfer(body=ReferenceId("unopened"))
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_reference_already_opened_error(self):
+        channel = self.channel
+        channel.message_open(reference="a")
+        try:
+            channel.message_open(reference="a")            
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_empty_reference(self):
+        channel = self.channel
+        channel.queue_declare(queue="ref_queue", exclusive=True)
+        channel.message_consume(queue="ref_queue", destination="c1")
+        queue = self.client.queue("c1")
+
+        refId = "myref"
+        channel.message_open(reference=refId)
+        channel.synchronous = False
+        ack = channel.message_transfer(routing_key="ref_queue", message_id="empty-msg", body=ReferenceId(refId))
+        channel.synchronous = True
+        channel.message_close(reference=refId)
+
+        #first, wait for the ok for the transfer
+        ack.get_response(timeout=1)
+
         msg = queue.get(timeout=1)
-        if isinstance(msg, Reference):
-            #should we force broker to deliver as reference by frame
-            #size limit? or test that separately? for compliance, 
-            #allowing either seems best for now...            
-            data = msg.get_complete()
+        self.assertEquals(msg.message_id, "empty-msg")
+        self.assertDataEquals(channel, msg, "")
+
+    def test_reject(self):
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True)
+
+        channel.message_consume(queue = "q", destination = "consumer")
+        channel.message_transfer(routing_key = "q", body="blah, blah")
+        msg = self.client.queue("consumer").get(timeout = 1)
+        self.assertEquals(msg.body, "blah, blah")
+        channel.message_cancel(destination = "consumer")
+        msg.reject()
+
+        channel.message_consume(queue = "q", destination = "checker")
+        msg = self.client.queue("checker").get(timeout = 1)
+        self.assertEquals(msg.body, "blah, blah")
+
+    def test_checkpoint(self):
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True)
+
+        channel.message_open(reference="my-ref")
+        channel.message_append(reference="my-ref", bytes="abcdefgh")
+        channel.message_append(reference="my-ref", bytes="ijklmnop")
+        channel.message_checkpoint(reference="my-ref", identifier="my-checkpoint")
+        channel.channel_close()
+
+        channel = self.client.channel(2)
+        channel.channel_open()
+        channel.message_consume(queue = "q", destination = "consumer")
+        offset = channel.message_resume(reference="my-ref", identifier="my-checkpoint").value
+        self.assertEquals(offset, 16)
+        channel.message_append(reference="my-ref", bytes="qrstuvwxyz")
+        channel.synchronous = False
+        channel.message_transfer(routing_key="q-one", message_id="abcd", body=ReferenceId("my-ref"))
+        channel.synchronous = True
+        channel.message_close(reference="my-ref")
+
+        self.assertDataEquals(channel, self.client.queue("consumer").get(timeout = 1), "abcdefghijklmnopqrstuvwxyz")
+        self.assertEmpty(self.client.queue("consumer"))
+        
+        
+    def assertDataEquals(self, channel, msg, expected):
+        if isinstance(msg.body, ReferenceId):
+            data = msg.reference.get_complete()
         else:
             data = msg.body
         self.assertEquals("abcdefghijkl", data)

Modified: incubator/qpid/trunk/qpid/python/tests/tx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests/tx.py?view=diff&rev=520094&r1=520093&r2=520094
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests/tx.py Mon Mar 19 13:34:32 2007
@@ -163,7 +163,8 @@
         for i in range(1, 5):
             msg = queue_a.get(timeout=1)
             self.assertEqual("Message %d" % i, msg.body)
-            msg.ok()
+
+        msg.ok(batchoffset=-3)
 
         channel.message_consume(queue=name_b, destination="sub_b", no_ack=False)
         queue_b = self.client.queue("sub_b")