You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/05 16:08:46 UTC

svn commit: r906961 [3/4] - in /qpid/trunk/qpid: cpp/src/tests/ python/ python/tests_0-10/ python/tests_0-8/ python/tests_0-9/ tests/ tests/src/ tests/src/py/ tests/src/py/qpid_tests/ tests/src/py/qpid_tests/broker_0_10/ tests/src/py/qpid_tests/broker_...

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,366 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
+from qpid.session import SessionException
+
+class QueueTests(TestBase010):
+    """Tests for 'methods' on the amqp queue 'class'"""
+
+    def test_purge(self):
+        """
+        Test that the purge method removes messages from the queue
+        """
+        session = self.session
+        #setup, declare a queue and add some messages to it:
+        session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one"))
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two"))
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three"))
+
+        #check that the queue now reports 3 messages:
+        session.queue_declare(queue="test-queue")
+        reply = session.queue_query(queue="test-queue")
+        self.assertEqual(3, reply.message_count)
+
+        #now do the purge, then test that three messages are purged and the count drops to 0
+        session.queue_purge(queue="test-queue");
+        reply = session.queue_query(queue="test-queue")
+        self.assertEqual(0, reply.message_count)        
+
+        #send a further message and consume it, ensuring that the other messages are really gone
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four"))
+        session.message_subscribe(queue="test-queue", destination="tag")
+        session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        queue = session.incoming("tag")
+        msg = queue.get(timeout=1)
+        self.assertEqual("four", msg.body)
+
+    def test_purge_queue_exists(self):
+        """
+        Test that the correct exception is thrown if no queue exists
+        for the name specified in purge
+        """
+        session = self.session
+        try:
+            #queue specified but doesn't exist:
+            session.queue_purge(queue="invalid-queue")            
+            self.fail("Expected failure when purging non-existent queue")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code) #not-found
+
+    def test_purge_empty_name(self):        
+        """
+        Test that the correct exception is thrown if no queue name
+        is specified for purge
+        """
+        session = self.session
+        try:
+            #queue not specified and none previously declared for channel:
+            session.queue_purge()
+            self.fail("Expected failure when purging unspecified queue")
+        except SessionException, e:
+            self.assertEquals(531, e.args[0].error_code) #illegal-argument
+
+    def test_declare_exclusive(self):
+        """
+        Test that the exclusive field is honoured in queue.declare
+        """
+        # TestBase.setUp has already opened session(1)
+        s1 = self.session
+        # Here we open a second session on the same connection:
+        s2 = self.conn.session("other")
+
+        #declare an exclusive queue:
+        s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+        try:
+            #other session should not be allowed to declare this:
+            s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+            self.fail("Expected second exclusive queue_declare to raise a channel exception")
+        except SessionException, e:
+            self.assertEquals(405, e.args[0].error_code)
+            
+        s3 = self.conn.session("subscriber")
+        try:
+            #other session should not be allowed to subscribe to this:
+            s3.message_subscribe(queue="exclusive-queue")
+            self.fail("Expected message_subscribe on an exclusive queue to raise a channel exception")
+        except SessionException, e:
+            self.assertEquals(405, e.args[0].error_code)
+
+        s4 = self.conn.session("deleter")
+        try:
+            #other session should not be allowed to delete this:
+            s4.queue_delete(queue="exclusive-queue")
+            self.fail("Expected queue_delete on an exclusive queue to raise a channel exception")
+        except SessionException, e:
+            self.assertEquals(405, e.args[0].error_code)
+
+
+    def test_declare_passive(self):
+        """
+        Test that the passive field is honoured in queue.declare
+        """
+        session = self.session
+        #declare an exclusive queue, then passively declare the same queue:
+        session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
+        session.queue_declare(queue="passive-queue-1", passive=True)
+        try:
+            #passive declare of a queue that has not been declared should fail:
+            session.queue_declare(queue="passive-queue-2", passive=True)
+            self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code) #not-found
+
+
+    def test_bind(self):
+        """
+        Test various permutations of the queue.bind method
+        """
+        session = self.session
+        session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+
+        #straightforward case, both exchange & queue exist so no errors expected:
+        session.exchange_bind(queue="queue-1", exchange="amq.direct", binding_key="key1")
+
+        #use the queue name where the routing key is not specified:
+        session.exchange_bind(queue="queue-1", exchange="amq.direct")
+
+        #try to bind to a non-existent exchange
+        try:
+            session.exchange_bind(queue="queue-1", exchange="an-invalid-exchange", binding_key="key1")
+            self.fail("Expected bind to non-existant exchange to fail")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+
+    def test_bind_queue_existence(self):
+        session = self.session
+        #try to bind a non-existent queue:
+        try:
+            session.exchange_bind(queue="queue-2", exchange="amq.direct", binding_key="key1")
+            self.fail("Expected bind of non-existant queue to fail")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+    def test_unbind_direct(self):
+        self.unbind_test(exchange="amq.direct", routing_key="key")
+
+    def test_unbind_topic(self):
+        self.unbind_test(exchange="amq.topic", routing_key="key")
+
+    def test_unbind_fanout(self):
+        self.unbind_test(exchange="amq.fanout")
+
+    def test_unbind_headers(self):
+        self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
+
+    def unbind_test(self, exchange, routing_key="", args=None, headers=None):
+        #bind two queues and consume from them
+        session = self.session
+        
+        session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+        session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
+
+        session.message_subscribe(queue="queue-1", destination="queue-1")
+        session.message_flow(destination="queue-1", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="queue-1", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        session.message_subscribe(queue="queue-2", destination="queue-2")
+        session.message_flow(destination="queue-2", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="queue-2", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+        queue1 = session.incoming("queue-1")
+        queue2 = session.incoming("queue-2")
+
+        session.exchange_bind(exchange=exchange, queue="queue-1", binding_key=routing_key, arguments=args)
+        session.exchange_bind(exchange=exchange, queue="queue-2", binding_key=routing_key, arguments=args)
+
+        dp = session.delivery_properties(routing_key=routing_key)
+        if (headers):
+            mp = session.message_properties(application_headers=headers)
+            msg1 = Message(dp, mp, "one")
+            msg2 = Message(dp, mp, "two")
+        else:
+            msg1 = Message(dp, "one")
+            msg2 = Message(dp, "two")
+            
+        #send a message that will match both bindings
+        session.message_transfer(destination=exchange, message=msg1)
+        
+        #unbind first queue
+        session.exchange_unbind(exchange=exchange, queue="queue-1", binding_key=routing_key)
+        
+        #send another message
+        session.message_transfer(destination=exchange, message=msg2)
+
+        #check one queue has both messages and the other has only one
+        self.assertEquals("one", queue1.get(timeout=1).body)
+        try:
+            msg = queue1.get(timeout=1)
+            self.fail("Got extra message: %s" % msg.body)
+        except Empty: pass
+
+        self.assertEquals("one", queue2.get(timeout=1).body)
+        self.assertEquals("two", queue2.get(timeout=1).body)
+        try:
+            msg = queue2.get(timeout=1)
+            self.fail("Got extra message: " + msg)
+        except Empty: pass        
+
+
+    def test_delete_simple(self):
+        """
+        Test core queue deletion behaviour
+        """
+        session = self.session
+
+        #straight-forward case:
+        session.queue_declare(queue="delete-me")
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "a"))
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b"))
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c"))
+        session.queue_delete(queue="delete-me")
+        #check that it has gone by declaring passively
+        try:
+            session.queue_declare(queue="delete-me", passive=True)
+            self.fail("Queue has not been deleted")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+    def test_delete_queue_exists(self):
+        """
+        Test that deleting a queue that does not exist fails with not-found
+        """
+        #check attempted deletion of non-existent queue is handled correctly:    
+        session = self.session
+        try:
+            session.queue_delete(queue="i-dont-exist", if_empty=True)
+            self.fail("Expected delete of non-existant queue to fail")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+        
+
+    def test_delete_ifempty(self):
+        """
+        Test that if_empty field of queue_delete is honoured
+        """
+        session = self.session
+
+        #create a queue and add a message to it (use default binding):
+        session.queue_declare(queue="delete-me-2")
+        session.queue_declare(queue="delete-me-2", passive=True)
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"), "message"))
+
+        #try to delete, but only if empty:
+        try:
+            session.queue_delete(queue="delete-me-2", if_empty=True)
+            self.fail("Expected delete if_empty to fail for non-empty queue")
+        except SessionException, e:
+            self.assertEquals(406, e.args[0].error_code)
+
+        #need new session now:    
+        session = self.conn.session("replacement", 2)
+
+        #empty queue:
+        session.message_subscribe(destination="consumer_tag", queue="delete-me-2")
+        session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        queue = session.incoming("consumer_tag")
+        msg = queue.get(timeout=1)
+        self.assertEqual("message", msg.body)
+        session.message_cancel(destination="consumer_tag")
+
+        #retry deletion on empty queue:
+        session.queue_delete(queue="delete-me-2", if_empty=True)
+
+        #check that it has gone by declaring passively:
+        try:
+            session.queue_declare(queue="delete-me-2", passive=True)
+            self.fail("Queue has not been deleted")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+        
+    def test_delete_ifunused(self):
+        """
+        Test that if_unused field of queue_delete is honoured
+        """
+        session = self.session
+
+        #create a queue and register a consumer:
+        session.queue_declare(queue="delete-me-3")
+        session.queue_declare(queue="delete-me-3", passive=True)
+        session.message_subscribe(destination="consumer_tag", queue="delete-me-3")
+
+        #use a second session for the delete attempt:
+        session2 = self.conn.session("replacement", 2)
+
+        #try to delete, but only if unused:
+        try:
+            session2.queue_delete(queue="delete-me-3", if_unused=True)
+            self.fail("Expected delete if_unused to fail for queue with existing consumer")
+        except SessionException, e:
+            self.assertEquals(406, e.args[0].error_code)
+
+        session.message_cancel(destination="consumer_tag")    
+        session.queue_delete(queue="delete-me-3", if_unused=True)
+        #check that it has gone by declaring passively:
+        try:
+            session.queue_declare(queue="delete-me-3", passive=True)
+            self.fail("Queue has not been deleted")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+
+    def test_autodelete_shared(self):
+        """
+        Test auto-deletion (of non-exclusive queues)
+        """
+        session = self.session
+        session2 =self.conn.session("other", 1)
+
+        session.queue_declare(queue="auto-delete-me", auto_delete=True)
+
+        #consume from both sessions
+        tag = "my-tag"
+        session.message_subscribe(queue="auto-delete-me", destination=tag)
+        session2.message_subscribe(queue="auto-delete-me", destination=tag)
+
+        #implicit cancel
+        session2.close()
+
+        #check it is still there
+        session.queue_declare(queue="auto-delete-me", passive=True)
+
+        #explicit cancel => queue is now unused again:
+        session.message_cancel(destination=tag)
+
+        #NOTE: this assumes there is no timeout in use
+
+        #check that it has gone by declaring it passively
+        try:
+            session.queue_declare(queue="auto-delete-me", passive=True)
+            self.fail("Expected queue to have been deleted")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,265 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
+
+class TxTests(TestBase010):
+    """
+    Tests for 'methods' on the amqp tx 'class'
+    """
+
+    def test_commit(self):
+        """
+        Test that commited publishes are delivered and commited acks are not re-delivered
+        """
+        session = self.session
+
+        #declare queues and create subscribers in the checking session
+        #to ensure that the queues are not auto-deleted too early:        
+        self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"])
+        session.message_subscribe(queue="tx-commit-a", destination="qa")
+        session.message_subscribe(queue="tx-commit-b", destination="qb")
+        session.message_subscribe(queue="tx-commit-c", destination="qc")
+
+        #use a separate session for actual work
+        session2 = self.conn.session("worker", 2)
+        self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+        session2.tx_commit()
+        session2.close()
+
+        session.tx_select()
+
+        self.enable_flow("qa")
+        queue_a = session.incoming("qa")
+
+        self.enable_flow("qb")
+        queue_b = session.incoming("qb")
+
+        self.enable_flow("qc")
+        queue_c = session.incoming("qc")
+
+        #check results
+        for i in range(1, 5):
+            msg = queue_c.get(timeout=1)
+            self.assertEqual("TxMessage %d" % i, msg.body)
+            session.message_accept(RangedSet(msg.id))
+
+        msg = queue_b.get(timeout=1)
+        self.assertEqual("TxMessage 6", msg.body)
+        session.message_accept(RangedSet(msg.id))
+
+        msg = queue_a.get(timeout=1)
+        self.assertEqual("TxMessage 7", msg.body)
+        session.message_accept(RangedSet(msg.id))
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.body)
+            except Empty: None
+
+        #cleanup
+        session.tx_commit()
+
+    def test_auto_rollback(self):
+        """
+        Test that a session closed with an open transaction is effectively rolled back
+        """
+        session = self.session
+        self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"])
+        session.message_subscribe(queue="tx-autorollback-a", destination="qa")
+        session.message_subscribe(queue="tx-autorollback-b", destination="qb")
+        session.message_subscribe(queue="tx-autorollback-c", destination="qc")
+
+        session2 = self.conn.session("worker", 2)
+        queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.body)
+            except Empty: None
+
+        session2.close()
+
+        session.tx_select()
+
+        self.enable_flow("qa")
+        queue_a = session.incoming("qa")
+
+        self.enable_flow("qb")
+        queue_b = session.incoming("qb")
+
+        self.enable_flow("qc")
+        queue_c = session.incoming("qc")
+
+        #check results
+        for i in range(1, 5):
+            msg = queue_a.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.body)
+            session.message_accept(RangedSet(msg.id))
+
+        msg = queue_b.get(timeout=1)
+        self.assertEqual("Message 6", msg.body)
+        session.message_accept(RangedSet(msg.id))
+
+        msg = queue_c.get(timeout=1)
+        self.assertEqual("Message 7", msg.body)
+        session.message_accept(RangedSet(msg.id))
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.body)
+            except Empty: None
+
+        #cleanup
+        session.tx_commit()
+
+    def test_rollback(self):
+        """
+        Test that rolled back publishes are not delivered and rolled back acks are re-delivered
+        """
+        session = self.session
+        queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.body)
+            except Empty: None
+
+        session.tx_rollback()
+
+        #need to release messages to get them redelivered now:
+        session.message_release(consumed)
+
+        #check results
+        for i in range(1, 5):
+            msg = queue_a.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.body)
+            session.message_accept(RangedSet(msg.id))
+
+        msg = queue_b.get(timeout=1)
+        self.assertEqual("Message 6", msg.body)
+        session.message_accept(RangedSet(msg.id))
+
+        msg = queue_c.get(timeout=1)
+        self.assertEqual("Message 7", msg.body)
+        session.message_accept(RangedSet(msg.id))
+
+        for q in [queue_a, queue_b, queue_c]:
+            try:
+                extra = q.get(timeout=1)
+                self.fail("Got unexpected message: " + extra.body)
+            except Empty: None
+
+        #cleanup
+        session.tx_commit()
+
+    def perform_txn_work(self, session, name_a, name_b, name_c):
+        """
+        Utility method that does some setup and some work under a transaction. Used for testing both
+        commit and rollback
+        """
+        #setup:
+        self.declare_queues([name_a, name_b, name_c])
+
+        key = "my_key_" + name_b
+        topic = "my_topic_" + name_c 
+    
+        session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key)
+        session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic)
+
+        dp = session.delivery_properties(routing_key=name_a)
+        for i in range(1, 5):
+            mp = session.message_properties(message_id="msg%d" % i)
+            session.message_transfer(message=Message(dp, mp, "Message %d" % i))
+
+        dp = session.delivery_properties(routing_key=key)
+        mp = session.message_properties(message_id="msg6")
+        session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6"))
+
+        dp = session.delivery_properties(routing_key=topic)
+        mp = session.message_properties(message_id="msg7")
+        session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7"))
+
+        session.tx_select()
+
+        #consume and ack messages
+        acked = RangedSet()
+        self.subscribe(session, queue=name_a, destination="sub_a")
+        queue_a = session.incoming("sub_a")
+        for i in range(1, 5):
+            msg = queue_a.get(timeout=1)
+            acked.add(msg.id)
+            self.assertEqual("Message %d" % i, msg.body)
+
+        self.subscribe(session, queue=name_b, destination="sub_b")
+        queue_b = session.incoming("sub_b")
+        msg = queue_b.get(timeout=1)
+        self.assertEqual("Message 6", msg.body)
+        acked.add(msg.id)
+
+        sub_c = self.subscribe(session, queue=name_c, destination="sub_c")
+        queue_c = session.incoming("sub_c")
+        msg = queue_c.get(timeout=1)
+        self.assertEqual("Message 7", msg.body)
+        acked.add(msg.id)
+
+        session.message_accept(acked)
+
+        dp = session.delivery_properties(routing_key=topic)
+        #publish messages
+        for i in range(1, 5):
+            mp = session.message_properties(message_id="tx-msg%d" % i)
+            session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i))
+
+        dp = session.delivery_properties(routing_key=key)
+        mp = session.message_properties(message_id="tx-msg6")
+        session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6"))
+        
+        dp = session.delivery_properties(routing_key=name_a)
+        mp = session.message_properties(message_id="tx-msg7")
+        session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
+        return queue_a, queue_b, queue_c, acked
+
+    def declare_queues(self, names, session=None):
+        session = session or self.session
+        for n in names:
+            session.queue_declare(queue=n, auto_delete=True)
+
+    def subscribe(self, session=None, **keys):
+        session = session or self.session
+        consumer_tag = keys["destination"]
+        session.message_subscribe(**keys)
+        session.message_flow(destination=consumer_tag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination=consumer_tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+    def enable_flow(self, tag, session=None):
+        session = session or self.session
+        session.message_flow(destination=tag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination=tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+    def complete(self, session, msg):
+        session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+        session.channel.session_completed(session.receiver._completed)
+

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Feb  5 15:08:44 2010
@@ -0,0 +1 @@
+*.pyc

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,22 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import basic, broker, example, exchange, queue, testlib, tx

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,396 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class BasicTests(TestBase):
+    """Tests for 'methods' on the amqp basic 'class'"""
+
+    def test_consume_no_local(self):
+        """
+        Test that the no_local flag is honoured in the consume method
+        """
+        channel = self.channel
+        #setup, declare two queues:
+        channel.queue_declare(queue="test-queue-1a", exclusive=True)
+        channel.queue_declare(queue="test-queue-1b", exclusive=True)
+        #establish two consumers one of which excludes delivery of locally sent messages
+        channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a")
+        channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True)
+
+        #send a message
+        channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local"))
+        channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local"))
+
+        #check the queues of the two consumers
+        excluded = self.client.queue("local_excluded")
+        included = self.client.queue("local_included")
+        msg = included.get(timeout=1)
+        self.assertEqual("consume_no_local", msg.content.body)
+        try:
+            excluded.get(timeout=1) 
+            self.fail("Received locally published message though no_local=true")
+        except Empty: None
+
+
+    def test_consume_exclusive(self):
+        """
+        Test that the exclusive flag is honoured in the consume method
+        """
+        channel = self.channel
+        #setup, declare a queue:
+        channel.queue_declare(queue="test-queue-2", exclusive=True)
+
+        #check that an exclusive consumer prevents other consumer being created:
+        channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True)
+        try:
+            channel.basic_consume(consumer_tag="second", queue="test-queue-2")
+            self.fail("Expected consume request to fail due to previous exclusive consumer")
+        except Closed, e:
+            self.assertChannelException(403, e.args[0])
+
+        #open new channel and cleanup last consumer:    
+        channel = self.client.channel(2)
+        channel.channel_open()
+
+        #check that an exclusive consumer cannot be created if a consumer already exists:
+        channel.basic_consume(consumer_tag="first", queue="test-queue-2")
+        try:
+            channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
+            self.fail("Expected exclusive consume request to fail due to previous consumer")
+        except Closed, e:
+            self.assertChannelException(403, e.args[0])
+
+    def test_consume_queue_errors(self):
+        """
+        Test error conditions associated with the queue field of the consume method:
+        """
+        channel = self.channel
+        try:
+            #queue specified but doesn't exist:
+            channel.basic_consume(queue="invalid-queue")
+            self.fail("Expected failure when consuming from non-existent queue")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        channel = self.client.channel(2)
+        channel.channel_open()
+        try:
+            #queue not specified and none previously declared for channel:
+            channel.basic_consume(queue="")
+            self.fail("Expected failure when consuming from unspecified queue")
+        except Closed, e:
+            self.assertConnectionException(530, e.args[0])
+
+    def test_consume_unique_consumers(self):
+        """
+        Ensure unique consumer tags are enforced
+        """
+        channel = self.channel
+        #setup, declare a queue:
+        channel.queue_declare(queue="test-queue-3", exclusive=True)
+
+        #check that attempts to use duplicate tags are detected and prevented:
+        channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+        try:
+            channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+            self.fail("Expected consume request to fail due to non-unique tag")
+        except Closed, e:
+            self.assertConnectionException(530, e.args[0])
+
+    def test_cancel(self):
+        """
+        Test compliance of the basic.cancel method
+        """
+        channel = self.channel
+        #setup, declare a queue:
+        channel.queue_declare(queue="test-queue-4", exclusive=True)
+        channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4")
+        channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
+
+        myqueue = self.client.queue("my-consumer")
+        msg = myqueue.get(timeout=1)
+        self.assertEqual("One", msg.content.body)
+	
+        #cancel should stop messages being delivered
+        channel.basic_cancel(consumer_tag="my-consumer")
+        channel.basic_publish(routing_key="test-queue-4", content=Content("Two"))
+        try:
+            msg = myqueue.get(timeout=1) 
+            self.fail("Got message after cancellation: " + msg)
+        except Empty: None
+
+        #cancellation of non-existant consumers should be handled without error
+        channel.basic_cancel(consumer_tag="my-consumer")
+        channel.basic_cancel(consumer_tag="this-never-existed")
+
+
+    def test_ack(self):
+        """
+        Test basic ack/recover behaviour
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-ack-queue", exclusive=True)
+        
+        reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
+        queue = self.client.queue(reply.consumer_tag)
+
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
+        channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
+                
+        msg1 = queue.get(timeout=1)
+        msg2 = queue.get(timeout=1)
+        msg3 = queue.get(timeout=1)
+        msg4 = queue.get(timeout=1)
+        msg5 = queue.get(timeout=1)
+        
+        self.assertEqual("One", msg1.content.body)
+        self.assertEqual("Two", msg2.content.body)
+        self.assertEqual("Three", msg3.content.body)
+        self.assertEqual("Four", msg4.content.body)
+        self.assertEqual("Five", msg5.content.body)
+
+        channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True)  #One & Two
+        channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+        channel.basic_recover(requeue=False)
+        
+        msg3b = queue.get(timeout=1)
+        msg5b = queue.get(timeout=1)
+        
+        self.assertEqual("Three", msg3b.content.body)
+        self.assertEqual("Five", msg5b.content.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message: " + extra.content.body)
+        except Empty: None
+
+    def test_recover_requeue(self):
+        """
+        Test requeuing on recovery
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-requeue", exclusive=True)
+        
+        subscription = channel.basic_consume(queue="test-requeue", no_ack=False)
+        queue = self.client.queue(subscription.consumer_tag)
+
+        channel.basic_publish(routing_key="test-requeue", content=Content("One"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Two"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Three"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Four"))
+        channel.basic_publish(routing_key="test-requeue", content=Content("Five"))
+                
+        msg1 = queue.get(timeout=1)
+        msg2 = queue.get(timeout=1)
+        msg3 = queue.get(timeout=1)
+        msg4 = queue.get(timeout=1)
+        msg5 = queue.get(timeout=1)
+        
+        self.assertEqual("One", msg1.content.body)
+        self.assertEqual("Two", msg2.content.body)
+        self.assertEqual("Three", msg3.content.body)
+        self.assertEqual("Four", msg4.content.body)
+        self.assertEqual("Five", msg5.content.body)
+
+        channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True)  #One & Two
+        channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+        channel.basic_cancel(consumer_tag=subscription.consumer_tag)
+
+        channel.basic_recover(requeue=True)
+
+        subscription2 = channel.basic_consume(queue="test-requeue")
+        queue2 = self.client.queue(subscription2.consumer_tag)
+        
+        msg3b = queue2.get(timeout=1)
+        msg5b = queue2.get(timeout=1)
+        
+        self.assertEqual("Three", msg3b.content.body)
+        self.assertEqual("Five", msg5b.content.body)
+
+        self.assertEqual(True, msg3b.redelivered)
+        self.assertEqual(True, msg5b.redelivered)
+
+        try:
+            extra = queue2.get(timeout=1)
+            self.fail("Got unexpected message in second queue: " + extra.content.body)
+        except Empty: None
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in original queue: " + extra.content.body)
+        except Empty: None
+        
+        
+    def test_qos_prefetch_count(self):
+        """
+        Test that the prefetch count specified is honoured
+        """
+        #setup: declare queue and subscribe
+        channel = self.channel
+        channel.queue_declare(queue="test-prefetch-count", exclusive=True)
+        subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False)
+        queue = self.client.queue(subscription.consumer_tag)
+
+        #set prefetch to 5:
+        channel.basic_qos(prefetch_count=5)
+
+        #publish 10 messages:
+        for i in range(1, 11):
+            channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i))
+
+        #only 5 messages should have been delivered:
+        for i in range(1, 6):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+        except Empty: None
+
+        #ack messages and check that the next set arrive ok:
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        for i in range(6, 11):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+        except Empty: None
+
+
+        
+    def test_qos_prefetch_size(self):
+        """
+        Test that the prefetch size specified is honoured
+        """
+        #setup: declare queue and subscribe
+        channel = self.channel
+        channel.queue_declare(queue="test-prefetch-size", exclusive=True)
+        subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
+        queue = self.client.queue(subscription.consumer_tag)
+
+        #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+        channel.basic_qos(prefetch_size=50)
+
+        #publish 10 messages:
+        for i in range(1, 11):
+            channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))
+
+        #only 5 messages should have been delivered (i.e. 45 bytes worth):
+        for i in range(1, 6):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+        except Empty: None
+
+        #ack messages and check that the next set arrive ok:
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        for i in range(6, 11):
+            msg = queue.get(timeout=1)
+            self.assertEqual("Message %d" % i, msg.content.body)
+
+        channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+        except Empty: None
+
+        #make sure that a single oversized message still gets delivered
+        large = "abcdefghijklmnopqrstuvwxyz"
+        large = large + "-" + large;
+        channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
+        msg = queue.get(timeout=1)
+        self.assertEqual(large, msg.content.body)
+
+    def test_get(self):
+        """
+        Test basic_get method
+        """
+        channel = self.channel
+        channel.queue_declare(queue="test-get", exclusive=True)
+        
+        #publish some messages (to be read back with no_ack=True)
+        for i in range(1, 11):
+            channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+        #use basic_get to read back the messages, and check that we get an empty at the end
+        for i in range(1, 11):
+            reply = channel.basic_get(no_ack=True)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get_ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get_empty")
+
+        #repeat for no_ack=False
+        for i in range(11, 21):
+            channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+        for i in range(11, 21):
+            reply = channel.basic_get(no_ack=False)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get_ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+            if(i == 13):
+                channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
+            if(i in [15, 17, 19]):
+                channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get_empty")
+
+        #recover(requeue=True)
+        channel.basic_recover(requeue=True)
+        
+        #get the unacked messages again (14, 16, 18, 20)
+        for i in [14, 16, 18, 20]:
+            reply = channel.basic_get(no_ack=False)
+            self.assertEqual(reply.method.klass.name, "basic")
+            self.assertEqual(reply.method.name, "get_ok")
+            self.assertEqual("Message %d" % i, reply.content.body)
+            channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get_empty")
+
+        channel.basic_recover(requeue=True)
+
+        reply = channel.basic_get(no_ack=True)
+        self.assertEqual(reply.method.klass.name, "basic")
+        self.assertEqual(reply.method.name, "get_empty")

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class BrokerTests(TestBase):
+    """Tests for basic Broker functionality"""
+
+    def test_ack_and_no_ack(self):
+        """
+        First, this test tries to receive a message with a no-ack
+        consumer. Second, this test tries to explicitly receive and
+        acknowledge a message with an acknowledging consumer.
+        """
+        ch = self.channel
+        self.queue_declare(ch, queue = "myqueue")
+
+        # No ack consumer
+        ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag
+        body = "test no-ack"
+        ch.basic_publish(routing_key = "myqueue", content = Content(body))
+        msg = self.client.queue(ctag).get(timeout = 5)
+        self.assert_(msg.content.body == body)
+
+        # Acknowledging consumer
+        self.queue_declare(ch, queue = "otherqueue")
+        ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag
+        body = "test ack"
+        ch.basic_publish(routing_key = "otherqueue", content = Content(body))
+        msg = self.client.queue(ctag).get(timeout = 5)
+        ch.basic_ack(delivery_tag = msg.delivery_tag)
+        self.assert_(msg.content.body == body)
+        
+    def test_basic_delivery_immediate(self):
+        """
+        Test basic message delivery where consume is issued before publish
+        """
+        channel = self.channel
+        self.exchange_declare(channel, exchange="test-exchange", type="direct")
+        self.queue_declare(channel, queue="test-queue") 
+        channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+        reply = channel.basic_consume(queue="test-queue", no_ack=True)
+        queue = self.client.queue(reply.consumer_tag)
+
+        body = "Immediate Delivery"
+        channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True)
+        msg = queue.get(timeout=5)
+        self.assert_(msg.content.body == body)
+
+        # TODO: Ensure we fail if immediate=True and there's no consumer.
+
+
+    def test_basic_delivery_queued(self):
+        """
+        Test basic message delivery where publish is issued before consume
+        (i.e. requires queueing of the message)
+        """
+        channel = self.channel
+        self.exchange_declare(channel, exchange="test-exchange", type="direct")
+        self.queue_declare(channel, queue="test-queue")
+        channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+        body = "Queued Delivery"
+        channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body))
+        reply = channel.basic_consume(queue="test-queue", no_ack=True)
+        queue = self.client.queue(reply.consumer_tag)
+        msg = queue.get(timeout=5)
+        self.assert_(msg.content.body == body)
+
+    def test_invalid_channel(self):
+        channel = self.client.channel(200)
+        try:
+            channel.queue_declare(exclusive=True)
+            self.fail("Expected error on queue_declare for invalid channel")
+        except Closed, e:
+            self.assertConnectionException(504, e.args[0])
+        
+    def test_closed_channel(self):
+        channel = self.client.channel(200)
+        channel.channel_open()
+        channel.channel_close()
+        try:
+            channel.queue_declare(exclusive=True)
+            self.fail("Expected error on queue_declare for closed channel")
+        except Closed, e:
+            self.assertConnectionException(504, e.args[0])
+
+    def test_channel_flow(self):
+        channel = self.channel
+        channel.queue_declare(queue="flow_test_queue", exclusive=True)
+        ctag = channel.basic_consume(queue="flow_test_queue", no_ack=True).consumer_tag
+        incoming = self.client.queue(ctag)
+        
+        channel.channel_flow(active=False)        
+        channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz"))
+        try:
+            incoming.get(timeout=1) 
+            self.fail("Received message when flow turned off.")
+        except Empty: None
+        
+        channel.channel_flow(active=True)
+        msg = incoming.get(timeout=1)
+        self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body)

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class ExampleTest (TestBase):
+    """
+    An example Qpid test, illustrating the unittest framework and the
+    python Qpid client. The test class must inherit TestCase.  The
+    test code uses the Qpid client to interact with a qpid broker and
+    verify it behaves as expected.
+    """ 
+
+    def test_example(self):
+        """
+        An example test. Note that test functions must start with 'test_'
+        to be recognized by the test framework.
+        """
+
+        # By inheriting TestBase, self.client is automatically connected
+        # and self.channel is automatically opened as channel(1)
+        # Other channel methods mimic the protocol.
+        channel = self.channel
+
+        # Now we can send regular commands. If you want to see what the method
+        # arguments mean or what other commands are available, you can use the
+        # python builtin help() method. For example:
+        #help(channel)
+        #help(channel.exchange_declare)
+
+        # If you want to browse the available protocol methods without being
+        # connected to a live server you can use the amqp-doc utility:
+        #
+        #   Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
+        #
+        #   Options:
+        #       -e, --regexp    use regex instead of glob when matching
+
+        # Now that we know what commands are available we can use them to
+        # interact with the server.
+
+        # Here we use ordinal arguments.
+        self.exchange_declare(channel, 0, "test", "direct")
+
+        # Here we use keyword arguments.
+        self.queue_declare(channel, queue="test-queue")
+        channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
+
+        # Call Channel.basic_consume to register as a consumer.
+        # All the protocol methods return a message object. The message object
+        # has fields corresponding to the reply method fields, plus a content
+        # field that is filled if the reply includes content. In this case the
+        # interesting field is the consumer_tag.
+        reply = channel.basic_consume(queue="test-queue")
+
+        # We can use the Client.queue(...) method to access the queue
+        # corresponding to our consumer_tag.
+        queue = self.client.queue(reply.consumer_tag)
+
+        # Now let's publish a message and see if our consumer gets it. To do
+        # this we need to import the Content class.
+        body = "Hello World!"
+        channel.basic_publish(exchange="test",
+                              routing_key="key",
+                              content=Content(body))
+
+        # Now we'll wait for the message to arrive. We can use the timeout
+        # argument in case the server hangs. By default queue.get() will wait
+        # until a message arrives or the connection to the server dies.
+        msg = queue.get(timeout=10)
+
+        # And check that we got the right response with assertEqual
+        self.assertEqual(body, msg.content.body)
+
+        # Now acknowledge the message.
+        channel.basic_ack(msg.delivery_tag, True)
+

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,327 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Tests for exchange behaviour.
+
+Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
+"""
+
+import Queue, logging
+from qpid.testlib import TestBase
+from qpid.content import Content
+from qpid.client import Closed
+
+
+class StandardExchangeVerifier:
+    """Verifies standard exchange behavior.
+
+    Used as base class for classes that test standard exchanges."""
+
+    def verifyDirectExchange(self, ex):
+        """Verify that ex behaves like a direct exchange."""
+        self.queue_declare(queue="q")
+        self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
+        self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
+        try:
+            self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
+            self.fail("Expected Empty exception")
+        except Queue.Empty: None # Expected
+
+    def verifyFanOutExchange(self, ex):
+        """Verify that ex behaves like a fanout exchange."""
+        self.queue_declare(queue="q") 
+        self.channel.queue_bind(queue="q", exchange=ex)
+        self.queue_declare(queue="p") 
+        self.channel.queue_bind(queue="p", exchange=ex)
+        for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+
+    def verifyTopicExchange(self, ex):
+        """Verify that ex behaves like a topic exchange"""
+        self.queue_declare(queue="a")
+        self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+        q = self.consume("a")
+        self.assertPublishGet(q, ex, "a.b.x")
+        self.assertPublishGet(q, ex, "a.x.b.x")
+        self.assertPublishGet(q, ex, "a.x.x.b.x")
+        # Shouldn't match
+        self.channel.basic_publish(exchange=ex, routing_key="a.b")        
+        self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y")        
+        self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x")        
+        self.channel.basic_publish(exchange=ex, routing_key="a.b")
+        self.assert_(q.empty())
+
+    def verifyHeadersExchange(self, ex):
+        """Verify that ex is a headers exchange"""
+        self.queue_declare(queue="q")
+        self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+        q = self.consume("q")
+        headers = {"name":"fred", "age":3}
+        self.assertPublishGet(q, exchange=ex, properties={'headers':headers})
+        self.channel.basic_publish(exchange=ex) # No headers, won't deliver
+        self.assertEmpty(q);                 
+        
+
+class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
+    """
+    The server SHOULD implement these standard exchange types: topic, headers.
+    
+    Client attempts to declare an exchange with each of these standard types.
+    """
+
+    def testDirect(self):
+        """Declare and test a direct exchange"""
+        self.exchange_declare(0, exchange="d", type="direct")
+        self.verifyDirectExchange("d")
+
+    def testFanout(self):
+        """Declare and test a fanout exchange"""
+        self.exchange_declare(0, exchange="f", type="fanout")
+        self.verifyFanOutExchange("f")
+
+    def testTopic(self):
+        """Declare and test a topic exchange"""
+        self.exchange_declare(0, exchange="t", type="topic")
+        self.verifyTopicExchange("t")
+
+    def testHeaders(self):
+        """Declare and test a headers exchange"""
+        self.exchange_declare(0, exchange="h", type="headers")
+        self.verifyHeadersExchange("h")
+        
+
+class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
+    """
+    The server MUST, in each virtual host, pre-declare an exchange instance
+    for each standard exchange type that it implements, where the name of the
+    exchange instance is amq. followed by the exchange type name.
+    
+    Client creates a temporary queue and attempts to bind to each required
+    exchange instance (amq.fanout, amq.direct, amq.topic and amq.match, if
+    those types are defined).
+    """
+    def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
+
+    def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
+
+    def testAmqTopic(self):  self.verifyTopicExchange("amq.topic")
+        
+    def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
+
+class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
+    """
+    The server MUST predeclare a direct exchange to act as the default exchange
+    for content Publish methods and for default queue bindings.
+    
+    Client checks that the default exchange is active by specifying a queue
+    binding with no exchange name, and publishing a message with a suitable
+    routing key but without specifying the exchange name, then ensuring that
+    the message arrives in the queue correctly.
+    """
+    def testDefaultExchange(self):
+        # Test automatic binding by queue name.
+        self.queue_declare(queue="d")
+        self.assertPublishConsume(queue="d", routing_key="d")
+        # Test explicit bind to the default exchange (empty exchange name)
+        self.verifyDirectExchange("")
+
+
+# TODO aconway 2006-09-27: Fill in empty tests:
+
+class DefaultAccessRuleTests(TestBase):
+    """
+    The server MUST NOT allow clients to access the default exchange except
+    by specifying an empty exchange name in the Queue.Bind and content Publish
+    methods.
+    """
+
+class ExtensionsRuleTests(TestBase):
+    """
+    The server MAY implement other exchange types as wanted.
+    """
+
+
+class DeclareMethodMinimumRuleTests(TestBase):
+    """
+    The server SHOULD support a minimum of 16 exchanges per virtual host and
+    ideally, impose no limit except as defined by available resources.
+    
+    The client creates as many exchanges as it can until the server reports
+    an error; the number of exchanges successfully created must be at least
+    sixteen.
+    """
+
+
+class DeclareMethodTicketFieldValidityRuleTests(TestBase):
+    """
+    The client MUST provide a valid access ticket giving "active" access to
+    the realm in which the exchange exists or will be created, or "passive"
+    access if the if-exists flag is set.
+    
+    Client creates access ticket with wrong access rights and attempts to use
+    in this method.
+    """
+
+
+class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
+    """
+    Exchange names starting with "amq." are reserved for predeclared and
+    standardised exchanges. The client MUST NOT attempt to create an exchange
+    starting with "amq.".
+    
+    
+    """
+
+
+class DeclareMethodTypeFieldTypedRuleTests(TestBase):
+    """
+    Exchanges cannot be redeclared with different types.  The client MUST NOT
+    attempt to redeclare an existing exchange with a different type than used
+    in the original Exchange.Declare method.
+    
+    
+    """
+
+
+class DeclareMethodTypeFieldSupportRuleTests(TestBase):
+    """
+    The client MUST NOT attempt to create an exchange with a type that the
+    server does not support.
+    
+    
+    """
+
+
+class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
+    """
+    If set, and the exchange does not already exist, the server MUST raise a
+    channel exception with reply code 404 (not found).    
+    """
+    def test(self):
+        try:
+            self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
+            self.fail("Expected 404 for passive declaration of unknown exchange.")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+
+class DeclareMethodDurableFieldSupportRuleTests(TestBase):
+    """
+    The server MUST support both durable and transient exchanges.
+    
+    
+    """
+
+
+class DeclareMethodDurableFieldStickyRuleTests(TestBase):
+    """
+    The server MUST ignore the durable field if the exchange already exists.
+    
+    
+    """
+
+
+class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
+    """
+    The server MUST ignore the auto-delete field if the exchange already
+    exists.
+    
+    
+    """
+
+
+class DeleteMethodTicketFieldValidityRuleTests(TestBase):
+    """
+    The client MUST provide a valid access ticket giving "active" access
+    rights to the exchange's access realm.
+    
+    Client creates access ticket with wrong access rights and attempts to use
+    in this method.
+    """
+
+
+class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
+    """
+    The client MUST NOT attempt to delete an exchange that does not exist.
+    """
+
+
+class HeadersExchangeTests(TestBase):
+    """
+    Tests for headers exchange functionality.
+    """
+    def setUp(self):
+        TestBase.setUp(self)
+        self.queue_declare(queue="q")
+        self.q = self.consume("q")
+
+    def myAssertPublishGet(self, headers):
+        self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers})
+
+    def myBasicPublish(self, headers):
+        self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers}))
+        
+    def testMatchAll(self):
+        self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+        self.myAssertPublishGet({"name":"fred", "age":3})
+        self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
+        
+        # None of these should match
+        self.myBasicPublish({})
+        self.myBasicPublish({"name":"barney"})
+        self.myBasicPublish({"name":10})
+        self.myBasicPublish({"name":"fred", "age":2})
+        self.assertEmpty(self.q)
+
+    def testMatchAny(self):
+        self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+        self.myAssertPublishGet({"name":"fred"})
+        self.myAssertPublishGet({"name":"fred", "ignoreme":10})
+        self.myAssertPublishGet({"ignoreme":10, "age":3})
+
+        # Won't match
+        self.myBasicPublish({})
+        self.myBasicPublish({"irrelevant":0})
+        self.assertEmpty(self.q)
+
+
+class MiscellaneousErrorsTests(TestBase):
+    """
+    Test some miscellaneous error conditions
+    """
+    def testTypeNotKnown(self):
+        try:
+            self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
+            self.fail("Expected 503 for declaration of unknown exchange type.")
+        except Closed, e:
+            self.assertConnectionException(503, e.args[0])
+
+    def testDifferentDeclaredType(self):
+        self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
+        try:
+            self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
+            self.fail("Expected 530 for redeclaration of exchange with different type.")
+        except Closed, e:
+            self.assertConnectionException(530, e.args[0])
+        #cleanup    
+        other = self.connect()
+        c2 = other.channel(1)
+        c2.channel_open()
+        c2.exchange_delete(exchange="test_different_declared_type_exchange")
+    

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,255 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class QueueTests(TestBase):
+    """Tests for 'methods' on the amqp queue 'class'"""
+
+    def test_purge(self):
+        """
+        Test that the purge method removes messages from the queue
+        """
+        channel = self.channel
+        #setup, declare a queue and add some messages to it:
+        channel.exchange_declare(exchange="test-exchange", type="direct")
+        channel.queue_declare(queue="test-queue", exclusive=True)
+        channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+        channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one"))
+        channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two"))
+        channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three"))
+
+        #check that the queue now reports 3 messages:
+        reply = channel.queue_declare(queue="test-queue")
+        self.assertEqual(3, reply.message_count)
+
+        #now do the purge, then test that three messages are purged and the count drops to 0
+        reply = channel.queue_purge(queue="test-queue");
+        self.assertEqual(3, reply.message_count)        
+        reply = channel.queue_declare(queue="test-queue")
+        self.assertEqual(0, reply.message_count)
+
+        #send a further message and consume it, ensuring that the other messages are really gone
+        channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four"))
+        reply = channel.basic_consume(queue="test-queue", no_ack=True)
+        queue = self.client.queue(reply.consumer_tag)
+        msg = queue.get(timeout=1)
+        self.assertEqual("four", msg.content.body)
+
+        #check error conditions (use new channels): 
+        channel = self.client.channel(2)
+        channel.channel_open()
+        try:
+            #queue specified but doesn't exist:
+            channel.queue_purge(queue="invalid-queue")
+            self.fail("Expected failure when purging non-existent queue")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        channel = self.client.channel(3)
+        channel.channel_open()
+        try:
+            #queue not specified and none previously declared for channel:
+            channel.queue_purge()
+            self.fail("Expected failure when purging unspecified queue")
+        except Closed, e:
+            self.assertConnectionException(530, e.args[0])
+
+        #cleanup    
+        other = self.connect()
+        channel = other.channel(1)
+        channel.channel_open()
+        channel.exchange_delete(exchange="test-exchange")
+
+    def test_declare_exclusive(self):
+        """
+        Test that the exclusive field is honoured in queue.declare
+        """
+        # TestBase.setUp has already opened channel(1)
+        c1 = self.channel
+        # Here we open a second separate connection:
+        other = self.connect()
+        c2 = other.channel(1)
+        c2.channel_open()
+
+        #declare an exclusive queue:
+        c1.queue_declare(queue="exclusive-queue", exclusive="True")
+        try:
+            #other connection should not be allowed to declare this:
+            c2.queue_declare(queue="exclusive-queue", exclusive="True")
+            self.fail("Expected second exclusive queue_declare to raise a channel exception")
+        except Closed, e:
+            self.assertChannelException(405, e.args[0])
+
+
+    def test_declare_passive(self):
+        """
+        Test that the passive field is honoured in queue.declare
+        """
+        channel = self.channel
+        #declare an exclusive queue, then check that a passive declare of it succeeds:
+        channel.queue_declare(queue="passive-queue-1", exclusive="True")
+        channel.queue_declare(queue="passive-queue-1", passive="True")
+        try:
+            #passive declaration of a queue that does not exist should fail:
+            channel.queue_declare(queue="passive-queue-2", passive="True")
+            self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+
+    def test_bind(self):
+        """
+        Test various permutations of the queue.bind method
+        """
+        channel = self.channel
+        channel.queue_declare(queue="queue-1", exclusive="True")
+
+        #straightforward case, both exchange & queue exist so no errors expected:
+        channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
+
+        #bind the default queue for the channel (i.e. last one declared):
+        channel.queue_bind(exchange="amq.direct", routing_key="key2")
+
+        #use the queue name where neither routing key nor queue are specified:
+        channel.queue_bind(exchange="amq.direct")
+
+        #try to bind to a non-existent exchange
+        try:
+            channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
+            self.fail("Expected bind to non-existant exchange to fail")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        #need to reopen a channel:    
+        channel = self.client.channel(2)
+        channel.channel_open()
+
+        #try to bind a non-existent queue:
+        try:
+            channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
+            self.fail("Expected bind of non-existant queue to fail")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+
+    def test_delete_simple(self):
+        """
+        Test basic queue deletion
+        """
+        channel = self.channel
+
+        #straight-forward case:
+        channel.queue_declare(queue="delete-me")
+        channel.basic_publish(routing_key="delete-me", content=Content("a"))
+        channel.basic_publish(routing_key="delete-me", content=Content("b"))
+        channel.basic_publish(routing_key="delete-me", content=Content("c"))        
+        reply = channel.queue_delete(queue="delete-me")
+        self.assertEqual(3, reply.message_count)
+        #check that it has gone by declaring passively
+        try:
+            channel.queue_declare(queue="delete-me", passive="True")
+            self.fail("Queue has not been deleted")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        #check attempted deletion of non-existent queue is handled correctly:
+        channel = self.client.channel(2)
+        channel.channel_open()
+        try:
+            channel.queue_delete(queue="i-dont-exist", if_empty="True")
+            self.fail("Expected delete of non-existant queue to fail")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+        
+
+    def test_delete_ifempty(self):
+        """
+        Test that if_empty field of queue_delete is honoured
+        """
+        channel = self.channel
+
+        #create a queue and add a message to it (use default binding):
+        channel.queue_declare(queue="delete-me-2")
+        channel.queue_declare(queue="delete-me-2", passive="True")
+        channel.basic_publish(routing_key="delete-me-2", content=Content("message"))
+
+        #try to delete, but only if empty:
+        try:
+            channel.queue_delete(queue="delete-me-2", if_empty="True")
+            self.fail("Expected delete if_empty to fail for non-empty queue")
+        except Closed, e:
+            self.assertChannelException(406, e.args[0])
+
+        #need new channel now:    
+        channel = self.client.channel(2)
+        channel.channel_open()
+
+        #empty queue:
+        reply = channel.basic_consume(queue="delete-me-2", no_ack=True)
+        queue = self.client.queue(reply.consumer_tag)
+        msg = queue.get(timeout=1)
+        self.assertEqual("message", msg.content.body)
+        channel.basic_cancel(consumer_tag=reply.consumer_tag)
+
+        #retry deletion on empty queue:
+        channel.queue_delete(queue="delete-me-2", if_empty="True")
+
+        #check that it has gone by declaring passively:
+        try:
+            channel.queue_declare(queue="delete-me-2", passive="True")
+            self.fail("Queue has not been deleted")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+        
+    def test_delete_ifunused(self):
+        """
+        Test that if_unused field of queue_delete is honoured
+        """
+        channel = self.channel
+
+        #create a queue and register a consumer:
+        channel.queue_declare(queue="delete-me-3")
+        channel.queue_declare(queue="delete-me-3", passive="True")
+        reply = channel.basic_consume(queue="delete-me-3", no_ack=True)
+
+        #need new channel now:    
+        channel2 = self.client.channel(2)
+        channel2.channel_open()
+        #try to delete, but only if unused:
+        try:
+            channel2.queue_delete(queue="delete-me-3", if_unused="True")
+            self.fail("Expected delete if_unused to fail for queue with existing consumer")
+        except Closed, e:
+            self.assertChannelException(406, e.args[0])
+
+
+        channel.basic_cancel(consumer_tag=reply.consumer_tag)    
+        channel.queue_delete(queue="delete-me-3", if_unused="True")
+        #check that it has gone by declaring passively:
+        try:
+            channel.queue_declare(queue="delete-me-3", passive="True")
+            self.fail("Queue has not been deleted")
+        except Closed, e:
+            self.assertChannelException(404, e.args[0])
+
+

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Tests for the testlib itself.
+# 
+
+from qpid.content import Content
+from qpid.testlib import TestBase
+from Queue import Empty
+
+import sys
+from traceback import *
+
+def mytrace(frame, event, arg):
+    print_stack(frame);
+    print "===="
+    return mytrace
+    
+class TestBaseTest(TestBase):
+    """Verify TestBase functions work as expected""" 
+
+    def testAssertEmptyPass(self):
+        """Test assert empty works"""
+        self.queue_declare(queue="empty")
+        q = self.consume("empty")
+        self.assertEmpty(q)
+        try:
+            q.get(timeout=1)
+            self.fail("Queue is not empty.")
+        except Empty: None              # Ignore
+
+    def testAssertEmptyFail(self):
+        self.queue_declare(queue="full")
+        q = self.consume("full")
+        self.channel.basic_publish(routing_key="full")
+        try:
+            self.assertEmpty(q);
+            self.fail("assertEmpty did not assert on non-empty queue")
+        except AssertionError: None     # Ignore
+
+    def testMessageProperties(self):
+        """Verify properties are passed with message"""
+        props={"headers":{"x":1, "y":2}}
+        self.queue_declare(queue="q")
+        q = self.consume("q")
+        self.assertPublishGet(q, routing_key="q", properties=props)
+
+
+

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org