You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2008/03/05 20:56:48 UTC

svn commit: r634003 [2/3] - in /incubator/qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/framing/ cpp/src/tests/ cpp/xml/ python/ python/qpid/ python/tests_0-10/ python/tests_0-10_preview/

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/dtx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/dtx.py?rev=634003&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/dtx.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/dtx.py Wed Mar  5 11:56:43 2008
@@ -0,0 +1,645 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from struct import pack, unpack
+from time import sleep
+
+class DtxTests(TestBase):
+    """
+    Tests for the amqp dtx related classes.
+
+    Tests of the form test_simple_xxx test the basic transactional
+    behaviour. The approach here is to 'swap' a message from one queue
+    to another by consuming and re-publishing in the same
+    transaction. That transaction is then completed in different ways
+    and the appropriate result verified.
+
+    The other tests enforce more specific rules and behaviour on a
+    per-method or per-field basis.        
+    """
+
+    XA_RBROLLBACK = 1
+    XA_RBTIMEOUT = 2
+    XA_OK = 0
+    tx_counter = 0
+
+    def reset_channel(self):
+        self.channel.session_close()
+        self.channel = self.client.channel(self.channel.id + 1)
+        self.channel.session_open()
+
+    def test_simple_commit(self):
+        """        
+        Test basic one-phase commit behaviour.     
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "commit")
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #commit
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=True).status)
+
+        #should close and reopen channel to ensure no unacked messages are held
+        self.reset_channel()
+
+        #check result
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(1, "queue-b")
+        self.assertMessageId("commit", "queue-b")
+
+    def test_simple_prepare_commit(self):
+        """        
+        Test basic two-phase commit behaviour.     
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "prepare-commit")
+
+        #prepare
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #commit
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_commit(xid=tx, one_phase=False).status)
+
+        self.reset_channel()
+
+        #check result
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(1, "queue-b")
+        self.assertMessageId("prepare-commit", "queue-b")
+
+
+    def test_simple_rollback(self):
+        """        
+        Test basic rollback behaviour.     
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "rollback")
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #rollback
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+
+        self.reset_channel()
+
+        #check result
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("rollback", "queue-a")
+
+    def test_simple_prepare_rollback(self):
+        """        
+        Test basic rollback behaviour after the transaction has been prepared.     
+        """
+        channel = self.channel
+        tx = self.xid("my-xid")
+        self.txswap(tx, "prepare-rollback")
+
+        #prepare
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_prepare(xid=tx).status)
+
+        #neither queue should have any messages accessible
+        self.assertMessageCount(0, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+
+        #rollback
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)
+
+        self.reset_channel()
+
+        #check result
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("prepare-rollback", "queue-a")    
+
+    def test_select_required(self):
+        """
+        check that an error is flagged if select is not issued before
+        start or end        
+        """
+        channel = self.channel
+        tx = self.xid("dummy")
+        try:
+            channel.dtx_demarcation_start(xid=tx)
+            
+            #if we get here we have failed, but need to do some cleanup:
+            channel.dtx_demarcation_end(xid=tx)
+            channel.dtx_coordination_rollback(xid=tx)
+            self.fail("Channel not selected for use with dtx, expected exception!")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_start_already_known(self):
+        """
+        Verify that an attempt to start an association with a
+        transaction that is already known is not allowed (unless the
+        join flag is set).
+        """
+        #create two channels on different connection & select them for use with dtx:
+        channel1 = self.channel
+        channel1.dtx_demarcation_select()
+
+        other = self.connect()
+        channel2 = other.channel(1)
+        channel2.session_open()
+        channel2.dtx_demarcation_select()
+
+        #create a xid
+        tx = self.xid("dummy")
+        #start work on one channel under that xid:
+        channel1.dtx_demarcation_start(xid=tx)
+        #then start on the other without the join set
+        failed = False
+        try:
+            channel2.dtx_demarcation_start(xid=tx)
+        except Closed, e:
+            failed = True
+            error = e
+
+        #cleanup:
+        if not failed:
+            channel2.dtx_demarcation_end(xid=tx)
+            other.close()
+        channel1.dtx_demarcation_end(xid=tx)
+        channel1.dtx_coordination_rollback(xid=tx)
+        
+        #verification:
+        if failed: self.assertConnectionException(503, e.args[0])
+        else: self.fail("Xid already known, expected exception!")                    
+
+    def test_forget_xid_on_completion(self):
+        """
+        Verify that a xid is 'forgotten' - and can therefore be used
+        again - once it is completed.
+        """
+        #do some transactional work & complete the transaction
+        self.test_simple_commit()
+        # channel has been reset, so reselect for use with dtx
+        self.channel.dtx_demarcation_select()        
+        
+        #start association for the same xid as the previously completed txn
+        tx = self.xid("my-xid")
+        self.channel.dtx_demarcation_start(xid=tx)
+        self.channel.dtx_demarcation_end(xid=tx)
+        self.channel.dtx_coordination_rollback(xid=tx)
+
+    def test_start_join_and_resume(self):
+        """
+        Ensure the correct error is signalled when both the join and
+        resume flags are set on starting an association between a
+        channel and a transaction.
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        try:
+            channel.dtx_demarcation_start(xid=tx, join=True, resume=True)
+            #the start was accepted, so the test has failed; clean up first:
+            channel.dtx_demarcation_end(xid=tx)
+            channel.dtx_coordination_rollback(xid=tx)
+            self.fail("Join and resume both set, expected exception!")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def test_start_join(self):
+        """        
+        Verify 'join' behaviour, where a channel is associated with a
+        transaction that is already associated with another channel.        
+        """
+        #create two channels & select them for use with dtx:
+        channel1 = self.channel
+        channel1.dtx_demarcation_select()
+
+        channel2 = self.client.channel(2)
+        channel2.session_open()
+        channel2.dtx_demarcation_select()
+
+        #setup
+        channel1.queue_declare(queue="one", exclusive=True, auto_delete=True)
+        channel1.queue_declare(queue="two", exclusive=True, auto_delete=True)
+        channel1.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
+        channel1.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+
+        #create a xid
+        tx = self.xid("dummy")
+        #start work on one channel under that xid:
+        channel1.dtx_demarcation_start(xid=tx)
+        #then start on the other with the join flag set
+        channel2.dtx_demarcation_start(xid=tx, join=True)
+
+        #do work through each channel
+        self.swap(channel1, "one", "two")#swap 'a' from 'one' to 'two'
+        self.swap(channel2, "two", "one")#swap 'b' from 'two' to 'one'
+
+        #mark end on both channels
+        channel1.dtx_demarcation_end(xid=tx)
+        channel2.dtx_demarcation_end(xid=tx)
+        
+        #commit and check
+        channel1.dtx_coordination_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "one")
+        self.assertMessageCount(1, "two")
+        self.assertMessageId("a", "two")
+        self.assertMessageId("b", "one")
+        
+
+    def test_suspend_resume(self):
+        """
+        Test suspension and resumption of an association
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+
+        #setup
+        channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
+        channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
+        channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
+        channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+
+        tx = self.xid("dummy")
+
+        channel.dtx_demarcation_start(xid=tx)
+        self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
+        channel.dtx_demarcation_end(xid=tx, suspend=True)
+
+        channel.dtx_demarcation_start(xid=tx, resume=True)
+        self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
+        channel.dtx_demarcation_end(xid=tx)
+        
+        #commit and check
+        channel.dtx_coordination_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "one")
+        self.assertMessageCount(1, "two")
+        self.assertMessageId("a", "two")
+        self.assertMessageId("b", "one")
+
+    def test_suspend_start_end_resume(self):        
+        """
+        Test suspension and resumption of an association with work
+        done on another transaction when the first transaction is
+        suspended
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+
+        #setup
+        channel.queue_declare(queue="one", exclusive=True, auto_delete=True)
+        channel.queue_declare(queue="two", exclusive=True, auto_delete=True)
+        channel.message_transfer(content=Content(properties={'routing_key':"one", 'message_id':"a"}, body="DtxMessage"))
+        channel.message_transfer(content=Content(properties={'routing_key':"two", 'message_id':"b"}, body="DtxMessage"))
+
+        tx = self.xid("dummy")
+
+        channel.dtx_demarcation_start(xid=tx)
+        self.swap(channel, "one", "two")#swap 'a' from 'one' to 'two'
+        channel.dtx_demarcation_end(xid=tx, suspend=True)
+
+        channel.dtx_demarcation_start(xid=tx, resume=True)
+        self.swap(channel, "two", "one")#swap 'b' from 'two' to 'one'
+        channel.dtx_demarcation_end(xid=tx)
+        
+        #commit and check
+        channel.dtx_coordination_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "one")
+        self.assertMessageCount(1, "two")
+        self.assertMessageId("a", "two")
+        self.assertMessageId("b", "one")
+
+    def test_end_suspend_and_fail(self):
+        """        
+        Verify that the correct error is signalled if the suspend and
+        fail flag are both set when disassociating a transaction from
+        the channel        
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+        tx = self.xid("suspend_and_fail")
+        channel.dtx_demarcation_start(xid=tx)
+        try:
+            channel.dtx_demarcation_end(xid=tx, suspend=True, fail=True)
+            self.fail("Suspend and fail both set, expected exception!")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+        #cleanup    
+        other = self.connect()
+        channel = other.channel(1)
+        channel.session_open()
+        channel.dtx_coordination_rollback(xid=tx)
+        channel.session_close()
+        other.close()
+    
+
+    def test_end_unknown_xid(self):
+        """        
+        Verifies that the correct exception is thrown when an attempt
+        is made to end the association for a xid not previously
+        associated with the channel
+        """
+        channel = self.channel
+        channel.dtx_demarcation_select()
+        tx = self.xid("unknown-xid")
+        try:
+            channel.dtx_demarcation_end(xid=tx)
+            self.fail("Attempted to end association with unknown xid, expected exception!")
+        except Closed, e:
+            #FYI: this is currently *not* the exception specified, but I think the spec is wrong! Confirming...
+            self.assertConnectionException(503, e.args[0])
+
+    def test_end(self):
+        """
+        Verify that the association is terminated by end and subsequent
+        operations are non-transactional        
+        """
+        channel = self.client.channel(2)
+        channel.session_open()
+        channel.queue_declare(queue="tx-queue", exclusive=True, auto_delete=True)
+
+        #publish a message under a transaction
+        channel.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        channel.dtx_demarcation_start(xid=tx)
+        channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"one"}, body="DtxMessage"))
+        channel.dtx_demarcation_end(xid=tx)
+
+        #now that association with txn is ended, publish another message
+        channel.message_transfer(content=Content(properties={'routing_key':"tx-queue", 'message_id':"two"}, body="DtxMessage"))
+
+        #check the second message is available, but not the first
+        self.assertMessageCount(1, "tx-queue")
+        self.subscribe(channel, queue="tx-queue", destination="results", confirm_mode=1)
+        msg = self.client.queue("results").get(timeout=1)
+        self.assertEqual("two", msg.content['message_id'])
+        channel.message_cancel(destination="results")
+        #ack the message then close the channel
+        msg.complete()
+        channel.session_close()
+
+        channel = self.channel        
+        #commit the transaction and check that the first message (and
+        #only the first message) is then delivered
+        channel.dtx_coordination_commit(xid=tx, one_phase=True)
+        self.assertMessageCount(1, "tx-queue")
+        self.assertMessageId("one", "tx-queue")
+
+    def test_invalid_commit_one_phase_true(self):
+        """
+        Test that a commit with one_phase = True is rejected if the
+        transaction in question has already been prepared.        
+        """
+        other = self.connect()
+        tester = other.channel(1)
+        tester.session_open()
+        tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+        tester.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        tester.dtx_demarcation_start(xid=tx)
+        tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
+        tester.dtx_demarcation_end(xid=tx)
+        tester.dtx_coordination_prepare(xid=tx)
+        failed = False
+        try:
+            tester.dtx_coordination_commit(xid=tx, one_phase=True)
+        except Closed, e:
+            failed = True
+            error = e
+
+        if failed:
+            self.channel.dtx_coordination_rollback(xid=tx)
+            self.assertConnectionException(503, e.args[0])
+        else:
+            tester.session_close()
+            other.close()
+            self.fail("Invalid use of one_phase=True, expected exception!")
+
+    def test_invalid_commit_one_phase_false(self):
+        """
+        Test that a commit with one_phase = False is rejected if the
+        transaction in question has not yet been prepared.        
+        """
+        """
+        Test that a commit with one_phase = True is rejected if the
+        transaction in question has already been prepared.        
+        """
+        other = self.connect()
+        tester = other.channel(1)
+        tester.session_open()
+        tester.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+        tester.dtx_demarcation_select()
+        tx = self.xid("dummy")
+        tester.dtx_demarcation_start(xid=tx)
+        tester.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
+        tester.dtx_demarcation_end(xid=tx)
+        failed = False
+        try:
+            tester.dtx_coordination_commit(xid=tx, one_phase=False)
+        except Closed, e:
+            failed = True
+            error = e
+
+        if failed:
+            self.channel.dtx_coordination_rollback(xid=tx)
+            self.assertConnectionException(503, e.args[0])
+        else:
+            tester.session_close()
+            other.close()
+            self.fail("Invalid use of one_phase=False, expected exception!")
+
+    def test_implicit_end(self):
+        """
+        Test that an association is implicitly ended when the channel
+        is closed (whether by exception or explicit client request)
+        and the transaction in question is marked as rollback only.
+        """
+        channel1 = self.channel
+        channel2 = self.client.channel(2)
+        channel2.session_open()
+
+        #setup:
+        channel2.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+        channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
+        tx = self.xid("dummy")
+
+        channel2.dtx_demarcation_select()
+        channel2.dtx_demarcation_start(xid=tx)
+        channel2.message_subscribe(queue="dummy", destination="dummy", confirm_mode=1)
+        channel2.message_flow(destination="dummy", unit=0, value=1)
+        channel2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF)
+        self.client.queue("dummy").get(timeout=1).complete()
+        channel2.message_cancel(destination="dummy")
+        channel2.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="whatever"))
+        channel2.session_close()
+
+        self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).status)
+        channel1.dtx_coordination_rollback(xid=tx)
+
+    def test_get_timeout(self):
+        """        
+        Check that get-timeout returns the correct value, (and that a
+        transaction with a timeout can complete normally)        
+        """
+        channel = self.channel
+        tx = self.xid("dummy")
+
+        channel.dtx_demarcation_select()
+        channel.dtx_demarcation_start(xid=tx)
+        self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout)
+        channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
+        self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout)
+        self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).status)
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).status)        
+        
+    def test_set_timeout(self):
+        """        
+        Test the timeout of a transaction results in the expected
+        behaviour        
+        """
+        #open new channel to allow self.channel to be used in checking the queue
+        channel = self.client.channel(2)
+        channel.session_open()
+        #setup:
+        tx = self.xid("dummy")
+        channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
+        channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
+        channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':"timeout"}, body="DtxMessage"))
+
+        channel.dtx_demarcation_select()
+        channel.dtx_demarcation_start(xid=tx)
+        self.swap(channel, "queue-a", "queue-b")
+        channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
+        sleep(3)
+        #check that the work has been rolled back already
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("timeout", "queue-a")
+        #check the correct codes are returned when we try to complete the txn
+        self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).status)
+        self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).status)        
+
+
+
+    def test_recover(self):
+        """
+        Test basic recover behaviour
+        """
+        channel = self.channel
+
+        channel.dtx_demarcation_select()
+        channel.queue_declare(queue="dummy", exclusive=True, auto_delete=True)
+
+        prepared = []
+        for i in range(1, 10):
+            tx = self.xid("tx%s" % (i))
+            channel.dtx_demarcation_start(xid=tx)
+            channel.message_transfer(content=Content(properties={'routing_key':"dummy"}, body="message%s" % (i)))
+            channel.dtx_demarcation_end(xid=tx)
+            if i in [2, 5, 6, 8]:
+                channel.dtx_coordination_prepare(xid=tx)
+                prepared.append(tx)
+            else:    
+                channel.dtx_coordination_rollback(xid=tx)
+
+        xids = channel.dtx_coordination_recover().in_doubt
+        
+        #rollback the prepared transactions returned by recover
+        for x in xids:
+            channel.dtx_coordination_rollback(xid=x)            
+
+        #validate against the expected list of prepared transactions
+        actual = set(xids)
+        expected = set(prepared)
+        intersection = actual.intersection(expected)
+        
+        if intersection != expected:
+            missing = expected.difference(actual)
+            extra = actual.difference(expected)
+            for x in missing:
+                channel.dtx_coordination_rollback(xid=x)            
+            self.fail("Recovered xids not as expected. missing: %s; extra: %s" % (missing, extra))
+
+    def test_bad_resume(self):
+        """
+        Test that a resume on a session not selected for use with dtx fails
+        """
+        channel = self.channel
+        try:
+            channel.dtx_demarcation_start(resume=True)
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def xid(self, txid):
+        DtxTests.tx_counter += 1
+        branchqual = "v%s" % DtxTests.tx_counter
+        return pack('!LBB', 0, len(txid), len(branchqual)) + txid + branchqual
+        
+    def txswap(self, tx, id):
+        channel = self.channel
+        #declare two queues:
+        channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
+        channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
+        #put message with specified id on one queue:
+        channel.message_transfer(content=Content(properties={'routing_key':"queue-a", 'message_id':id}, body="DtxMessage"))
+
+        #start the transaction:
+        channel.dtx_demarcation_select()        
+        self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_start(xid=tx).status)
+
+        #'swap' the message from one queue to the other, under that transaction:
+        self.swap(self.channel, "queue-a", "queue-b")
+
+        #mark the end of the transactional work:
+        self.assertEqual(self.XA_OK, self.channel.dtx_demarcation_end(xid=tx).status)
+
+    def swap(self, channel, src, dest):
+        #consume from src:
+        channel.message_subscribe(destination="temp-swap", queue=src, confirm_mode=1)
+        channel.message_flow(destination="temp-swap", unit=0, value=1)
+        channel.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
+        msg = self.client.queue("temp-swap").get(timeout=1)
+        channel.message_cancel(destination="temp-swap")
+        msg.complete();        
+
+        #re-publish to dest
+        channel.message_transfer(content=Content(properties={'routing_key':dest, 'message_id':msg.content['message_id']},
+                                                 body=msg.content.body))
+
+    def assertMessageCount(self, expected, queue):
+        self.assertEqual(expected, self.channel.queue_query(queue=queue).message_count)
+
+    def assertMessageId(self, expected, queue):
+        self.channel.message_subscribe(queue=queue, destination="results")
+        self.channel.message_flow(destination="results", unit=0, value=1)
+        self.channel.message_flow(destination="results", unit=1, value=0xFFFFFFFF)
+        self.assertEqual(expected, self.client.queue("results").get(timeout=1).content['message_id'])
+        self.channel.message_cancel(destination="results")

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/dtx.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/example.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/example.py?rev=634003&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/example.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/example.py Wed Mar  5 11:56:43 2008
@@ -0,0 +1,95 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class ExampleTest (TestBase):
+    """
+    An example Qpid test, illustrating the unittest framework and the
+    python Qpid client. The test class must inherit TestBase.  The
+    test code uses the Qpid client to interact with a qpid broker and
+    verify it behaves as expected.
+    """ 
+
+    def test_example(self):
+        """
+        An example test. Note that test functions must start with 'test_'
+        to be recognized by the test framework.
+        """
+
+        # By inheriting TestBase, self.client is automatically connected
+        # and self.channel is automatically opened as channel(1)
+        # Other channel methods mimic the protocol.
+        channel = self.channel
+
+        # Now we can send regular commands. If you want to see what the method
+        # arguments mean or what other commands are available, you can use the
+        # python builtin help() method. For example:
+        #help(channel)
+        #help(channel.exchange_declare)
+
+        # If you want browse the available protocol methods without being
+        # connected to a live server you can use the amqp-doc utility:
+        #
+        #   Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
+        #
+        #   Options:
+        #       -e, --regexp    use regex instead of glob when matching
+
+        # Now that we know what commands are available we can use them to
+        # interact with the server.
+
+        # Here we use ordinal arguments.
+        self.exchange_declare(channel, 0, "test", "direct")
+
+        # Here we use keyword arguments.
+        self.queue_declare(channel, queue="test-queue")
+        channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
+
+        # Call channel.message_subscribe to register as a consumer. The
+        # destination named here ("consumer_tag") is the name under which the
+        # delivered messages are looked up on the client further below. The
+        # two message_flow calls that follow presumably grant message and
+        # byte credit on that destination (confirm against the spec).
+        channel.message_subscribe(queue="test-queue", destination="consumer_tag")
+        channel.message_flow(destination="consumer_tag", unit=0, value=0xFFFFFFFF)
+        channel.message_flow(destination="consumer_tag", unit=1, value=0xFFFFFFFF)
+
+        # We can use the Client.queue(...) method to access the queue
+        # corresponding to the destination we subscribed with.
+        queue = self.client.queue("consumer_tag")
+
+        # Now lets publish a message and see if our consumer gets it. To do
+        # this we need to import the Content class.
+        sent = Content("Hello World!")
+        sent["routing_key"] = "key"
+        channel.message_transfer(destination="test", content=sent)
+
+        # Now we'll wait for the message to arrive. We can use the timeout
+        # argument in case the server hangs. By default queue.get() will wait
+        # until a message arrives or the connection to the server dies.
+        msg = queue.get(timeout=10)
+
+        # And check that we got the right response with assertEqual
+        self.assertEqual(sent.body, msg.content.body)
+
+        # Now acknowledge the message.
+        msg.complete()
+

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/example.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/exchange.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/exchange.py?rev=634003&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/exchange.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/exchange.py Wed Mar  5 11:56:43 2008
@@ -0,0 +1,335 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Tests for exchange behaviour.
+
+Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
+"""
+
+import Queue, logging
+from qpid.testlib import TestBase
+from qpid.content import Content
+from qpid.client import Closed
+
+
+class StandardExchangeVerifier:
+    """Verifies standard exchange behavior.
+
+    Mixin for test classes: the helpers it calls on self come from the host class."""
+
+    def verifyDirectExchange(self, ex):
+        """Verify that ex routes key "k" to queue "q" and does not route key "kk"."""
+        self.queue_declare(queue="q")
+        self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
+        self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
+        try:
+            self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
+            self.fail("Expected Empty exception")
+        except Queue.Empty: pass # Expected: nothing is bound for key "kk"
+
+    def verifyFanOutExchange(self, ex):
+        """Verify that ex delivers to both queues bound to it without a routing key."""
+        self.queue_declare(queue="q")
+        self.channel.queue_bind(queue="q", exchange=ex)
+        self.queue_declare(queue="p")
+        self.channel.queue_bind(queue="p", exchange=ex)
+        for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+
+    def verifyTopicExchange(self, ex):
+        """Verify that ex matches routing keys against the binding pattern a.#.b.*"""
+        self.queue_declare(queue="a")
+        self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+        q = self.consume("a")
+        for key in ["a.b.x", "a.x.b.x", "a.x.x.b.x"]:
+            self.assertPublishGet(q, ex, key)
+        # Shouldn't match
+        for key in ["a.b", "a.b.x.y", "x.a.b.x"]:
+            self.channel.message_transfer(destination=ex, content=Content(properties={'routing_key':key}))
+        # Checked with assertEmpty, as verifyHeadersExchange does, instead of one
+        # q.empty() poll, which only reflects what has arrived so far.
+        # NOTE(review): assumes assertEmpty waits briefly - confirm in TestBase.
+        self.assertEmpty(q)
+
+    def verifyHeadersExchange(self, ex):
+        """Verify that ex delivers on an x-match=all binding only when headers match."""
+        self.queue_declare(queue="q")
+        self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+        q = self.consume("q")
+        headers = {"name":"fred", "age":3}
+        self.assertPublishGet(q, exchange=ex, properties=headers)
+        self.channel.message_transfer(destination=ex) # No headers, won't deliver
+        self.assertEmpty(q)
+        
+
+class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
+    """
+    The server SHOULD implement these standard exchange types: topic, headers.
+    (The direct and fanout types are declared and verified here as well.)
+    Client attempts to declare an exchange with each of these standard types.
+    """
+
+    def testDirect(self):
+        """Declare direct exchange "d" and check it with verifyDirectExchange"""
+        self.exchange_declare(0, exchange="d", type="direct")
+        self.verifyDirectExchange("d")
+
+    def testFanout(self):
+        """Declare fanout exchange "f" and check it with verifyFanOutExchange"""
+        self.exchange_declare(0, exchange="f", type="fanout")
+        self.verifyFanOutExchange("f")
+
+    def testTopic(self):
+        """Declare topic exchange "t" and check it with verifyTopicExchange"""
+        self.exchange_declare(0, exchange="t", type="topic")
+        self.verifyTopicExchange("t")
+
+    def testHeaders(self):
+        """Declare headers exchange "h" and check it with verifyHeadersExchange"""
+        self.exchange_declare(0, exchange="h", type="headers")
+        self.verifyHeadersExchange("h")
+        
+
+class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
+    """
+    The server MUST, in each virtual host, pre-declare an exchange instance
+    for each standard exchange type that it implements, where the name of the
+    exchange instance is amq. followed by the exchange type name.
+    
+    Each test runs the matching verifier against a pre-declared instance
+    (amq.direct, amq.fanout, amq.topic and amq.match) without declaring
+    the exchange first.
+    """
+    def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
+
+    def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
+
+    def testAmqTopic(self):  self.verifyTopicExchange("amq.topic")
+        
+    def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
+
+class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
+    """
+    The server MUST predeclare a direct exchange to act as the default exchange
+    for content Publish methods and for default queue bindings.
+    
+    Client checks that the default exchange is active by specifying a queue
+    binding with no exchange name, and publishing a message with a suitable
+    routing key but without specifying the exchange name, then ensuring that
+    the message arrives in the queue correctly.
+    """
+    def testDefaultExchange(self):
+        # Test automatic binding by queue name (no explicit bind is issued).
+        self.queue_declare(queue="d")
+        self.assertPublishConsume(queue="d", routing_key="d")
+        # Test explicit bind to the default exchange (empty exchange name)
+        self.verifyDirectExchange("")
+
+
+# TODO aconway 2006-09-27: Fill in empty tests:
+
+class DefaultAccessRuleTests(TestBase):
+    """
+    The server MUST NOT allow clients to access the default exchange except
+    by specifying an empty exchange name in the Queue.Bind and content Publish
+    methods. (No test methods are implemented for this rule yet.)
+    """
+
+class ExtensionsRuleTests(TestBase):
+    """
+    The server MAY implement other exchange types as wanted. (No tests yet.)
+    """
+
+
+class DeclareMethodMinimumRuleTests(TestBase):
+    """
+    The server SHOULD support a minimum of 16 exchanges per virtual host and
+    ideally, impose no limit except as defined by available resources.
+    
+    The client creates as many exchanges as it can until the server reports
+    an error; the number of exchanges successfully created must be at least
+    sixteen. (No test methods are implemented for this rule yet.)
+    """
+
+
+class DeclareMethodTicketFieldValidityRuleTests(TestBase):
+    """
+    The client MUST provide a valid access ticket giving "active" access to
+    the realm in which the exchange exists or will be created, or "passive"
+    access if the if-exists flag is set.
+    
+    Client creates access ticket with wrong access rights and attempts to use
+    it in this method. (No test methods are implemented for this rule yet.)
+    """
+
+
+class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
+    """
+    Exchange names starting with "amq." are reserved for predeclared and
+    standardised exchanges. The client MUST NOT attempt to create an exchange
+    starting with "amq.".
+    
+    No test methods are implemented for this rule yet.
+    """
+
+
+class DeclareMethodTypeFieldTypedRuleTests(TestBase):
+    """
+    Exchanges cannot be redeclared with different types.  The client MUST NOT
+    attempt to redeclare an existing exchange with a different type than used
+    in the original Exchange.Declare method.
+    
+    No test methods are implemented for this rule yet.
+    """
+
+
+class DeclareMethodTypeFieldSupportRuleTests(TestBase):
+    """
+    The client MUST NOT attempt to create an exchange with a type that the
+    server does not support.
+    
+    No test methods are implemented for this rule yet.
+    """
+
+
+class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
+    """
+    If the passive flag is set and the exchange does not already exist, the
+    server MUST raise a channel exception with reply code 404 (not found).
+    """
+    def test(self):
+        try:
+            self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
+            self.fail("Expected 404 for passive declaration of unknown exchange.")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+
+class DeclareMethodDurableFieldSupportRuleTests(TestBase):
+    """
+    The server MUST support both durable and transient exchanges.
+    
+    No test methods are implemented for this rule yet.
+    """
+
+
+class DeclareMethodDurableFieldStickyRuleTests(TestBase):
+    """
+    The server MUST ignore the durable field if the exchange already exists.
+    
+    No test methods are implemented for this rule yet.
+    """
+
+
+class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
+    """
+    The server MUST ignore the auto-delete field if the exchange already
+    exists.
+    
+    No test methods are implemented for this rule yet.
+    """
+
+
+class DeleteMethodTicketFieldValidityRuleTests(TestBase):
+    """
+    The client MUST provide a valid access ticket giving "active" access
+    rights to the exchange's access realm.
+    
+    Client creates access ticket with wrong access rights and attempts to use
+    it in this method. (No test methods are implemented for this rule yet.)
+    """
+
+
+class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
+    """
+    The client MUST NOT attempt to delete an exchange that does not exist. (No tests yet.)
+    """
+
+
+class HeadersExchangeTests(TestBase):
+    """
+    Tests for headers exchange functionality, using amq.match and queue "q".
+    """
+    def setUp(self):
+        TestBase.setUp(self)
+        self.queue_declare(queue="q")
+        self.q = self.consume("q")
+
+    def myAssertPublishGet(self, headers): # used below for header sets that should match the binding
+        self.assertPublishGet(self.q, exchange="amq.match", properties=headers)
+
+    def myBasicPublish(self, headers): # publish only; used below for header sets that should not match
+        self.channel.message_transfer(destination="amq.match", content=Content("foobar", properties={'application_headers':headers}))
+        
+    def testMatchAll(self):
+        self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+        self.myAssertPublishGet({"name":"fred", "age":3})
+        self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
+        
+        # None of these should match: x-match=all needs both name=fred and age=3
+        self.myBasicPublish({})
+        self.myBasicPublish({"name":"barney"})
+        self.myBasicPublish({"name":10})
+        self.myBasicPublish({"name":"fred", "age":2})
+        self.assertEmpty(self.q)
+
+    def testMatchAny(self):
+        self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+        self.myAssertPublishGet({"name":"fred"})
+        self.myAssertPublishGet({"name":"fred", "ignoreme":10})
+        self.myAssertPublishGet({"ignoreme":10, "age":3})
+
+        # Won't match: neither header set contains name=fred or age=3
+        self.myBasicPublish({})
+        self.myBasicPublish({"irrelevant":0})
+        self.assertEmpty(self.q)
+
+
+class MiscellaneousErrorsTests(TestBase):
+    """Test some miscellaneous error conditions"""
+    def testTypeNotKnown(self):
+        """Declaring an exchange of an unknown type must fail with connection exception 503"""
+        try:
+            self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
+            self.fail("Expected 503 for declaration of unknown exchange type.")
+        except Closed, e: self.assertConnectionException(503, e.args[0])
+
+    def testDifferentDeclaredType(self):
+        """Redeclaring an exchange with another type must fail with connection exception 530"""
+        self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
+        try:
+            self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
+            self.fail("Expected 530 for redeclaration of exchange with different type.")
+        except Closed, e: self.assertConnectionException(530, e.args[0])
+        #cleanup on a second connection, closing it afterwards so it is not left open
+        other = self.connect()
+        c2 = other.channel(1)
+        c2.session_open()
+        c2.exchange_delete(exchange="test_different_declared_type_exchange")
+        c2.session_close()
+        other.close()
+    
+class ExchangeTests(TestBase):
+    def testHeadersBindNoMatchArg(self): # a bind to amq.match without an x-match argument must fail
+        self.channel.queue_declare(queue="q", exclusive=True, auto_delete=True)
+        try: # the bind arguments below deliberately omit "x-match"
+            self.channel.queue_bind(queue="q", exchange="amq.match", arguments={"name":"fred" , "age":3} )
+            self.fail("Expected failure for missing x-match arg.")
+        except Closed, e: # the failure is asserted to be connection exception 541
+            self.assertConnectionException(541, e.args[0])

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/exchange.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/execution.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/execution.py?rev=634003&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/execution.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/execution.py Wed Mar  5 11:56:43 2008
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class ExecutionTests (TestBase):
+    def test_flush(self):
+        """Check that completion.wait() succeeds within a second after each transfer"""
+        channel = self.channel
+        for i in [1, 2, 3]:
+            channel.message_transfer(content=Content(properties={'routing_key':str(i)}))
+            self.assert_(channel.completion.wait(channel.completion.command_id, timeout=1)) #a bare assert would vanish under python -O, wait() included

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/execution.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/message.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/message.py?rev=634003&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/message.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/message.py Wed Mar  5 11:56:43 2008
@@ -0,0 +1,834 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+from qpid.reference import Reference, ReferenceId
+
+class MessageTests(TestBase):
+    """Tests for 'methods' on the amqp message 'class'"""
+
+    def test_consume_no_local(self):
+        """
+        Test that the no_local flag passed to subscribe is honoured
+        """
+        channel = self.channel
+        #setup, declare two queues:
+        channel.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+        channel.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True)
+        #establish two consumers one of which excludes delivery of locally sent messages
+        self.subscribe(destination="local_included", queue="test-queue-1a")
+        self.subscribe(destination="local_excluded", queue="test-queue-1b", no_local=True)
+
+        #send one message to each queue from this same channel (no destination, keyed by queue name)
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1a"}, body="consume_no_local"))
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-1b"}, body="consume_no_local"))
+
+        #check the queues of the two consumers: only the no_local one must stay empty
+        excluded = self.client.queue("local_excluded")
+        included = self.client.queue("local_included")
+        msg = included.get(timeout=1)
+        self.assertEqual("consume_no_local", msg.content.body)
+        try:
+            excluded.get(timeout=1)
+            self.fail("Received locally published message though no_local=true")
+        except Empty: None
+
+    def test_consume_no_local_awkward(self):
+
+        """
+        If an exclusive queue gets a no-local delivered to it, that
+        message could 'block' delivery of subsequent messages or it
+        could be left on the queue, possibly never being consumed
+        (this is the case for example in the qpid JMS mapping of
+        topics). This test exercises a Qpid C++ broker hack that
+        deletes such messages.
+        """
+
+        channel = self.channel
+        #setup:
+        channel.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+        #establish consumer which excludes delivery of locally sent messages
+        self.subscribe(destination="local_excluded", queue="test-queue", no_local=True)
+
+        #send a 'local' message
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="local"))
+
+        #send a non local message over a second connection, which is closed again straight away
+        other = self.connect()
+        channel2 = other.channel(1)
+        channel2.session_open()
+        channel2.message_transfer(content=Content(properties={'routing_key' : "test-queue"}, body="foreign"))
+        channel2.session_close()
+        other.close()
+
+        #check that the second message only is delivered
+        excluded = self.client.queue("local_excluded")
+        msg = excluded.get(timeout=1)
+        self.assertEqual("foreign", msg.content.body)
+        try:
+            excluded.get(timeout=1)
+            self.fail("Received extra message")
+        except Empty: None
+        #check queue is empty, i.e. the undelivered 'local' message is not left on it
+        self.assertEqual(0, channel.queue_query(queue="test-queue").message_count)
+
+
+    def test_consume_exclusive(self):
+        """
+        Test that the exclusive flag passed to subscribe is honoured, in both orders
+        """
+        channel = self.channel
+        #setup, declare a queue:
+        channel.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+
+        #check that an exclusive consumer prevents other consumer being created:
+        self.subscribe(destination="first", queue="test-queue-2", exclusive=True)
+        try:
+            self.subscribe(destination="second", queue="test-queue-2")
+            self.fail("Expected consume request to fail due to previous exclusive consumer")
+        except Closed, e:
+            self.assertChannelException(403, e.args[0])
+
+        #open new channel and cleanup last consumer (the 403 above is a channel exception):
+        channel = self.client.channel(2)
+        channel.session_open()
+
+        #check that an exclusive consumer cannot be created if a consumer already exists:
+        self.subscribe(channel, destination="first", queue="test-queue-2")
+        try:
+            self.subscribe(channel, destination="second", queue="test-queue-2", exclusive=True) #same channel as "first"; omitting it left the choice to subscribe's default
+            self.fail("Expected exclusive consume request to fail due to previous consumer")
+        except Closed, e:
+            self.assertChannelException(403, e.args[0])
+
+    def test_consume_queue_errors(self):
+        """
+        Test subscribe errors for the queue field: unknown queue (404) and no queue (530)
+        """
+        channel = self.channel #NOTE(review): not used before it is rebound to channel 2 below
+        try:
+            #queue specified but doesn't exist:
+            self.subscribe(queue="invalid-queue", destination="")
+            self.fail("Expected failure when consuming from non-existent queue")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        channel = self.client.channel(2)
+        channel.session_open()
+        try:
+            #queue not specified and none previously declared for channel:
+            self.subscribe(channel, queue="", destination="")
+            self.fail("Expected failure when consuming from unspecified queue")
+        except Closed, e:
+            self.assertConnectionException(530, e.args[0])
+
+    def test_consume_unique_consumers(self):
+        """
+        Ensure a destination name cannot be subscribed twice (connection exception 530)
+        """
+        channel = self.channel
+        #setup, declare a queue:
+        channel.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)
+
+        #check that attempts to reuse the destination "first" are detected and prevented:
+        self.subscribe(destination="first", queue="test-queue-3")
+        try:
+            self.subscribe(destination="first", queue="test-queue-3")
+            self.fail("Expected consume request to fail due to non-unique tag")
+        except Closed, e:
+            self.assertConnectionException(530, e.args[0])
+
+    def test_cancel(self):
+        """
+        Test that message_cancel stops delivery to a subscription
+        """
+        channel = self.channel
+        #setup, declare a queue:
+        channel.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
+        self.subscribe(destination="my-consumer", queue="test-queue-4")
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="One"))
+
+        #cancel should stop messages being delivered
+        channel.message_cancel(destination="my-consumer")
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-queue-4"}, body="Two"))
+        myqueue = self.client.queue("my-consumer")
+        msg = myqueue.get(timeout=1)
+        self.assertEqual("One", msg.content.body)
+        try:
+            msg = myqueue.get(timeout=1)
+            self.fail("Got message after cancellation: " + msg.content.body) #report the body: msg itself is not a string
+        except Empty: None
+
+        #cancellation of non-existent consumers should be handled without error
+        channel.message_cancel(destination="my-consumer")
+        channel.message_cancel(destination="this-never-existed")
+
+
+    def test_ack(self):
+        """
+        Test basic ack/recover behaviour: recover redelivers only what was not completed
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-ack-queue", exclusive=True, auto_delete=True)
+
+        self.subscribe(queue="test-ack-queue", destination="consumer_tag", confirm_mode=1)
+        queue = self.client.queue("consumer_tag")
+
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="One"))
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Two"))
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Three"))
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Four"))
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-ack-queue"}, body="Five"))
+
+        msg1 = queue.get(timeout=1)
+        msg2 = queue.get(timeout=1)
+        msg3 = queue.get(timeout=1)
+        msg4 = queue.get(timeout=1)
+        msg5 = queue.get(timeout=1)
+
+        self.assertEqual("One", msg1.content.body)
+        self.assertEqual("Two", msg2.content.body)
+        self.assertEqual("Three", msg3.content.body)
+        self.assertEqual("Four", msg4.content.body)
+        self.assertEqual("Five", msg5.content.body)
+
+        msg2.complete(cumulative=True)#One and Two
+        msg4.complete(cumulative=False)#Four only
+
+        channel.message_recover(requeue=False)#Three and Five are still outstanding
+
+        msg3b = queue.get(timeout=1)
+        msg5b = queue.get(timeout=1)
+
+        self.assertEqual("Three", msg3b.content.body)
+        self.assertEqual("Five", msg5b.content.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message: " + extra.content.body)
+        except Empty: None
+
+
+    def test_recover(self):
+        """
+        Test recover behaviour for a confirm_mode=1 and a confirm_mode=0 subscription
+        """
+        channel = self.channel
+        channel.queue_declare(queue="queue-a", exclusive=True, auto_delete=True)
+        channel.queue_bind(exchange="amq.fanout", queue="queue-a")
+        channel.queue_declare(queue="queue-b", exclusive=True, auto_delete=True)
+        channel.queue_bind(exchange="amq.fanout", queue="queue-b")
+
+        self.subscribe(queue="queue-a", destination="unconfirmed", confirm_mode=1)
+        self.subscribe(queue="queue-b", destination="confirmed", confirm_mode=0)
+        confirmed = self.client.queue("confirmed")
+        unconfirmed = self.client.queue("unconfirmed")
+
+        data = ["One", "Two", "Three", "Four", "Five"]
+        for d in data:
+            channel.message_transfer(destination="amq.fanout", content=Content(body=d))
+
+        for q in [confirmed, unconfirmed]: #both subscriptions get every message once, in order
+            for d in data:
+                self.assertEqual(d, q.get(timeout=1).content.body)
+            self.assertEmpty(q)
+
+        channel.message_recover(requeue=False)
+
+        self.assertEmpty(confirmed) #nothing is redelivered to the confirm_mode=0 subscription
+
+        while len(data): #each pass completes the last message received, so one fewer comes back
+            msg = None
+            for d in data:
+                msg = unconfirmed.get(timeout=1)
+                self.assertEqual(d, msg.content.body)
+                self.assertEqual(True, msg.content['redelivered'])
+            self.assertEmpty(unconfirmed)
+            data.remove(msg.content.body)
+            msg.complete(cumulative=False)
+            channel.message_recover(requeue=False)
+
+
+    def test_recover_requeue(self):
+        """
+        Test requeuing on recovery
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-requeue", exclusive=True, auto_delete=True)
+
+        self.subscribe(queue="test-requeue", destination="consumer_tag", confirm_mode=1)
+        queue = self.client.queue("consumer_tag")
+
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="One"))
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Two"))
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Three"))
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Four"))
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Five"))
+
+        msg1 = queue.get(timeout=1)
+        msg2 = queue.get(timeout=1)
+        msg3 = queue.get(timeout=1)
+        msg4 = queue.get(timeout=1)
+        msg5 = queue.get(timeout=1)
+
+        self.assertEqual("One", msg1.content.body)
+        self.assertEqual("Two", msg2.content.body)
+        self.assertEqual("Three", msg3.content.body)
+        self.assertEqual("Four", msg4.content.body)
+        self.assertEqual("Five", msg5.content.body)
+
+        msg2.complete(cumulative=True)  #One and Two
+        msg4.complete(cumulative=False)  #Four
+
+        channel.message_cancel(destination="consumer_tag")
+
+        #publish a new message
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-requeue"}, body="Six"))
+        #requeue unacked messages (Three and Five)
+        channel.message_recover(requeue=True)
+
+        self.subscribe(queue="test-requeue", destination="consumer_tag")
+        queue2 = self.client.queue("consumer_tag")
+
+        msg3b = queue2.get(timeout=1)
+        msg5b = queue2.get(timeout=1)
+
+        self.assertEqual("Three", msg3b.content.body)
+        self.assertEqual("Five", msg5b.content.body)
+
+        self.assertEqual(True, msg3b.content['redelivered'])
+        self.assertEqual(True, msg5b.content['redelivered'])
+
+        self.assertEqual("Six", queue2.get(timeout=1).content.body) #requeued messages are expected ahead of the newer one
+
+        try:
+            extra = queue2.get(timeout=1)
+            self.fail("Got unexpected message in second queue: " + extra.content.body)
+        except Empty: None
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in original queue: " + extra.content.body)
+        except Empty: None
+
+
+    def test_qos_prefetch_count(self):
+        """
+        Test that the prefetch count specified is honoured
+        """
+        #setup: declare queue and subscribe
+        channel = self.channel
+        channel.queue_declare(queue="test-prefetch-count", exclusive=True, auto_delete=True)
+        subscription = self.subscribe(queue="test-prefetch-count", destination="consumer_tag", confirm_mode=1) #NOTE(review): subscription is never read
+        queue = self.client.queue("consumer_tag")
+
+        #set prefetch to 5:
+        channel.message_qos(prefetch_count=5)
+
+        #publish 10 messages:
+        for i in range(1, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-count"}, body="Message %d" % i))
+
+        #only 5 messages should have been delivered:
+        for i in range(1, 6):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+        except Empty: None
+
+        #ack via the 5th message (NOTE(review): complete() presumably defaults to cumulative - confirm) and check that the next set arrive ok:
+        msg.complete()
+
+        for i in range(6, 11):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        msg.complete()
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+        except Empty: None
+
+
+
+    def test_qos_prefetch_size(self):
+        """
+        Test that the prefetch size specified is honoured
+        """
+        #setup: declare queue and subscribe
+        channel = self.channel
+        channel.queue_declare(queue="test-prefetch-size", exclusive=True, auto_delete=True)
+        subscription = self.subscribe(queue="test-prefetch-size", destination="consumer_tag", confirm_mode=1) #NOTE(review): subscription is never read
+        queue = self.client.queue("consumer_tag")
+
+        #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+        channel.message_qos(prefetch_size=50)
+
+        #publish 10 messages:
+        for i in range(1, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body="Message %d" % i))
+
+        #only 5 messages should have been delivered (i.e. 45 bytes worth):
+        for i in range(1, 6):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+        except Empty: None
+
+        #ack via the 5th message (NOTE(review): complete() presumably defaults to cumulative - confirm) and check that the next set arrive ok:
+        msg.complete()
+
+        for i in range(6, 11):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        msg.complete()
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+        except Empty: None
+
+        #make sure that a single oversized message (53 bytes, above the 50 byte prefetch) still gets delivered
+        large = "abcdefghijklmnopqrstuvwxyz"
+        large = large + "-" + large;
+        channel.message_transfer(content=Content(properties={'routing_key' : "test-prefetch-size"}, body=large))
+        msg = queue.get(timeout=1)
+        self.assertEqual(large, msg.content.body)
+
+    def test_reject(self):
+        """
+        Test that a rejected message shows up on a queue bound to the
+        alternate exchange of the queue it was rejected from
+        """
+        channel = self.channel
+        #"q" names amq.fanout as its alternate exchange; "r" is bound to that exchange
+        channel.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
+        channel.queue_declare(queue = "r", exclusive=True, auto_delete=True)
+        channel.queue_bind(queue = "r", exchange = "amq.fanout")
+
+        #consume the message from "q" and then reject it
+        self.subscribe(queue = "q", destination = "consumer", confirm_mode = 1)
+        channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body="blah, blah"))
+        msg = self.client.queue("consumer").get(timeout = 1)
+        self.assertEquals(msg.content.body, "blah, blah")
+        #NOTE(review): the two-element list looks like a [first, last] command id range - confirm
+        channel.message_reject([msg.command_id, msg.command_id])
+
+        #the same body is now expected on "r"
+        self.subscribe(queue = "r", destination = "checker")
+        msg = self.client.queue("checker").get(timeout = 1)
+        self.assertEquals(msg.content.body, "blah, blah")
+
+    def test_credit_flow_messages(self):
+        """
+        Test basic credit based flow control with unit = message
+
+        Ten messages are queued; credit for five is issued and exactly
+        five must arrive. The remaining five are then released one at a
+        time by issuing a single message credit per iteration.
+        """
+        #declare an exclusive queue
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        #create consumer (for now that defaults to infinite credit)
+        channel.message_subscribe(queue = "q", destination = "c")
+        #presumably mode 0 selects credit based flow control (the window tests use mode 1) - confirm
+        channel.message_flow_mode(mode = 0, destination = "c")
+        #send batch of messages to queue
+        for i in range(1, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+
+        #set message credit to finite amount (less than enough for all messages)
+        channel.message_flow(unit = 0, value = 5, destination = "c")
+        #set infinite byte credit
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+        #check that expected number were received
+        q = self.client.queue("c")
+        for i in range(1, 6):
+            self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+        self.assertEmpty(q)
+
+        #increase credit again and check more are received
+        #(one credit per iteration, so exactly one message should arrive each time)
+        for i in range(6, 11):
+            channel.message_flow(unit = 0, value = 1, destination = "c")
+            self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+            self.assertEmpty(q)
+
+    def test_credit_flow_bytes(self):
+        """
+        Test basic credit based flow control with unit = bytes
+
+        Ten identical messages are queued; byte credit for five of them
+        is issued and exactly five must arrive. The remaining five are
+        then released one at a time by issuing one message's worth of
+        byte credit per iteration.
+        """
+        #declare an exclusive queue
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        #create consumer (for now that defaults to infinite credit)
+        channel.message_subscribe(queue = "q", destination = "c")
+        #presumably mode 0 selects credit based flow control (the window tests use mode 1) - confirm
+        channel.message_flow_mode(mode = 0, destination = "c")
+        #send batch of messages to queue
+        for i in range(10):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+
+        #each message is currently interpreted as requiring msg_size bytes of credit
+        #NOTE(review): 35 is larger than the 8 character body, so it presumably
+        #includes header/framing overhead as counted by the broker - confirm
+        msg_size = 35
+
+        #set byte credit to finite amount (less than enough for all messages)
+        channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
+        #set infinite message credit
+        channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+        #check that expected number were received
+        q = self.client.queue("c")
+        for i in range(5):
+            self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+        self.assertEmpty(q)
+
+        #increase credit again and check more are received
+        #(one message's worth of bytes per iteration, so one message each time)
+        for i in range(5):
+            channel.message_flow(unit = 1, value = msg_size, destination = "c")
+            self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+            self.assertEmpty(q)
+
+
+    def test_window_flow_messages(self):
+        """
+        Test basic window based flow control with unit = message
+
+        Ten messages are queued with a window of five messages. Five
+        must arrive; once those are completed cumulatively the other
+        five are expected without any further flow command.
+        """
+        #declare an exclusive queue
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        #create consumer (for now that defaults to infinite credit)
+        channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
+        #presumably mode 1 selects window based flow control (the credit tests use mode 0) - confirm
+        channel.message_flow_mode(mode = 1, destination = "c")
+        #send batch of messages to queue
+        for i in range(1, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %d" % i))
+
+        #set message credit to finite amount (less than enough for all messages)
+        channel.message_flow(unit = 0, value = 5, destination = "c")
+        #set infinite byte credit
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+        #check that expected number were received
+        q = self.client.queue("c")
+        for i in range(1, 6):
+            msg = q.get(timeout = 1)
+            self.assertDataEquals(channel, msg, "Message %d" % i)
+        self.assertEmpty(q)
+
+        #acknowledge messages and check more are received
+        #(msg is the 5th message; the cumulative flag is passed explicitly here)
+        msg.complete(cumulative=True)
+        for i in range(6, 11):
+            self.assertDataEquals(channel, q.get(timeout = 1), "Message %d" % i)
+        self.assertEmpty(q)
+
+
+    def test_window_flow_bytes(self):
+        """
+        Test basic window based flow control with unit = bytes
+
+        Ten identical messages are queued with a byte window sized for
+        five of them. Five must arrive; each individual completion is
+        then expected to let exactly one further message through.
+        """
+        #declare an exclusive queue
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        #create consumer (for now that defaults to infinite credit)
+        channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1)
+        #presumably mode 1 selects window based flow control (the credit tests use mode 0) - confirm
+        channel.message_flow_mode(mode = 1, destination = "c")
+        #send batch of messages to queue
+        for i in range(10):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "abcdefgh"))
+
+        #each message is currently interpreted as requiring msg_size bytes of credit
+        #NOTE(review): test_credit_flow_bytes uses 35 for the same body; the
+        #difference is not explained by anything visible here - confirm
+        msg_size = 40
+
+        #set byte credit to finite amount (less than enough for all messages)
+        channel.message_flow(unit = 1, value = msg_size*5, destination = "c")
+        #set infinite message credit
+        channel.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+        #check that expected number were received
+        q = self.client.queue("c")
+        #keep the received messages so that they can be completed one by one below
+        msgs = []
+        for i in range(5):
+            msg = q.get(timeout = 1)
+            msgs.append(msg)
+            self.assertDataEquals(channel, msg, "abcdefgh")
+        self.assertEmpty(q)
+
+        #ack each message individually and check more are received
+        #(pop() takes from the end of the list, so the most recently received is acked first)
+        for i in range(5):
+            msg = msgs.pop()
+            msg.complete(cumulative=False)
+            self.assertDataEquals(channel, q.get(timeout = 1), "abcdefgh")
+            self.assertEmpty(q)
+
+    def test_subscribe_not_acquired(self):
+        """
+        Test the not-acquired mode works as expected for a simple case
+
+        Two subscribers with acquire_mode = 1 are attached to the same
+        queue. Each must see all ten messages (five published before
+        subscribing, five after) and the queue must still report ten
+        messages afterwards.
+        """
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        #first half of the messages is published before anyone subscribes
+        for i in range(1, 6):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+        self.subscribe(queue = "q", destination = "a", acquire_mode = 1)
+        self.subscribe(queue = "q", destination = "b", acquire_mode = 1)
+
+        #second half is published with both subscriptions already in place
+        for i in range(6, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "Message %s" % i))
+
+        #both subscribers should see all messages
+        qA = self.client.queue("a")
+        qB = self.client.queue("b")
+        for i in range(1, 11):
+            for q in [qA, qB]:
+                msg = q.get(timeout = 1)
+                self.assertEquals("Message %s" % i, msg.content.body)
+                msg.complete()
+
+        #messages should still be on the queue:
+        self.assertEquals(10, channel.queue_query(queue = "q").message_count)
+
+    def test_acquire(self):
+        """
+        Test explicit acquire function
+
+        A message delivered to a not-acquired subscriber stays on the
+        queue until message_acquire is issued for it, after which the
+        queue's message count drops to zero.
+        """
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "acquire me"))
+
+        #acquire_mode = 1: the message is delivered without being acquired
+        self.subscribe(queue = "q", destination = "a", acquire_mode = 1, confirm_mode = 1)
+        msg = self.client.queue("a").get(timeout = 1)
+        #message should still be on the queue:
+        self.assertEquals(1, channel.queue_query(queue = "q").message_count)
+
+        #NOTE(review): the two-element list looks like a [first, last] command id range - confirm
+        channel.message_acquire([msg.command_id, msg.command_id])
+        #check that we get notification (i.e. message_acquired)
+        response = channel.control_queue.get(timeout=1)
+        self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+        #message should have been removed from the queue:
+        self.assertEquals(0, channel.queue_query(queue = "q").message_count)
+        msg.complete()
+
+
+
+
+    def test_release(self):
+        """
+        Test explicit release function
+
+        A message is delivered to an acquiring subscriber, the
+        subscription is cancelled and the message released; the queue
+        must then still report one message.
+        """
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "release me"))
+
+        self.subscribe(queue = "q", destination = "a", acquire_mode = 0, confirm_mode = 1)
+        msg = self.client.queue("a").get(timeout = 1)
+        #cancel before releasing; presumably so the released message is not
+        #simply redelivered to this same subscriber - confirm
+        channel.message_cancel(destination = "a")
+        channel.message_release([msg.command_id, msg.command_id])
+        msg.complete()
+
+        #message should not have been removed from the queue:
+        self.assertEquals(1, channel.queue_query(queue = "q").message_count)
+
+    def test_release_ordering(self):
+        """
+        Test order of released messages is as expected
+
+        Ten messages are received, the whole range is released, and
+        the same ten messages are then expected again in their
+        original order.
+        """
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        for i in range (1, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "released message %s" % (i)))
+
+        channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+        channel.message_flow(unit = 0, value = 10, destination = "a")
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+        queue = self.client.queue("a")
+        #first and last are kept for their command ids; only messages 2..9 have
+        #their bodies checked on this first pass
+        first = queue.get(timeout = 1)
+        for i in range (2, 10):
+            self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
+        last = queue.get(timeout = 1)
+        self.assertEmpty(queue)
+        #release everything from the first to the last received message
+        channel.message_release([first.command_id, last.command_id])
+        last.complete()#will re-allocate credit, as in window mode
+        #all ten bodies are checked, in order, on redelivery
+        for i in range (1, 11):
+            self.assertEquals("released message %s" % (i), queue.get(timeout = 1).content.body)
+
+    def test_ranged_ack(self):
+        """
+        Test acking of message ranges
+
+        Ten messages are received, all but the third are marked as
+        complete via a ranged execution set, and a recover is then
+        expected to redeliver only the third message.
+        """
+        channel = self.channel
+        channel.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        for i in range (1, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message %s" % (i)))
+
+        channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+        channel.message_flow(unit = 0, value = 10, destination = "a")
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+        queue = self.client.queue("a")
+        for i in range (1, 11):
+            self.assertEquals("message %s" % (i), queue.get(timeout = 1).content.body)
+        self.assertEmpty(queue)
+
+        #ack all but the third message (command id 2)
+        #(read as start/end pairs the set is 0-1, 3-6, 7-7, 8-9, i.e. every id
+        #from 0 to 9 except 2; NOTE(review): pair interpretation and the meaning
+        #of the 0xFFFFFFFF cumulative mark are assumed - confirm)
+        channel.execution_complete(cumulative_execution_mark=0xFFFFFFFF, ranged_execution_set=[0,1,3,6,7,7,8,9])
+        channel.message_recover()
+        self.assertEquals("message 3", queue.get(timeout = 1).content.body)
+        self.assertEmpty(queue)
+
+    def test_subscribe_not_acquired_2(self):
+        """
+        Test that a not-acquired subscriber created after some messages
+        have been consumed only sees the remaining messages, and that
+        those remain on the queue after being browsed
+        """
+        channel = self.channel
+
+        #publish some messages
+        self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        for i in range(1, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+
+        #consume some of them
+        channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1)
+        channel.message_flow_mode(mode = 0, destination = "a")
+        channel.message_flow(unit = 0, value = 5, destination = "a")
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+
+        queue = self.client.queue("a")
+        for i in range(1, 6):
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.content.body)
+            msg.complete()
+        self.assertEmpty(queue)
+
+        #now create a not-acquired subscriber
+        channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+
+        #check it gets those not consumed
+        queue = self.client.queue("b")
+        #only a single message credit is issued up front; NOTE(review): the loop
+        #presumably relies on each complete() re-allocating that credit - confirm
+        channel.message_flow(unit = 0, value = 1, destination = "b")
+        for i in range(6, 11):
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.content.body)
+            msg.complete()
+        #offer one more credit to show that nothing further is available
+        channel.message_flow(unit = 0, value = 1, destination = "b")
+        self.assertEmpty(queue)
+
+        #check all 'browsed' messages are still on the queue
+        self.assertEqual(5, channel.queue_query(queue="q").message_count)
+
+    def test_subscribe_not_acquired_3(self):
+        """
+        Test that messages explicitly acquired by one not-acquired
+        subscriber are not seen by a second not-acquired subscriber,
+        while the messages that were only browsed remain on the queue
+        """
+        channel = self.channel
+
+        #publish some messages
+        self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        for i in range(1, 11):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q"}, body = "message-%d" % (i)))
+
+        #create a not-acquired subscriber
+        channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+        channel.message_flow(unit = 0, value = 10, destination = "a")
+
+        #browse through messages
+        queue = self.client.queue("a")
+        for i in range(1, 11):
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.content.body)
+            #i % 2 is non-zero for odd i, so messages 1, 3, 5, 7 and 9 are acquired
+            if (i % 2):
+                #try to acquire every second message
+                channel.message_acquire([msg.command_id, msg.command_id])
+                #check that acquire succeeds
+                response = channel.control_queue.get(timeout=1)
+                self.assertEquals(response.transfers, [msg.command_id, msg.command_id])
+            msg.complete()
+        self.assertEmpty(queue)
+
+        #create a second not-acquired subscriber
+        channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+        #only a single message credit is issued up front; NOTE(review): the loop
+        #presumably relies on each complete() re-allocating that credit - confirm
+        channel.message_flow(unit = 0, value = 1, destination = "b")
+        #check it gets those not consumed
+        queue = self.client.queue("b")
+        for i in [2,4,6,8,10]:
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.content.body)
+            msg.complete()
+        #offer one more credit to show that nothing further is available
+        channel.message_flow(unit = 0, value = 1, destination = "b")
+        self.assertEmpty(queue)
+
+        #check all 'browsed' messages are still on the queue
+        self.assertEqual(5, channel.queue_query(queue="q").message_count)
+
+    def test_release_unacquired(self):
+        """
+        Test that releasing a message which was delivered to
+        not-acquired subscribers leaves exactly one copy of it for a
+        subsequent acquiring consumer
+        """
+        channel = self.channel
+
+        #create queue
+        self.queue_declare(queue = "q", exclusive=True, auto_delete=True, durable=True)
+
+        #send message
+        #(delivery_mode 2 on a durable queue: presumably a persistent message - confirm)
+        channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+        #create two 'browsers'
+        channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=1)
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+        channel.message_flow(unit = 0, value = 10, destination = "a")
+        queueA = self.client.queue("a")
+
+        channel.message_subscribe(queue = "q", destination = "b", confirm_mode = 1, acquire_mode=1)
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+        channel.message_flow(unit = 0, value = 10, destination = "b")
+        queueB = self.client.queue("b")
+        
+        #have each browser release the message
+        msgA = queueA.get(timeout = 1)
+        channel.message_release([msgA.command_id, msgA.command_id])
+
+        msgB = queueB.get(timeout = 1)
+        channel.message_release([msgB.command_id, msgB.command_id])
+        
+        #cancel browsers
+        channel.message_cancel(destination = "a")
+        channel.message_cancel(destination = "b")
+        
+        #create consumer
+        channel.message_subscribe(queue = "q", destination = "c", confirm_mode = 1, acquire_mode=0)
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+        channel.message_flow(unit = 0, value = 10, destination = "c")
+        queueC = self.client.queue("c")
+        #consume the message then ack it
+        msgC = queueC.get(timeout = 1)
+        msgC.complete()
+        #ensure there are no other messages
+        self.assertEmpty(queueC)
+
+    def test_no_size(self):
+        """
+        Test that a message whose content reports no size (see
+        SizelessContent, whose size() returns None) is still delivered
+        with its body intact
+        """
+        self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+        ch = self.channel
+        ch.message_transfer(content=SizelessContent(properties={'routing_key' : "q"}, body="message-body"))
+
+        #subscribe with the maximum value for both flow units
+        ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0)
+        ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d")
+        ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d")
+
+        queue = self.client.queue("d")
+        #note the longer timeout (3) compared with the other tests in this file
+        msg = queue.get(timeout = 3)
+        self.assertEquals("message-body", msg.content.body)
+
+    def assertDataEquals(self, channel, msg, expected):
+        """
+        Assert that the body of msg equals expected.
+
+        The channel argument is accepted but not used.
+        """
+        self.assertEquals(expected, msg.content.body)
+
+    def assertEmpty(self, queue):
+        """
+        Assert that nothing arrives on queue within one second.
+
+        Fails the test, reporting the body of the unexpected message,
+        if a message is received; an Empty exception from get() is the
+        expected outcome and is ignored.
+        """
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Queue not empty, contains: " + extra.content.body)
+        except Empty: None
+
+class SizelessContent(Content):
+    """
+    Content whose size() always returns None; used by test_no_size to
+    transfer a message without a size.
+    """
+
+    def size(self):
+        #overrides Content.size() so that no size is reported
+        return None

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/message.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/python/tests_0-10_preview/persistence.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10_preview/persistence.py?rev=634003&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10_preview/persistence.py (added)
+++ incubator/qpid/trunk/qpid/python/tests_0-10_preview/persistence.py Wed Mar  5 11:56:43 2008
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import testrunner, TestBase
+
+class PersistenceTests(TestBase):
+    """
+    Tests exercising durable queues and messages published with
+    delivery_mode 2.
+
+    None of these tests contain explicit assertions; each passes as
+    long as the sequence of commands completes without raising.
+    """
+    def test_delete_queue_after_publish(self):
+        """
+        Publish a batch of messages to a durable queue without waiting
+        for each command, then explicitly delete the queue
+        """
+        channel = self.channel
+        #switch off synchronous mode for the declare and the publishes
+        channel.synchronous = False
+
+        #create queue
+        channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+        #send message
+        #(range(1, 10) publishes nine messages)
+        for i in range(1, 10):
+            channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+        #back to synchronous mode for the delete
+        channel.synchronous = True
+        #explicitly delete queue
+        channel.queue_delete(queue = "q")
+
+    def test_ack_message_from_deleted_queue(self):
+        """
+        Complete a message after the subscription that received it has
+        been cancelled on an auto-delete queue
+        """
+        channel = self.channel
+        channel.synchronous = False
+
+        #create queue
+        channel.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+        #send message
+        channel.message_transfer(content=Content(properties={'routing_key' : "q", 'delivery_mode':2}, body = "my-message"))
+
+        #create consumer
+        channel.message_subscribe(queue = "q", destination = "a", confirm_mode = 1, acquire_mode=0)
+        channel.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+        channel.message_flow(unit = 0, value = 10, destination = "a")
+        queue = self.client.queue("a")
+
+        #consume the message, cancel subscription (triggering auto-delete), then ack it
+        msg = queue.get(timeout = 5)
+        channel.message_cancel(destination = "a")        
+        msg.complete()
+
+    def test_queue_deletion(self):
+        """
+        Delete a durable queue that is bound to amq.topic and has had a
+        message routed to it through that exchange
+        """
+        channel = self.channel
+        channel.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+        channel.queue_bind(exchange="amq.topic", queue="durable-subscriber-queue", routing_key="xyz")
+        channel.message_transfer(destination= "amq.topic", content=Content(properties={'routing_key' : "xyz", 'delivery_mode':2}, body = "my-message"))
+        channel.queue_delete(queue = "durable-subscriber-queue")
+    

Propchange: incubator/qpid/trunk/qpid/python/tests_0-10_preview/persistence.py
------------------------------------------------------------------------------
    svn:eol-style = native