You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/05 16:08:46 UTC
svn commit: r906961 [3/4] - in /qpid/trunk/qpid: cpp/src/tests/ python/
python/tests_0-10/ python/tests_0-8/ python/tests_0-9/ tests/ tests/src/
tests/src/py/ tests/src/py/qpid_tests/ tests/src/py/qpid_tests/broker_0_10/
tests/src/py/qpid_tests/broker_...
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,366 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message
+from qpid.session import SessionException
+
+class QueueTests(TestBase010):
+ """Tests for 'methods' on the amqp queue 'class'"""
+
+ def test_purge(self):
+ """
+ Test that the purge method removes messages from the queue
+ """
+ session = self.session
+ #setup, declare a queue and add some messages to it:
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "one"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "two"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "three"))
+
+ #check that the queue now reports 3 messages:
+ session.queue_declare(queue="test-queue")
+ reply = session.queue_query(queue="test-queue")
+ self.assertEqual(3, reply.message_count)
+
+ #now do the purge, then test that three messages are purged and the count drops to 0
+ session.queue_purge(queue="test-queue");
+ reply = session.queue_query(queue="test-queue")
+ self.assertEqual(0, reply.message_count)
+
+ #send a further message and consume it, ensuring that the other messages are really gone
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "four"))
+ session.message_subscribe(queue="test-queue", destination="tag")
+ session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("tag")
+ msg = queue.get(timeout=1)
+ self.assertEqual("four", msg.body)
+
+ def test_purge_queue_exists(self):
+ """
+ Test that the correct exception is thrown is no queue exists
+ for the name specified in purge
+ """
+ session = self.session
+ try:
+ #queue specified but doesn't exist:
+ session.queue_purge(queue="invalid-queue")
+ self.fail("Expected failure when purging non-existent queue")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code) #not-found
+
+ def test_purge_empty_name(self):
+ """
+ Test that the correct exception is thrown is no queue name
+ is specified for purge
+ """
+ session = self.session
+ try:
+ #queue not specified and none previously declared for channel:
+ session.queue_purge()
+ self.fail("Expected failure when purging unspecified queue")
+ except SessionException, e:
+ self.assertEquals(531, e.args[0].error_code) #illegal-argument
+
+ def test_declare_exclusive(self):
+ """
+ Test that the exclusive field is honoured in queue.declare
+ """
+ # TestBase.setUp has already opened session(1)
+ s1 = self.session
+ # Here we open a second separate connection:
+ s2 = self.conn.session("other")
+
+ #declare an exclusive queue:
+ s1.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ try:
+ #other connection should not be allowed to declare this:
+ s2.queue_declare(queue="exclusive-queue", exclusive=True, auto_delete=True)
+ self.fail("Expected second exclusive queue_declare to raise a channel exception")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+ s3 = self.conn.session("subscriber")
+ try:
+ #other connection should not be allowed to declare this:
+ s3.message_subscribe(queue="exclusive-queue")
+ self.fail("Expected message_subscribe on an exclusive queue to raise a channel exception")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+ s4 = self.conn.session("deleter")
+ try:
+ #other connection should not be allowed to declare this:
+ s4.queue_delete(queue="exclusive-queue")
+ self.fail("Expected queue_delete on an exclusive queue to raise a channel exception")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+
+ def test_declare_passive(self):
+ """
+ Test that the passive field is honoured in queue.declare
+ """
+ session = self.session
+ #declare an exclusive queue:
+ session.queue_declare(queue="passive-queue-1", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="passive-queue-1", passive=True)
+ try:
+ #other connection should not be allowed to declare this:
+ session.queue_declare(queue="passive-queue-2", passive=True)
+ self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code) #not-found
+
+
+ def test_bind(self):
+ """
+ Test various permutations of the queue.bind method
+ """
+ session = self.session
+ session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+
+ #straightforward case, both exchange & queue exist so no errors expected:
+ session.exchange_bind(queue="queue-1", exchange="amq.direct", binding_key="key1")
+
+ #use the queue name where the routing key is not specified:
+ session.exchange_bind(queue="queue-1", exchange="amq.direct")
+
+ #try and bind to non-existant exchange
+ try:
+ session.exchange_bind(queue="queue-1", exchange="an-invalid-exchange", binding_key="key1")
+ self.fail("Expected bind to non-existant exchange to fail")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+
+ def test_bind_queue_existence(self):
+ session = self.session
+ #try and bind non-existant queue:
+ try:
+ session.exchange_bind(queue="queue-2", exchange="amq.direct", binding_key="key1")
+ self.fail("Expected bind of non-existant queue to fail")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_unbind_direct(self):
+ self.unbind_test(exchange="amq.direct", routing_key="key")
+
+ def test_unbind_topic(self):
+ self.unbind_test(exchange="amq.topic", routing_key="key")
+
+ def test_unbind_fanout(self):
+ self.unbind_test(exchange="amq.fanout")
+
+ def test_unbind_headers(self):
+ self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
+
+ def unbind_test(self, exchange, routing_key="", args=None, headers=None):
+ #bind two queues and consume from them
+ session = self.session
+
+ session.queue_declare(queue="queue-1", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="queue-2", exclusive=True, auto_delete=True)
+
+ session.message_subscribe(queue="queue-1", destination="queue-1")
+ session.message_flow(destination="queue-1", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="queue-1", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ session.message_subscribe(queue="queue-2", destination="queue-2")
+ session.message_flow(destination="queue-2", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="queue-2", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ queue1 = session.incoming("queue-1")
+ queue2 = session.incoming("queue-2")
+
+ session.exchange_bind(exchange=exchange, queue="queue-1", binding_key=routing_key, arguments=args)
+ session.exchange_bind(exchange=exchange, queue="queue-2", binding_key=routing_key, arguments=args)
+
+ dp = session.delivery_properties(routing_key=routing_key)
+ if (headers):
+ mp = session.message_properties(application_headers=headers)
+ msg1 = Message(dp, mp, "one")
+ msg2 = Message(dp, mp, "two")
+ else:
+ msg1 = Message(dp, "one")
+ msg2 = Message(dp, "two")
+
+ #send a message that will match both bindings
+ session.message_transfer(destination=exchange, message=msg1)
+
+ #unbind first queue
+ session.exchange_unbind(exchange=exchange, queue="queue-1", binding_key=routing_key)
+
+ #send another message
+ session.message_transfer(destination=exchange, message=msg2)
+
+ #check one queue has both messages and the other has only one
+ self.assertEquals("one", queue1.get(timeout=1).body)
+ try:
+ msg = queue1.get(timeout=1)
+ self.fail("Got extra message: %s" % msg.body)
+ except Empty: pass
+
+ self.assertEquals("one", queue2.get(timeout=1).body)
+ self.assertEquals("two", queue2.get(timeout=1).body)
+ try:
+ msg = queue2.get(timeout=1)
+ self.fail("Got extra message: " + msg)
+ except Empty: pass
+
+
+ def test_delete_simple(self):
+ """
+ Test core queue deletion behaviour
+ """
+ session = self.session
+
+ #straight-forward case:
+ session.queue_declare(queue="delete-me")
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "a"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "b"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me"), "c"))
+ session.queue_delete(queue="delete-me")
+ #check that it has gone by declaring passively
+ try:
+ session.queue_declare(queue="delete-me", passive=True)
+ self.fail("Queue has not been deleted")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_delete_queue_exists(self):
+ """
+ Test core queue deletion behaviour
+ """
+ #check attempted deletion of non-existant queue is handled correctly:
+ session = self.session
+ try:
+ session.queue_delete(queue="i-dont-exist", if_empty=True)
+ self.fail("Expected delete of non-existant queue to fail")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+
+
+ def test_delete_ifempty(self):
+ """
+ Test that if_empty field of queue_delete is honoured
+ """
+ session = self.session
+
+ #create a queue and add a message to it (use default binding):
+ session.queue_declare(queue="delete-me-2")
+ session.queue_declare(queue="delete-me-2", passive=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="delete-me-2"), "message"))
+
+ #try to delete, but only if empty:
+ try:
+ session.queue_delete(queue="delete-me-2", if_empty=True)
+ self.fail("Expected delete if_empty to fail for non-empty queue")
+ except SessionException, e:
+ self.assertEquals(406, e.args[0].error_code)
+
+ #need new session now:
+ session = self.conn.session("replacement", 2)
+
+ #empty queue:
+ session.message_subscribe(destination="consumer_tag", queue="delete-me-2")
+ session.message_flow(destination="consumer_tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="consumer_tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("consumer_tag")
+ msg = queue.get(timeout=1)
+ self.assertEqual("message", msg.body)
+ session.message_cancel(destination="consumer_tag")
+
+ #retry deletion on empty queue:
+ session.queue_delete(queue="delete-me-2", if_empty=True)
+
+ #check that it has gone by declaring passively:
+ try:
+ session.queue_declare(queue="delete-me-2", passive=True)
+ self.fail("Queue has not been deleted")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_delete_ifunused(self):
+ """
+ Test that if_unused field of queue_delete is honoured
+ """
+ session = self.session
+
+ #create a queue and register a consumer:
+ session.queue_declare(queue="delete-me-3")
+ session.queue_declare(queue="delete-me-3", passive=True)
+ session.message_subscribe(destination="consumer_tag", queue="delete-me-3")
+
+ #need new session now:
+ session2 = self.conn.session("replacement", 2)
+
+ #try to delete, but only if empty:
+ try:
+ session2.queue_delete(queue="delete-me-3", if_unused=True)
+ self.fail("Expected delete if_unused to fail for queue with existing consumer")
+ except SessionException, e:
+ self.assertEquals(406, e.args[0].error_code)
+
+ session.message_cancel(destination="consumer_tag")
+ session.queue_delete(queue="delete-me-3", if_unused=True)
+ #check that it has gone by declaring passively:
+ try:
+ session.queue_declare(queue="delete-me-3", passive=True)
+ self.fail("Queue has not been deleted")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+
+ def test_autodelete_shared(self):
+ """
+ Test auto-deletion (of non-exclusive queues)
+ """
+ session = self.session
+ session2 =self.conn.session("other", 1)
+
+ session.queue_declare(queue="auto-delete-me", auto_delete=True)
+
+ #consume from both sessions
+ tag = "my-tag"
+ session.message_subscribe(queue="auto-delete-me", destination=tag)
+ session2.message_subscribe(queue="auto-delete-me", destination=tag)
+
+ #implicit cancel
+ session2.close()
+
+ #check it is still there
+ session.queue_declare(queue="auto-delete-me", passive=True)
+
+ #explicit cancel => queue is now unused again:
+ session.message_cancel(destination=tag)
+
+ #NOTE: this assumes there is no timeout in use
+
+ #check that it has gone by declaring it passively
+ try:
+ session.queue_declare(queue="auto-delete-me", passive=True)
+ self.fail("Expected queue to have been deleted")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/queue.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,265 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
+
+class TxTests(TestBase010):
+ """
+ Tests for 'methods' on the amqp tx 'class'
+ """
+
+ def test_commit(self):
+ """
+ Test that commited publishes are delivered and commited acks are not re-delivered
+ """
+ session = self.session
+
+ #declare queues and create subscribers in the checking session
+ #to ensure that the queues are not auto-deleted too early:
+ self.declare_queues(["tx-commit-a", "tx-commit-b", "tx-commit-c"])
+ session.message_subscribe(queue="tx-commit-a", destination="qa")
+ session.message_subscribe(queue="tx-commit-b", destination="qb")
+ session.message_subscribe(queue="tx-commit-c", destination="qc")
+
+ #use a separate session for actual work
+ session2 = self.conn.session("worker", 2)
+ self.perform_txn_work(session2, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ session2.tx_commit()
+ session2.close()
+
+ session.tx_select()
+
+ self.enable_flow("qa")
+ queue_a = session.incoming("qa")
+
+ self.enable_flow("qb")
+ queue_b = session.incoming("qb")
+
+ self.enable_flow("qc")
+ queue_c = session.incoming("qc")
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("TxMessage %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("TxMessage 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("TxMessage 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ #cleanup
+ session.tx_commit()
+
+ def test_auto_rollback(self):
+ """
+ Test that a session closed with an open transaction is effectively rolled back
+ """
+ session = self.session
+ self.declare_queues(["tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c"])
+ session.message_subscribe(queue="tx-autorollback-a", destination="qa")
+ session.message_subscribe(queue="tx-autorollback-b", destination="qb")
+ session.message_subscribe(queue="tx-autorollback-c", destination="qc")
+
+ session2 = self.conn.session("worker", 2)
+ queue_a, queue_b, queue_c, ignore = self.perform_txn_work(session2, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ session2.close()
+
+ session.tx_select()
+
+ self.enable_flow("qa")
+ queue_a = session.incoming("qa")
+
+ self.enable_flow("qb")
+ queue_b = session.incoming("qb")
+
+ self.enable_flow("qc")
+ queue_c = session.incoming("qc")
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ #cleanup
+ session.tx_commit()
+
+ def test_rollback(self):
+ """
+ Test that rolled back publishes are not delivered and rolled back acks are re-delivered
+ """
+ session = self.session
+ queue_a, queue_b, queue_c, consumed = self.perform_txn_work(session, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ session.tx_rollback()
+
+ #need to release messages to get them redelivered now:
+ session.message_release(consumed)
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ #cleanup
+ session.tx_commit()
+
+ def perform_txn_work(self, session, name_a, name_b, name_c):
+ """
+ Utility method that does some setup and some work under a transaction. Used for testing both
+ commit and rollback
+ """
+ #setup:
+ self.declare_queues([name_a, name_b, name_c])
+
+ key = "my_key_" + name_b
+ topic = "my_topic_" + name_c
+
+ session.exchange_bind(queue=name_b, exchange="amq.direct", binding_key=key)
+ session.exchange_bind(queue=name_c, exchange="amq.topic", binding_key=topic)
+
+ dp = session.delivery_properties(routing_key=name_a)
+ for i in range(1, 5):
+ mp = session.message_properties(message_id="msg%d" % i)
+ session.message_transfer(message=Message(dp, mp, "Message %d" % i))
+
+ dp = session.delivery_properties(routing_key=key)
+ mp = session.message_properties(message_id="msg6")
+ session.message_transfer(destination="amq.direct", message=Message(dp, mp, "Message 6"))
+
+ dp = session.delivery_properties(routing_key=topic)
+ mp = session.message_properties(message_id="msg7")
+ session.message_transfer(destination="amq.topic", message=Message(dp, mp, "Message 7"))
+
+ session.tx_select()
+
+ #consume and ack messages
+ acked = RangedSet()
+ self.subscribe(session, queue=name_a, destination="sub_a")
+ queue_a = session.incoming("sub_a")
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ acked.add(msg.id)
+ self.assertEqual("Message %d" % i, msg.body)
+
+ self.subscribe(session, queue=name_b, destination="sub_b")
+ queue_b = session.incoming("sub_b")
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.body)
+ acked.add(msg.id)
+
+ sub_c = self.subscribe(session, queue=name_c, destination="sub_c")
+ queue_c = session.incoming("sub_c")
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.body)
+ acked.add(msg.id)
+
+ session.message_accept(acked)
+
+ dp = session.delivery_properties(routing_key=topic)
+ #publish messages
+ for i in range(1, 5):
+ mp = session.message_properties(message_id="tx-msg%d" % i)
+ session.message_transfer(destination="amq.topic", message=Message(dp, mp, "TxMessage %d" % i))
+
+ dp = session.delivery_properties(routing_key=key)
+ mp = session.message_properties(message_id="tx-msg6")
+ session.message_transfer(destination="amq.direct", message=Message(dp, mp, "TxMessage 6"))
+
+ dp = session.delivery_properties(routing_key=name_a)
+ mp = session.message_properties(message_id="tx-msg7")
+ session.message_transfer(message=Message(dp, mp, "TxMessage 7"))
+ return queue_a, queue_b, queue_c, acked
+
+ def declare_queues(self, names, session=None):
+ session = session or self.session
+ for n in names:
+ session.queue_declare(queue=n, auto_delete=True)
+
+ def subscribe(self, session=None, **keys):
+ session = session or self.session
+ consumer_tag = keys["destination"]
+ session.message_subscribe(**keys)
+ session.message_flow(destination=consumer_tag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination=consumer_tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ def enable_flow(self, tag, session=None):
+ session = session or self.session
+ session.message_flow(destination=tag, unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination=tag, unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ def complete(self, session, msg):
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+ session.channel.session_completed(session.receiver._completed)
+
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/tx.py
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Feb 5 15:08:44 2010
@@ -0,0 +1 @@
+*.pyc
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,22 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import basic, broker, example, exchange, queue, testlib, tx
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/__init__.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,396 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class BasicTests(TestBase):
+ """Tests for 'methods' on the amqp basic 'class'"""
+
+ def test_consume_no_local(self):
+ """
+ Test that the no_local flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare two queues:
+ channel.queue_declare(queue="test-queue-1a", exclusive=True)
+ channel.queue_declare(queue="test-queue-1b", exclusive=True)
+ #establish two consumers one of which excludes delivery of locally sent messages
+ channel.basic_consume(consumer_tag="local_included", queue="test-queue-1a")
+ channel.basic_consume(consumer_tag="local_excluded", queue="test-queue-1b", no_local=True)
+
+ #send a message
+ channel.basic_publish(routing_key="test-queue-1a", content=Content("consume_no_local"))
+ channel.basic_publish(routing_key="test-queue-1b", content=Content("consume_no_local"))
+
+ #check the queues of the two consumers
+ excluded = self.client.queue("local_excluded")
+ included = self.client.queue("local_included")
+ msg = included.get(timeout=1)
+ self.assertEqual("consume_no_local", msg.content.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+
+ def test_consume_exclusive(self):
+ """
+ Test that the exclusive flag is honoured in the consume method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-2", exclusive=True)
+
+ #check that an exclusive consumer prevents other consumer being created:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-2", exclusive=True)
+ try:
+ channel.basic_consume(consumer_tag="second", queue="test-queue-2")
+ self.fail("Expected consume request to fail due to previous exclusive consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ #open new channel and cleanup last consumer:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #check that an exclusive consumer cannot be created if a consumer already exists:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-2")
+ try:
+ channel.basic_consume(consumer_tag="second", queue="test-queue-2", exclusive=True)
+ self.fail("Expected exclusive consume request to fail due to previous consumer")
+ except Closed, e:
+ self.assertChannelException(403, e.args[0])
+
+ def test_consume_queue_errors(self):
+ """
+ Test error conditions associated with the queue field of the consume method:
+ """
+ channel = self.channel
+ try:
+ #queue specified but doesn't exist:
+ channel.basic_consume(queue="invalid-queue")
+ self.fail("Expected failure when consuming from non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ channel.basic_consume(queue="")
+ self.fail("Expected failure when consuming from unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_consume_unique_consumers(self):
+ """
+ Ensure unique consumer tags are enforced
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-3", exclusive=True)
+
+ #check that attempts to use duplicate tags are detected and prevented:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+ try:
+ channel.basic_consume(consumer_tag="first", queue="test-queue-3")
+ self.fail("Expected consume request to fail due to non-unique tag")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ def test_cancel(self):
+ """
+ Test compliance of the basic.cancel method
+ """
+ channel = self.channel
+ #setup, declare a queue:
+ channel.queue_declare(queue="test-queue-4", exclusive=True)
+ channel.basic_consume(consumer_tag="my-consumer", queue="test-queue-4")
+ channel.basic_publish(routing_key="test-queue-4", content=Content("One"))
+
+ myqueue = self.client.queue("my-consumer")
+ msg = myqueue.get(timeout=1)
+ self.assertEqual("One", msg.content.body)
+
+ #cancel should stop messages being delivered
+ channel.basic_cancel(consumer_tag="my-consumer")
+ channel.basic_publish(routing_key="test-queue-4", content=Content("Two"))
+ try:
+ msg = myqueue.get(timeout=1)
+ self.fail("Got message after cancellation: " + msg)
+ except Empty: None
+
+ #cancellation of non-existant consumers should be handled without error
+ channel.basic_cancel(consumer_tag="my-consumer")
+ channel.basic_cancel(consumer_tag="this-never-existed")
+
+
+ def test_ack(self):
+ """
+ Test basic ack/recover behaviour
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-ack-queue", exclusive=True)
+
+ reply = channel.basic_consume(queue="test-ack-queue", no_ack=False)
+ queue = self.client.queue(reply.consumer_tag)
+
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("One"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Two"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Three"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Four"))
+ channel.basic_publish(routing_key="test-ack-queue", content=Content("Five"))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
+
+ channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
+ channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+ channel.basic_recover(requeue=False)
+
+ msg3b = queue.get(timeout=1)
+ msg5b = queue.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ def test_recover_requeue(self):
+ """
+ Test requeing on recovery
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-requeue", exclusive=True)
+
+ subscription = channel.basic_consume(queue="test-requeue", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ channel.basic_publish(routing_key="test-requeue", content=Content("One"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Two"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Three"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Four"))
+ channel.basic_publish(routing_key="test-requeue", content=Content("Five"))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.content.body)
+ self.assertEqual("Two", msg2.content.body)
+ self.assertEqual("Three", msg3.content.body)
+ self.assertEqual("Four", msg4.content.body)
+ self.assertEqual("Five", msg5.content.body)
+
+ channel.basic_ack(delivery_tag=msg2.delivery_tag, multiple=True) #One & Two
+ channel.basic_ack(delivery_tag=msg4.delivery_tag, multiple=False) #Four
+
+ channel.basic_cancel(consumer_tag=subscription.consumer_tag)
+
+ channel.basic_recover(requeue=True)
+
+ subscription2 = channel.basic_consume(queue="test-requeue")
+ queue2 = self.client.queue(subscription2.consumer_tag)
+
+ msg3b = queue2.get(timeout=1)
+ msg5b = queue2.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.content.body)
+ self.assertEqual("Five", msg5b.content.body)
+
+ self.assertEqual(True, msg3b.redelivered)
+ self.assertEqual(True, msg5b.redelivered)
+
+ try:
+ extra = queue2.get(timeout=1)
+ self.fail("Got unexpected message in second queue: " + extra.content.body)
+ except Empty: None
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message in original queue: " + extra.content.body)
+ except Empty: None
+
+
+ def test_qos_prefetch_count(self):
+ """
+ Test that the prefetch count specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-count", exclusive=True)
+ subscription = channel.basic_consume(queue="test-prefetch-count", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ #set prefetch to 5:
+ channel.basic_qos(prefetch_count=5)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-prefetch-count", content=Content("Message %d" % i))
+
+ #only 5 messages should have been delivered:
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+
+
+ def test_qos_prefetch_size(self):
+ """
+ Test that the prefetch size specified is honoured
+ """
+ #setup: declare queue and subscribe
+ channel = self.channel
+ channel.queue_declare(queue="test-prefetch-size", exclusive=True)
+ subscription = channel.basic_consume(queue="test-prefetch-size", no_ack=False)
+ queue = self.client.queue(subscription.consumer_tag)
+
+ #set prefetch to 50 bytes (each message is 9 or 10 bytes):
+ channel.basic_qos(prefetch_size=50)
+
+ #publish 10 messages:
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-prefetch-size", content=Content("Message %d" % i))
+
+ #only 5 messages should have been delivered (i.e. 45 bytes worth):
+ for i in range(1, 6):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 6th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #ack messages and check that the next set arrive ok:
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ for i in range(6, 11):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected 11th message in original queue: " + extra.content.body)
+ except Empty: None
+
+ #make sure that a single oversized message still gets delivered
+ large = "abcdefghijklmnopqrstuvwxyz"
+ large = large + "-" + large;
+ channel.basic_publish(routing_key="test-prefetch-size", content=Content(large))
+ msg = queue.get(timeout=1)
+ self.assertEqual(large, msg.content.body)
+
+ def test_get(self):
+ """
+ Test basic_get method
+ """
+ channel = self.channel
+ channel.queue_declare(queue="test-get", exclusive=True)
+
+ #publish some messages (no_ack=True)
+ for i in range(1, 11):
+ channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+ #use basic_get to read back the messages, and check that we get an empty at the end
+ for i in range(1, 11):
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_empty")
+
+ #repeat for no_ack=False
+ for i in range(11, 21):
+ channel.basic_publish(routing_key="test-get", content=Content("Message %d" % i))
+
+ for i in range(11, 21):
+ reply = channel.basic_get(no_ack=False)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+ if(i == 13):
+ channel.basic_ack(delivery_tag=reply.delivery_tag, multiple=True)
+ if(i in [15, 17, 19]):
+ channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_empty")
+
+ #recover(requeue=True)
+ channel.basic_recover(requeue=True)
+
+ #get the unacked messages again (14, 16, 18, 20)
+ for i in [14, 16, 18, 20]:
+ reply = channel.basic_get(no_ack=False)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_ok")
+ self.assertEqual("Message %d" % i, reply.content.body)
+ channel.basic_ack(delivery_tag=reply.delivery_tag)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_empty")
+
+ channel.basic_recover(requeue=True)
+
+ reply = channel.basic_get(no_ack=True)
+ self.assertEqual(reply.method.klass.name, "basic")
+ self.assertEqual(reply.method.name, "get_empty")
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/basic.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,120 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class BrokerTests(TestBase):
+ """Tests for basic Broker functionality"""
+
+ def test_ack_and_no_ack(self):
+ """
+ First, this test tries to receive a message with a no-ack
+ consumer. Second, this test tries to explicitly receive and
+ acknowledge a message with an acknowledging consumer.
+ """
+ ch = self.channel
+ self.queue_declare(ch, queue = "myqueue")
+
+ # No ack consumer
+ ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag
+ body = "test no-ack"
+ ch.basic_publish(routing_key = "myqueue", content = Content(body))
+ msg = self.client.queue(ctag).get(timeout = 5)
+ self.assert_(msg.content.body == body)
+
+ # Acknowledging consumer
+ self.queue_declare(ch, queue = "otherqueue")
+ ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag
+ body = "test ack"
+ ch.basic_publish(routing_key = "otherqueue", content = Content(body))
+ msg = self.client.queue(ctag).get(timeout = 5)
+ ch.basic_ack(delivery_tag = msg.delivery_tag)
+ self.assert_(msg.content.body == body)
+
+ def test_basic_delivery_immediate(self):
+ """
+ Test basic message delivery where consume is issued before publish
+ """
+ channel = self.channel
+ self.exchange_declare(channel, exchange="test-exchange", type="direct")
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ reply = channel.basic_consume(queue="test-queue", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+
+ body = "Immediate Delivery"
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True)
+ msg = queue.get(timeout=5)
+ self.assert_(msg.content.body == body)
+
+ # TODO: Ensure we fail if immediate=True and there's no consumer.
+
+
+ def test_basic_delivery_queued(self):
+ """
+ Test basic message delivery where publish is issued before consume
+ (i.e. requires queueing of the message)
+ """
+ channel = self.channel
+ self.exchange_declare(channel, exchange="test-exchange", type="direct")
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ body = "Queued Delivery"
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body))
+ reply = channel.basic_consume(queue="test-queue", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+ msg = queue.get(timeout=5)
+ self.assert_(msg.content.body == body)
+
+ def test_invalid_channel(self):
+ channel = self.client.channel(200)
+ try:
+ channel.queue_declare(exclusive=True)
+ self.fail("Expected error on queue_declare for invalid channel")
+ except Closed, e:
+ self.assertConnectionException(504, e.args[0])
+
+ def test_closed_channel(self):
+ channel = self.client.channel(200)
+ channel.channel_open()
+ channel.channel_close()
+ try:
+ channel.queue_declare(exclusive=True)
+ self.fail("Expected error on queue_declare for closed channel")
+ except Closed, e:
+ self.assertConnectionException(504, e.args[0])
+
+ def test_channel_flow(self):
+ channel = self.channel
+ channel.queue_declare(queue="flow_test_queue", exclusive=True)
+ ctag = channel.basic_consume(queue="flow_test_queue", no_ack=True).consumer_tag
+ incoming = self.client.queue(ctag)
+
+ channel.channel_flow(active=False)
+ channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz"))
+ try:
+ incoming.get(timeout=1)
+ self.fail("Received message when flow turned off.")
+ except Empty: None
+
+ channel.channel_flow(active=True)
+ msg = incoming.get(timeout=1)
+ self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body)
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/broker.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,94 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class ExampleTest (TestBase):
+ """
+ An example Qpid test, illustrating the unittest frameowkr and the
+ python Qpid client. The test class must inherit TestCase. The
+ test code uses the Qpid client to interact with a qpid broker and
+ verify it behaves as expected.
+ """
+
+ def test_example(self):
+ """
+ An example test. Note that test functions must start with 'test_'
+ to be recognized by the test framework.
+ """
+
+ # By inheriting TestBase, self.client is automatically connected
+ # and self.channel is automatically opened as channel(1)
+ # Other channel methods mimic the protocol.
+ channel = self.channel
+
+ # Now we can send regular commands. If you want to see what the method
+ # arguments mean or what other commands are available, you can use the
+ # python builtin help() method. For example:
+ #help(chan)
+ #help(chan.exchange_declare)
+
+ # If you want browse the available protocol methods without being
+ # connected to a live server you can use the amqp-doc utility:
+ #
+ # Usage amqp-doc [<options>] <spec> [<pattern_1> ... <pattern_n>]
+ #
+ # Options:
+ # -e, --regexp use regex instead of glob when matching
+
+ # Now that we know what commands are available we can use them to
+ # interact with the server.
+
+ # Here we use ordinal arguments.
+ self.exchange_declare(channel, 0, "test", "direct")
+
+ # Here we use keyword arguments.
+ self.queue_declare(channel, queue="test-queue")
+ channel.queue_bind(queue="test-queue", exchange="test", routing_key="key")
+
+ # Call Channel.basic_consume to register as a consumer.
+ # All the protocol methods return a message object. The message object
+ # has fields corresponding to the reply method fields, plus a content
+ # field that is filled if the reply includes content. In this case the
+ # interesting field is the consumer_tag.
+ reply = channel.basic_consume(queue="test-queue")
+
+ # We can use the Client.queue(...) method to access the queue
+ # corresponding to our consumer_tag.
+ queue = self.client.queue(reply.consumer_tag)
+
+ # Now lets publish a message and see if our consumer gets it. To do
+ # this we need to import the Content class.
+ body = "Hello World!"
+ channel.basic_publish(exchange="test",
+ routing_key="key",
+ content=Content(body))
+
+ # Now we'll wait for the message to arrive. We can use the timeout
+ # argument in case the server hangs. By default queue.get() will wait
+ # until a message arrives or the connection to the server dies.
+ msg = queue.get(timeout=10)
+
+ # And check that we got the right response with assertEqual
+ self.assertEqual(body, msg.content.body)
+
+ # Now acknowledge the message.
+ channel.basic_ack(msg.delivery_tag, True)
+
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/example.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,327 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""
+Tests for exchange behaviour.
+
+Test classes ending in 'RuleTests' are derived from rules in amqp.xml.
+"""
+
+import Queue, logging
+from qpid.testlib import TestBase
+from qpid.content import Content
+from qpid.client import Closed
+
+
+class StandardExchangeVerifier:
+ """Verifies standard exchange behavior.
+
+ Used as base class for classes that test standard exchanges."""
+
+ def verifyDirectExchange(self, ex):
+ """Verify that ex behaves like a direct exchange."""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex, routing_key="k")
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="k")
+ try:
+ self.assertPublishConsume(exchange=ex, queue="q", routing_key="kk")
+ self.fail("Expected Empty exception")
+ except Queue.Empty: None # Expected
+
+ def verifyFanOutExchange(self, ex):
+ """Verify that ex behaves like a fanout exchange."""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex)
+ self.queue_declare(queue="p")
+ self.channel.queue_bind(queue="p", exchange=ex)
+ for qname in ["q", "p"]: self.assertPublishGet(self.consume(qname), ex)
+
+ def verifyTopicExchange(self, ex):
+ """Verify that ex behaves like a topic exchange"""
+ self.queue_declare(queue="a")
+ self.channel.queue_bind(queue="a", exchange=ex, routing_key="a.#.b.*")
+ q = self.consume("a")
+ self.assertPublishGet(q, ex, "a.b.x")
+ self.assertPublishGet(q, ex, "a.x.b.x")
+ self.assertPublishGet(q, ex, "a.x.x.b.x")
+ # Shouldn't match
+ self.channel.basic_publish(exchange=ex, routing_key="a.b")
+ self.channel.basic_publish(exchange=ex, routing_key="a.b.x.y")
+ self.channel.basic_publish(exchange=ex, routing_key="x.a.b.x")
+ self.channel.basic_publish(exchange=ex, routing_key="a.b")
+ self.assert_(q.empty())
+
+ def verifyHeadersExchange(self, ex):
+ """Verify that ex is a headers exchange"""
+ self.queue_declare(queue="q")
+ self.channel.queue_bind(queue="q", exchange=ex, arguments={ "x-match":"all", "name":"fred" , "age":3} )
+ q = self.consume("q")
+ headers = {"name":"fred", "age":3}
+ self.assertPublishGet(q, exchange=ex, properties={'headers':headers})
+ self.channel.basic_publish(exchange=ex) # No headers, won't deliver
+ self.assertEmpty(q);
+
+
+class RecommendedTypesRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server SHOULD implement these standard exchange types: topic, headers.
+
+ Client attempts to declare an exchange with each of these standard types.
+ """
+
+ def testDirect(self):
+ """Declare and test a direct exchange"""
+ self.exchange_declare(0, exchange="d", type="direct")
+ self.verifyDirectExchange("d")
+
+ def testFanout(self):
+ """Declare and test a fanout exchange"""
+ self.exchange_declare(0, exchange="f", type="fanout")
+ self.verifyFanOutExchange("f")
+
+ def testTopic(self):
+ """Declare and test a topic exchange"""
+ self.exchange_declare(0, exchange="t", type="topic")
+ self.verifyTopicExchange("t")
+
+ def testHeaders(self):
+ """Declare and test a headers exchange"""
+ self.exchange_declare(0, exchange="h", type="headers")
+ self.verifyHeadersExchange("h")
+
+
+class RequiredInstancesRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server MUST, in each virtual host, pre-declare an exchange instance
+ for each standard exchange type that it implements, where the name of the
+ exchange instance is amq. followed by the exchange type name.
+
+ Client creates a temporary queue and attempts to bind to each required
+ exchange instance (amq.fanout, amq.direct, and amq.topic, amq.match if
+ those types are defined).
+ """
+ def testAmqDirect(self): self.verifyDirectExchange("amq.direct")
+
+ def testAmqFanOut(self): self.verifyFanOutExchange("amq.fanout")
+
+ def testAmqTopic(self): self.verifyTopicExchange("amq.topic")
+
+ def testAmqMatch(self): self.verifyHeadersExchange("amq.match")
+
+class DefaultExchangeRuleTests(TestBase, StandardExchangeVerifier):
+ """
+ The server MUST predeclare a direct exchange to act as the default exchange
+ for content Publish methods and for default queue bindings.
+
+ Client checks that the default exchange is active by specifying a queue
+ binding with no exchange name, and publishing a message with a suitable
+ routing key but without specifying the exchange name, then ensuring that
+ the message arrives in the queue correctly.
+ """
+ def testDefaultExchange(self):
+ # Test automatic binding by queue name.
+ self.queue_declare(queue="d")
+ self.assertPublishConsume(queue="d", routing_key="d")
+ # Test explicit bind to default queue
+ self.verifyDirectExchange("")
+
+
+# TODO aconway 2006-09-27: Fill in empty tests:
+
+class DefaultAccessRuleTests(TestBase):
+ """
+ The server MUST NOT allow clients to access the default exchange except
+ by specifying an empty exchange name in the Queue.Bind and content Publish
+ methods.
+ """
+
+class ExtensionsRuleTests(TestBase):
+ """
+ The server MAY implement other exchange types as wanted.
+ """
+
+
+class DeclareMethodMinimumRuleTests(TestBase):
+ """
+ The server SHOULD support a minimum of 16 exchanges per virtual host and
+ ideally, impose no limit except as defined by available resources.
+
+ The client creates as many exchanges as it can until the server reports
+ an error; the number of exchanges successfuly created must be at least
+ sixteen.
+ """
+
+
+class DeclareMethodTicketFieldValidityRuleTests(TestBase):
+ """
+ The client MUST provide a valid access ticket giving "active" access to
+ the realm in which the exchange exists or will be created, or "passive"
+ access if the if-exists flag is set.
+
+ Client creates access ticket with wrong access rights and attempts to use
+ in this method.
+ """
+
+
+class DeclareMethodExchangeFieldReservedRuleTests(TestBase):
+ """
+ Exchange names starting with "amq." are reserved for predeclared and
+ standardised exchanges. The client MUST NOT attempt to create an exchange
+ starting with "amq.".
+
+
+ """
+
+
+class DeclareMethodTypeFieldTypedRuleTests(TestBase):
+ """
+ Exchanges cannot be redeclared with different types. The client MUST not
+ attempt to redeclare an existing exchange with a different type than used
+ in the original Exchange.Declare method.
+
+
+ """
+
+
+class DeclareMethodTypeFieldSupportRuleTests(TestBase):
+ """
+ The client MUST NOT attempt to create an exchange with a type that the
+ server does not support.
+
+
+ """
+
+
+class DeclareMethodPassiveFieldNotFoundRuleTests(TestBase):
+ """
+ If set, and the exchange does not already exist, the server MUST raise a
+ channel exception with reply code 404 (not found).
+ """
+ def test(self):
+ try:
+ self.channel.exchange_declare(exchange="humpty_dumpty", passive=True)
+ self.fail("Expected 404 for passive declaration of unknown exchange.")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+class DeclareMethodDurableFieldSupportRuleTests(TestBase):
+ """
+ The server MUST support both durable and transient exchanges.
+
+
+ """
+
+
+class DeclareMethodDurableFieldStickyRuleTests(TestBase):
+ """
+ The server MUST ignore the durable field if the exchange already exists.
+
+
+ """
+
+
+class DeclareMethodAutoDeleteFieldStickyRuleTests(TestBase):
+ """
+ The server MUST ignore the auto-delete field if the exchange already
+ exists.
+
+
+ """
+
+
+class DeleteMethodTicketFieldValidityRuleTests(TestBase):
+ """
+ The client MUST provide a valid access ticket giving "active" access
+ rights to the exchange's access realm.
+
+ Client creates access ticket with wrong access rights and attempts to use
+ in this method.
+ """
+
+
+class DeleteMethodExchangeFieldExistsRuleTests(TestBase):
+ """
+ The client MUST NOT attempt to delete an exchange that does not exist.
+ """
+
+
+class HeadersExchangeTests(TestBase):
+ """
+ Tests for headers exchange functionality.
+ """
+ def setUp(self):
+ TestBase.setUp(self)
+ self.queue_declare(queue="q")
+ self.q = self.consume("q")
+
+ def myAssertPublishGet(self, headers):
+ self.assertPublishGet(self.q, exchange="amq.match", properties={'headers':headers})
+
+ def myBasicPublish(self, headers):
+ self.channel.basic_publish(exchange="amq.match", content=Content("foobar", properties={'headers':headers}))
+
+ def testMatchAll(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'all', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred", "age":3, "extra":"ignoreme"})
+
+ # None of these should match
+ self.myBasicPublish({})
+ self.myBasicPublish({"name":"barney"})
+ self.myBasicPublish({"name":10})
+ self.myBasicPublish({"name":"fred", "age":2})
+ self.assertEmpty(self.q)
+
+ def testMatchAny(self):
+ self.channel.queue_bind(queue="q", exchange="amq.match", arguments={ 'x-match':'any', "name":"fred", "age":3})
+ self.myAssertPublishGet({"name":"fred"})
+ self.myAssertPublishGet({"name":"fred", "ignoreme":10})
+ self.myAssertPublishGet({"ignoreme":10, "age":3})
+
+ # Wont match
+ self.myBasicPublish({})
+ self.myBasicPublish({"irrelevant":0})
+ self.assertEmpty(self.q)
+
+
+class MiscellaneousErrorsTests(TestBase):
+ """
+ Test some miscellaneous error conditions
+ """
+ def testTypeNotKnown(self):
+ try:
+ self.channel.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
+ self.fail("Expected 503 for declaration of unknown exchange type.")
+ except Closed, e:
+ self.assertConnectionException(503, e.args[0])
+
+ def testDifferentDeclaredType(self):
+ self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
+ try:
+ self.channel.exchange_declare(exchange="test_different_declared_type_exchange", type="topic")
+ self.fail("Expected 530 for redeclaration of exchange with different type.")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+ #cleanup
+ other = self.connect()
+ c2 = other.channel(1)
+ c2.channel_open()
+ c2.exchange_delete(exchange="test_different_declared_type_exchange")
+
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/exchange.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,255 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class QueueTests(TestBase):
+ """Tests for 'methods' on the amqp queue 'class'"""
+
+ def test_purge(self):
+ """
+ Test that the purge method removes messages from the queue
+ """
+ channel = self.channel
+ #setup, declare a queue and add some messages to it:
+ channel.exchange_declare(exchange="test-exchange", type="direct")
+ channel.queue_declare(queue="test-queue", exclusive=True)
+ channel.queue_bind(queue="test-queue", exchange="test-exchange", routing_key="key")
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("one"))
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("two"))
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("three"))
+
+ #check that the queue now reports 3 messages:
+ reply = channel.queue_declare(queue="test-queue")
+ self.assertEqual(3, reply.message_count)
+
+ #now do the purge, then test that three messages are purged and the count drops to 0
+ reply = channel.queue_purge(queue="test-queue");
+ self.assertEqual(3, reply.message_count)
+ reply = channel.queue_declare(queue="test-queue")
+ self.assertEqual(0, reply.message_count)
+
+ #send a further message and consume it, ensuring that the other messages are really gone
+ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four"))
+ reply = channel.basic_consume(queue="test-queue", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEqual("four", msg.content.body)
+
+ #check error conditions (use new channels):
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ #queue specified but doesn't exist:
+ channel.queue_purge(queue="invalid-queue")
+ self.fail("Expected failure when purging non-existent queue")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ channel = self.client.channel(3)
+ channel.channel_open()
+ try:
+ #queue not specified and none previously declared for channel:
+ channel.queue_purge()
+ self.fail("Expected failure when purging unspecified queue")
+ except Closed, e:
+ self.assertConnectionException(530, e.args[0])
+
+ #cleanup
+ other = self.connect()
+ channel = other.channel(1)
+ channel.channel_open()
+ channel.exchange_delete(exchange="test-exchange")
+
+ def test_declare_exclusive(self):
+ """
+ Test that the exclusive field is honoured in queue.declare
+ """
+ # TestBase.setUp has already opened channel(1)
+ c1 = self.channel
+ # Here we open a second separate connection:
+ other = self.connect()
+ c2 = other.channel(1)
+ c2.channel_open()
+
+ #declare an exclusive queue:
+ c1.queue_declare(queue="exclusive-queue", exclusive="True")
+ try:
+ #other connection should not be allowed to declare this:
+ c2.queue_declare(queue="exclusive-queue", exclusive="True")
+ self.fail("Expected second exclusive queue_declare to raise a channel exception")
+ except Closed, e:
+ self.assertChannelException(405, e.args[0])
+
+
+ def test_declare_passive(self):
+ """
+ Test that the passive field is honoured in queue.declare
+ """
+ channel = self.channel
+ #declare an exclusive queue:
+ channel.queue_declare(queue="passive-queue-1", exclusive="True")
+ channel.queue_declare(queue="passive-queue-1", passive="True")
+ try:
+ #other connection should not be allowed to declare this:
+ channel.queue_declare(queue="passive-queue-2", passive="True")
+ self.fail("Expected passive declaration of non-existant queue to raise a channel exception")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+ def test_bind(self):
+ """
+ Test various permutations of the queue.bind method
+ """
+ channel = self.channel
+ channel.queue_declare(queue="queue-1", exclusive="True")
+
+ #straightforward case, both exchange & queue exist so no errors expected:
+ channel.queue_bind(queue="queue-1", exchange="amq.direct", routing_key="key1")
+
+ #bind the default queue for the channel (i.e. last one declared):
+ channel.queue_bind(exchange="amq.direct", routing_key="key2")
+
+ #use the queue name where neither routing key nor queue are specified:
+ channel.queue_bind(exchange="amq.direct")
+
+ #try and bind to non-existant exchange
+ try:
+ channel.queue_bind(queue="queue-1", exchange="an-invalid-exchange", routing_key="key1")
+ self.fail("Expected bind to non-existant exchange to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ #need to reopen a channel:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #try and bind non-existant queue:
+ try:
+ channel.queue_bind(queue="queue-2", exchange="amq.direct", routing_key="key1")
+ self.fail("Expected bind of non-existant queue to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+ def test_delete_simple(self):
+ """
+ Test basic queue deletion
+ """
+ channel = self.channel
+
+ #straight-forward case:
+ channel.queue_declare(queue="delete-me")
+ channel.basic_publish(routing_key="delete-me", content=Content("a"))
+ channel.basic_publish(routing_key="delete-me", content=Content("b"))
+ channel.basic_publish(routing_key="delete-me", content=Content("c"))
+ reply = channel.queue_delete(queue="delete-me")
+ self.assertEqual(3, reply.message_count)
+ #check that it has gone be declaring passively
+ try:
+ channel.queue_declare(queue="delete-me", passive="True")
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ #check attempted deletion of non-existant queue is handled correctly:
+ channel = self.client.channel(2)
+ channel.channel_open()
+ try:
+ channel.queue_delete(queue="i-dont-exist", if_empty="True")
+ self.fail("Expected delete of non-existant queue to fail")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
+
+ def test_delete_ifempty(self):
+ """
+ Test that if_empty field of queue_delete is honoured
+ """
+ channel = self.channel
+
+ #create a queue and add a message to it (use default binding):
+ channel.queue_declare(queue="delete-me-2")
+ channel.queue_declare(queue="delete-me-2", passive="True")
+ channel.basic_publish(routing_key="delete-me-2", content=Content("message"))
+
+ #try to delete, but only if empty:
+ try:
+ channel.queue_delete(queue="delete-me-2", if_empty="True")
+ self.fail("Expected delete if_empty to fail for non-empty queue")
+ except Closed, e:
+ self.assertChannelException(406, e.args[0])
+
+ #need new channel now:
+ channel = self.client.channel(2)
+ channel.channel_open()
+
+ #empty queue:
+ reply = channel.basic_consume(queue="delete-me-2", no_ack=True)
+ queue = self.client.queue(reply.consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEqual("message", msg.content.body)
+ channel.basic_cancel(consumer_tag=reply.consumer_tag)
+
+ #retry deletion on empty queue:
+ channel.queue_delete(queue="delete-me-2", if_empty="True")
+
+ #check that it has gone by declaring passively:
+ try:
+ channel.queue_declare(queue="delete-me-2", passive="True")
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+ def test_delete_ifunused(self):
+ """
+ Test that if_unused field of queue_delete is honoured
+ """
+ channel = self.channel
+
+ #create a queue and register a consumer:
+ channel.queue_declare(queue="delete-me-3")
+ channel.queue_declare(queue="delete-me-3", passive="True")
+ reply = channel.basic_consume(queue="delete-me-3", no_ack=True)
+
+ #need new channel now:
+ channel2 = self.client.channel(2)
+ channel2.channel_open()
+ #try to delete, but only if empty:
+ try:
+ channel2.queue_delete(queue="delete-me-3", if_unused="True")
+ self.fail("Expected delete if_unused to fail for queue with existing consumer")
+ except Closed, e:
+ self.assertChannelException(406, e.args[0])
+
+
+ channel.basic_cancel(consumer_tag=reply.consumer_tag)
+ channel.queue_delete(queue="delete-me-3", if_unused="True")
+ #check that it has gone by declaring passively:
+ try:
+ channel.queue_declare(queue="delete-me-3", passive="True")
+ self.fail("Queue has not been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
+
+
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/queue.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+#
+# Tests for the testlib itself.
+#
+
+from qpid.content import Content
+from qpid.testlib import TestBase
+from Queue import Empty
+
+import sys
+from traceback import *
+
+def mytrace(frame, event, arg):
+ print_stack(frame);
+ print "===="
+ return mytrace
+
+class TestBaseTest(TestBase):
+ """Verify TestBase functions work as expected"""
+
+ def testAssertEmptyPass(self):
+ """Test assert empty works"""
+ self.queue_declare(queue="empty")
+ q = self.consume("empty")
+ self.assertEmpty(q)
+ try:
+ q.get(timeout=1)
+ self.fail("Queue is not empty.")
+ except Empty: None # Ignore
+
+ def testAssertEmptyFail(self):
+ self.queue_declare(queue="full")
+ q = self.consume("full")
+ self.channel.basic_publish(routing_key="full")
+ try:
+ self.assertEmpty(q);
+ self.fail("assertEmpty did not assert on non-empty queue")
+ except AssertionError: None # Ignore
+
+ def testMessageProperties(self):
+ """Verify properties are passed with message"""
+ props={"headers":{"x":1, "y":2}}
+ self.queue_declare(queue="q")
+ q = self.consume("q")
+ self.assertPublishGet(q, routing_key="q", properties=props)
+
+
+
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/testlib.py
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org