You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/05 16:08:46 UTC
svn commit: r906961 [4/4] - in /qpid/trunk/qpid: cpp/src/tests/ python/
python/tests_0-10/ python/tests_0-8/ python/tests_0-9/ tests/ tests/src/
tests/src/py/ tests/src/py/qpid_tests/ tests/src/py/qpid_tests/broker_0_10/
tests/src/py/qpid_tests/broker_...
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,209 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class TxTests(TestBase):
+ """
+ Tests for 'methods' on the amqp tx 'class'
+ """
+
+ def test_commit(self):
+ """
+ Test that commited publishes are delivered and commited acks are not re-delivered
+ """
+ channel = self.channel
+ queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-commit-a", "tx-commit-b", "tx-commit-c")
+ channel.tx_commit()
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("TxMessage %d" % i, msg.content.body)
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("TxMessage 6", msg.content.body)
+
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("TxMessage 7", msg.content.body)
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.basic_ack(delivery_tag=0, multiple=True)
+ channel.tx_commit()
+
+ def test_auto_rollback(self):
+ """
+ Test that a channel closed with an open transaction is effectively rolled back
+ """
+ channel = self.channel
+ queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-autorollback-a", "tx-autorollback-b", "tx-autorollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ channel.tx_rollback()
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.basic_ack(delivery_tag=0, multiple=True)
+ channel.tx_commit()
+
+ def test_rollback(self):
+ """
+ Test that rolled back publishes are not delivered and rolled back acks are re-delivered
+ """
+ channel = self.channel
+ queue_a, queue_b, queue_c = self.perform_txn_work(channel, "tx-rollback-a", "tx-rollback-b", "tx-rollback-c")
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ channel.tx_rollback()
+
+ #check results
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+
+ for q in [queue_a, queue_b, queue_c]:
+ try:
+ extra = q.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
+
+ #cleanup
+ channel.basic_ack(delivery_tag=0, multiple=True)
+ channel.tx_commit()
+
+ def perform_txn_work(self, channel, name_a, name_b, name_c):
+ """
+ Utility method that does some setup and some work under a transaction. Used for testing both
+ commit and rollback
+ """
+ #setup:
+ channel.queue_declare(queue=name_a, exclusive=True)
+ channel.queue_declare(queue=name_b, exclusive=True)
+ channel.queue_declare(queue=name_c, exclusive=True)
+
+ key = "my_key_" + name_b
+ topic = "my_topic_" + name_c
+
+ channel.queue_bind(queue=name_b, exchange="amq.direct", routing_key=key)
+ channel.queue_bind(queue=name_c, exchange="amq.topic", routing_key=topic)
+
+ for i in range(1, 5):
+ channel.basic_publish(routing_key=name_a, content=Content("Message %d" % i))
+
+ channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("Message 6"))
+ channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("Message 7"))
+
+ channel.tx_select()
+
+ #consume and ack messages
+ sub_a = channel.basic_consume(queue=name_a, no_ack=False)
+ queue_a = self.client.queue(sub_a.consumer_tag)
+ for i in range(1, 5):
+ msg = queue_a.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True)
+
+ sub_b = channel.basic_consume(queue=name_b, no_ack=False)
+ queue_b = self.client.queue(sub_b.consumer_tag)
+ msg = queue_b.get(timeout=1)
+ self.assertEqual("Message 6", msg.content.body)
+ channel.basic_ack(delivery_tag=msg.delivery_tag)
+
+ sub_c = channel.basic_consume(queue=name_c, no_ack=False)
+ queue_c = self.client.queue(sub_c.consumer_tag)
+ msg = queue_c.get(timeout=1)
+ self.assertEqual("Message 7", msg.content.body)
+ channel.basic_ack(delivery_tag=msg.delivery_tag)
+
+ #publish messages
+ for i in range(1, 5):
+ channel.basic_publish(routing_key=topic, exchange="amq.topic", content=Content("TxMessage %d" % i))
+
+ channel.basic_publish(routing_key=key, exchange="amq.direct", content=Content("TxMessage 6"))
+ channel.basic_publish(routing_key=name_a, content=Content("TxMessage 7"))
+
+ return queue_a, queue_b, queue_c
+
+ def test_commit_overlapping_acks(self):
+ """
+ Test that logically 'overlapping' acks do not cause errors on commit
+ """
+ channel = self.channel
+ channel.queue_declare(queue="commit-overlapping", exclusive=True)
+ for i in range(1, 10):
+ channel.basic_publish(routing_key="commit-overlapping", content=Content("Message %d" % i))
+
+
+ channel.tx_select()
+
+ sub = channel.basic_consume(queue="commit-overlapping", no_ack=False)
+ queue = self.client.queue(sub.consumer_tag)
+ for i in range(1, 10):
+ msg = queue.get(timeout=1)
+ self.assertEqual("Message %d" % i, msg.content.body)
+ if i in [3, 6, 10]:
+ channel.basic_ack(delivery_tag=msg.delivery_tag)
+
+ channel.tx_commit()
+
+ #check all have been acked:
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.content.body)
+ except Empty: None
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_8/tx.py
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Fri Feb 5 15:08:44 2010
@@ -0,0 +1 @@
+*.pyc
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,22 @@
+# Do not delete - marks this directory as a python package.
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import query, queue
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/__init__.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/query.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/query.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/query.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,224 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class QueryTests(TestBase):
+ """Tests for various query methods introduced in 0-10 and available in 0-9 for preview"""
+
+ def test_exchange_query(self):
+ """
+ Test that the exchange_query method works as expected
+ """
+ channel = self.channel
+ #check returned type for the standard exchanges
+ self.assertEqual("direct", channel.exchange_query(name="amq.direct").type)
+ self.assertEqual("topic", channel.exchange_query(name="amq.topic").type)
+ self.assertEqual("fanout", channel.exchange_query(name="amq.fanout").type)
+ self.assertEqual("headers", channel.exchange_query(name="amq.match").type)
+ self.assertEqual("direct", channel.exchange_query(name="").type)
+ #declare an exchange
+ channel.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+ #check that the result of a query is as expected
+ response = channel.exchange_query(name="my-test-exchange")
+ self.assertEqual("direct", response.type)
+ self.assertEqual(False, response.durable)
+ self.assertEqual(False, response.not_found)
+ #delete the exchange
+ channel.exchange_delete(exchange="my-test-exchange")
+ #check that the query now reports not-found
+ self.assertEqual(True, channel.exchange_query(name="my-test-exchange").not_found)
+
+ def test_binding_query_direct(self):
+ """
+ Test that the binding_query method works as expected with the direct exchange
+ """
+ self.binding_query_with_key("amq.direct")
+
+ def test_binding_query_topic(self):
+ """
+ Test that the binding_query method works as expected with the direct exchange
+ """
+ self.binding_query_with_key("amq.topic")
+
+ def binding_query_with_key(self, exchange_name):
+ channel = self.channel
+ #setup: create two queues
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+
+ channel.queue_bind(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = channel.binding_query(exchange=exchange_name, routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test matched queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, queue="used-queue", routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test unmatched queue, matched binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="used-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(False, response.key_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = channel.binding_query(exchange=exchange_name, queue="unused-queue", routing_key="unused-key")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange=exchange_name, queue="unknown-queue").queue_not_found)
+
+
+ def test_binding_query_fanout(self):
+ """
+ Test that the binding_query method works as expected with fanout exchange
+ """
+ channel = self.channel
+ #setup
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_bind(exchange="amq.fanout", queue="used-queue")
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange="amq.fanout", queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange="amq.fanout", queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
+
+ def test_binding_query_header(self):
+ """
+ Test that the binding_query method works as expected with headers exchanges
+ """
+ channel = self.channel
+ #setup
+ channel.queue_declare(queue="used-queue", exclusive=True)
+ channel.queue_declare(queue="unused-queue", exclusive=True)
+ channel.queue_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+
+ # test detection of any binding to specific queue
+ response = channel.binding_query(exchange="amq.match", queue="used-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.args_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(False, response.args_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = channel.binding_query(exchange="amq.match", queue="unused-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = channel.binding_query(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.args_not_matched)
+
+ # test matched queue, unmatched binding
+ response = channel.binding_query(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(False, response.queue_not_matched)
+ self.assertEqual(True, response.args_not_matched)
+
+ # test unmatched queue, matched binding
+ response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(False, response.args_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = channel.binding_query(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.args_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, channel.binding_query(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, channel.binding_query(exchange="amq.match", queue="unknown-queue").queue_not_found)
+
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/query.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,111 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase
+
+class QueueTests(TestBase):
+ """Tests for 'methods' on the amqp queue 'class'"""
+
+ def test_unbind_direct(self):
+ self.unbind_test(exchange="amq.direct", routing_key="key")
+
+ def test_unbind_topic(self):
+ self.unbind_test(exchange="amq.topic", routing_key="key")
+
+ def test_unbind_fanout(self):
+ self.unbind_test(exchange="amq.fanout")
+
+ def test_unbind_headers(self):
+ self.unbind_test(exchange="amq.match", args={ "x-match":"all", "a":"b"}, headers={"a":"b"})
+
+ def unbind_test(self, exchange, routing_key="", args=None, headers={}):
+ #bind two queues and consume from them
+ channel = self.channel
+
+ channel.queue_declare(queue="queue-1", exclusive="True")
+ channel.queue_declare(queue="queue-2", exclusive="True")
+
+ channel.basic_consume(queue="queue-1", consumer_tag="queue-1", no_ack=True)
+ channel.basic_consume(queue="queue-2", consumer_tag="queue-2", no_ack=True)
+
+ queue1 = self.client.queue("queue-1")
+ queue2 = self.client.queue("queue-2")
+
+ channel.queue_bind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+ channel.queue_bind(exchange=exchange, queue="queue-2", routing_key=routing_key, arguments=args)
+
+ #send a message that will match both bindings
+ channel.basic_publish(exchange=exchange, routing_key=routing_key,
+ content=Content("one", properties={"headers": headers}))
+
+ #unbind first queue
+ channel.queue_unbind(exchange=exchange, queue="queue-1", routing_key=routing_key, arguments=args)
+
+ #send another message
+ channel.basic_publish(exchange=exchange, routing_key=routing_key,
+ content=Content("two", properties={"headers": headers}))
+
+ #check one queue has both messages and the other has only one
+ self.assertEquals("one", queue1.get(timeout=1).content.body)
+ try:
+ msg = queue1.get(timeout=1)
+ self.fail("Got extra message: %s" % msg.body)
+ except Empty: pass
+
+ self.assertEquals("one", queue2.get(timeout=1).content.body)
+ self.assertEquals("two", queue2.get(timeout=1).content.body)
+ try:
+ msg = queue2.get(timeout=1)
+ self.fail("Got extra message: " + msg)
+ except Empty: pass
+
+ def test_autodelete_shared(self):
+ """
+ Test auto-deletion (of non-exclusive queues)
+ """
+ channel = self.channel
+ other = self.connect()
+ channel2 = other.channel(1)
+ channel2.channel_open()
+
+ channel.queue_declare(queue="auto-delete-me", auto_delete=True)
+
+ #consume from both channels
+ reply = channel.basic_consume(queue="auto-delete-me", no_ack=True)
+ channel2.basic_consume(queue="auto-delete-me", no_ack=True)
+
+ #implicit cancel
+ channel2.channel_close()
+
+ #check it is still there
+ channel.queue_declare(queue="auto-delete-me", passive=True)
+
+ #explicit cancel => queue is now unused again:
+ channel.basic_cancel(consumer_tag=reply.consumer_tag)
+
+ #NOTE: this assumes there is no timeout in use
+
+ #check that it has gone be declaring passively
+ try:
+ channel.queue_declare(queue="auto-delete-me", passive=True)
+ self.fail("Expected queue to have been deleted")
+ except Closed, e:
+ self.assertChannelException(404, e.args[0])
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Fri Feb 5 15:08:44 2010
@@ -0,0 +1,2 @@
+/qpid/branches/qmfv2/qpid/python/tests_0-9/queue.py:902858,902894
+/qpid/branches/qpid.rnr/python/tests_0-9/queue.py:894071-896158
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org