You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/05 16:08:46 UTC
svn commit: r906961 [2/4] - in /qpid/trunk/qpid: cpp/src/tests/ python/
python/tests_0-10/ python/tests_0-8/ python/tests_0-9/ tests/ tests/src/
tests/src/py/ tests/src/py/qpid_tests/ tests/src/py/qpid_tests/broker_0_10/
tests/src/py/qpid_tests/broker_...
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,467 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
+from qpid.management import managementChannel, managementClient
+from threading import Condition
+from time import sleep
+import qmf.console
+
+class ManagementTest (TestBase010):
+ """
+ Tests for the management hooks
+ """
+
+ def test_broker_connectivity_oldAPI (self):
+ """
+ Call the "echo" method on the broker to verify it is alive and talking.
+ """
+ session = self.session
+
+ mc = managementClient ()
+ mch = mc.addChannel (session)
+
+ mc.syncWaitForStable (mch)
+ brokers = mc.syncGetObjects (mch, "broker")
+ self.assertEqual (len (brokers), 1)
+ broker = brokers[0]
+ args = {}
+ body = "Echo Message Body"
+ args["body"] = body
+
+ for seq in range (1, 5):
+ args["sequence"] = seq
+ res = mc.syncCallMethod (mch, broker.id, broker.classKey, "echo", args)
+ self.assertEqual (res.status, 0)
+ self.assertEqual (res.statusText, "OK")
+ self.assertEqual (res.sequence, seq)
+ self.assertEqual (res.body, body)
+ mc.removeChannel (mch)
+
+ def test_methods_sync (self):
+ """
+ Call the "echo" method on the broker to verify it is alive and talking.
+ """
+ session = self.session
+ self.startQmf()
+
+ brokers = self.qmf.getObjects(_class="broker")
+ self.assertEqual(len(brokers), 1)
+ broker = brokers[0]
+
+ body = "Echo Message Body"
+ for seq in range(1, 20):
+ res = broker.echo(seq, body)
+ self.assertEqual(res.status, 0)
+ self.assertEqual(res.text, "OK")
+ self.assertEqual(res.sequence, seq)
+ self.assertEqual(res.body, body)
+
+ def test_get_objects(self):
+ self.startQmf()
+
+ # get the package list, verify that the qpid broker package is there
+ packages = self.qmf.getPackages()
+ assert 'org.apache.qpid.broker' in packages
+
+ # get the schema class keys for the broker, verify the broker table and link-down event
+ keys = self.qmf.getClasses('org.apache.qpid.broker')
+ broker = None
+ linkDown = None
+ for key in keys:
+ if key.getClassName() == "broker": broker = key
+ if key.getClassName() == "brokerLinkDown" : linkDown = key
+ assert broker
+ assert linkDown
+
+ brokerObjs = self.qmf.getObjects(_class="broker")
+ assert len(brokerObjs) == 1
+ brokerObjs = self.qmf.getObjects(_key=broker)
+ assert len(brokerObjs) == 1
+
+ def test_self_session_id (self):
+ self.startQmf()
+ sessionId = self.qmf_broker.getSessionId()
+ brokerSessions = self.qmf.getObjects(_class="session")
+
+ found = False
+ for bs in brokerSessions:
+ if bs.name == sessionId:
+ found = True
+ self.assertEqual (found, True)
+
+ def test_standard_exchanges (self):
+ self.startQmf()
+
+ exchanges = self.qmf.getObjects(_class="exchange")
+ exchange = self.findExchange (exchanges, "")
+ self.assertEqual (exchange.type, "direct")
+ exchange = self.findExchange (exchanges, "amq.direct")
+ self.assertEqual (exchange.type, "direct")
+ exchange = self.findExchange (exchanges, "amq.topic")
+ self.assertEqual (exchange.type, "topic")
+ exchange = self.findExchange (exchanges, "amq.fanout")
+ self.assertEqual (exchange.type, "fanout")
+ exchange = self.findExchange (exchanges, "amq.match")
+ self.assertEqual (exchange.type, "headers")
+ exchange = self.findExchange (exchanges, "qpid.management")
+ self.assertEqual (exchange.type, "topic")
+
+ def findExchange (self, exchanges, name):
+ for exchange in exchanges:
+ if exchange.name == name:
+ return exchange
+ return None
+
+ def test_move_queued_messages(self):
+ """
+ Test ability to move messages from the head of one queue to another.
+ Need to test moveing all and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up source queue"
+ session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Move Message %d" % count
+ src_msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=src_msg)
+
+ "Set up destination queue"
+ session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="dest-queue", exchange="amq.direct")
+
+ queues = self.qmf.getObjects(_class="queue")
+
+ "Move 10 messages from src-queue to dest-queue"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+ self.assertEqual (result.status, 0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,10)
+ self.assertEqual (dq.msgDepth,10)
+
+ "Move all remaining messages to destination"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+ self.assertEqual (result.status,0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,0)
+ self.assertEqual (dq.msgDepth,20)
+
+ "Use a bad source queue name"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+ self.assertEqual (result.status,4)
+
+ "Use a bad destination queue name"
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+ self.assertEqual (result.status,4)
+
+ " Use a large qty (40) to move from dest-queue back to "
+ " src-queue- should move all "
+ result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+ self.assertEqual (result.status,0)
+
+ sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+ dq = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+
+ self.assertEqual (sq.msgDepth,20)
+ self.assertEqual (dq.msgDepth,0)
+
+ "Consume the messages of the queue and check they are all there in order"
+ session.message_subscribe(queue="src-queue", destination="tag")
+ session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("tag")
+ for count in twenty:
+ consumed_msg = queue.get(timeout=1)
+ body = "Move Message %d" % count
+ self.assertEqual(body, consumed_msg.body)
+
+ def test_purge_queue(self):
+ """
+ Test ability to purge messages from the head of a queue.
+ Need to test moveing all, 1 (top message) and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up purge queue"
+ session.queue_declare(queue="purge-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="purge-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Purge Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+
+ "Purge top message from purge-queue"
+ result = pq.purge(1)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,19)
+
+ "Purge top 9 messages from purge-queue"
+ result = pq.purge(9)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,10)
+
+ "Purge all messages from purge-queue"
+ result = pq.purge(0)
+ self.assertEqual (result.status, 0)
+ pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+ self.assertEqual (pq.msgDepth,0)
+
+ def test_reroute_queue(self):
+ """
+ Test ability to reroute messages from the head of a queue.
+ Need to test moving all, 1 (top message) and N messages.
+ """
+ self.startQmf()
+ session = self.session
+ "Set up test queue"
+ session.exchange_declare(exchange="alt.direct1", type="direct")
+ session.queue_declare(queue="alt-queue1", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="alt-queue1", exchange="alt.direct1", binding_key="routing_key")
+ session.exchange_declare(exchange="alt.direct2", type="direct")
+ session.queue_declare(queue="alt-queue2", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue="alt-queue2", exchange="alt.direct2", binding_key="routing_key")
+ session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1")
+ session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key")
+
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Reroute Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
+
+ "Reroute top message from reroute-queue to alternate exchange"
+ result = pq.reroute(1, True, "")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
+ self.assertEqual(pq.msgDepth,19)
+ self.assertEqual(aq.msgDepth,1)
+
+ "Reroute top 9 messages from reroute-queue to alt.direct2"
+ result = pq.reroute(9, False, "alt.direct2")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
+ self.assertEqual(pq.msgDepth,10)
+ self.assertEqual(aq.msgDepth,9)
+
+ "Reroute using a non-existent exchange"
+ result = pq.reroute(0, False, "amq.nosuchexchange")
+ self.assertEqual(result.status, 4)
+
+ "Reroute all messages from reroute-queue"
+ result = pq.reroute(0, False, "alt.direct2")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
+ self.assertEqual(pq.msgDepth,0)
+ self.assertEqual(aq.msgDepth,19)
+
+ "Make more messages"
+ twenty = range(1,21)
+ props = session.delivery_properties(routing_key="routing_key")
+ for count in twenty:
+ body = "Reroute Message %d" % count
+ msg = Message(props, body)
+ session.message_transfer(destination="amq.direct", message=msg)
+
+ "Reroute onto the same queue"
+ result = pq.reroute(0, False, "amq.direct")
+ self.assertEqual(result.status, 0)
+ pq.update()
+ self.assertEqual(pq.msgDepth,20)
+
+
+ def test_methods_async (self):
+ """
+ """
+ class Handler (qmf.console.Console):
+ def __init__(self):
+ self.cv = Condition()
+ self.xmtList = {}
+ self.rcvList = {}
+
+ def methodResponse(self, broker, seq, response):
+ self.cv.acquire()
+ try:
+ self.rcvList[seq] = response
+ finally:
+ self.cv.release()
+
+ def request(self, broker, count):
+ self.count = count
+ for idx in range(count):
+ self.cv.acquire()
+ try:
+ seq = broker.echo(idx, "Echo Message", _async = True)
+ self.xmtList[seq] = idx
+ finally:
+ self.cv.release()
+
+ def check(self):
+ if self.count != len(self.xmtList):
+ return "fail (attempted send=%d, actual sent=%d)" % (self.count, len(self.xmtList))
+ lost = 0
+ mismatched = 0
+ for seq in self.xmtList:
+ value = self.xmtList[seq]
+ if seq in self.rcvList:
+ result = self.rcvList.pop(seq)
+ if result.sequence != value:
+ mismatched += 1
+ else:
+ lost += 1
+ spurious = len(self.rcvList)
+ if lost == 0 and mismatched == 0 and spurious == 0:
+ return "pass"
+ else:
+ return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
+
+ handler = Handler()
+ self.startQmf(handler)
+ brokers = self.qmf.getObjects(_class="broker")
+ self.assertEqual(len(brokers), 1)
+ broker = brokers[0]
+ handler.request(broker, 20)
+ sleep(1)
+ self.assertEqual(handler.check(), "pass")
+
+ def test_connection_close(self):
+ """
+ Test management method for closing connection
+ """
+ self.startQmf()
+ conn = self.connect()
+ session = conn.session("my-named-session")
+
+ #using qmf find named session and close the corresponding connection:
+ qmf_ssn_object = self.qmf.getObjects(_class="session", name="my-named-session")[0]
+ qmf_ssn_object._connectionRef_.close()
+
+ #check that connection is closed
+ try:
+ conn.session("another-session")
+ self.fail("Expected failure from closed connection")
+ except: None
+
+ #make sure that the named session has been closed and the name can be re-used
+ conn = self.connect()
+ session = conn.session("my-named-session")
+ session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)
+
+ def test_binding_count_on_queue(self):
+ self.startQmf()
+ conn = self.connect()
+ session = self.session
+
+ QUEUE = "binding_test_queue"
+ EX_DIR = "binding_test_exchange_direct"
+ EX_FAN = "binding_test_exchange_fanout"
+ EX_TOPIC = "binding_test_exchange_topic"
+ EX_HDR = "binding_test_exchange_headers"
+
+ #
+ # Create a test queue
+ #
+ session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True)
+ queue = self.qmf.getObjects(_class="queue", name=QUEUE)[0]
+ if not queue:
+ self.fail("Queue not found")
+ self.assertEqual(queue.bindingCount, 1, "wrong initial binding count")
+
+ #
+ # Create an exchange of each supported type
+ #
+ session.exchange_declare(exchange=EX_DIR, type="direct")
+ session.exchange_declare(exchange=EX_FAN, type="fanout")
+ session.exchange_declare(exchange=EX_TOPIC, type="topic")
+ session.exchange_declare(exchange=EX_HDR, type="headers")
+
+ #
+ # Bind each exchange to the test queue
+ #
+ match = {}
+ match['x-match'] = "all"
+ match['key'] = "value"
+ session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1")
+ session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
+ session.exchange_bind(exchange=EX_FAN, queue=QUEUE)
+ session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#")
+ session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
+ session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match)
+ match['key2'] = "value2"
+ session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match)
+
+ #
+ # Verify that the queue's binding count accounts for the new bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 8,
+ "added bindings not accounted for (expected 8, got %d)" % queue.bindingCount)
+
+ #
+ # Remove some of the bindings
+ #
+ session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
+ session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
+ session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2")
+
+ #
+ # Verify that the queue's binding count accounts for the deleted bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 5,
+ "deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount)
+ #
+ # Delete the exchanges
+ #
+ session.exchange_delete(exchange=EX_DIR)
+ session.exchange_delete(exchange=EX_FAN)
+ session.exchange_delete(exchange=EX_TOPIC)
+ session.exchange_delete(exchange=EX_HDR)
+
+ #
+ # Verify that the queue's binding count accounts for the lost bindings
+ #
+ queue.update()
+ self.assertEqual(queue.bindingCount, 1,
+ "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,918 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message, RangedSet
+from qpid.session import SessionException
+
+from qpid.content import Content
+from time import sleep
+
+class MessageTests(TestBase010):
+ """Tests for 'methods' on the amqp message 'class'"""
+
+ def test_no_local(self):
+ """
+ NOTE: this is a test of a QPID specific feature
+
+ Test that the qpid specific no_local arg is honoured.
+ """
+ session = self.session
+ #setup, declare two queues one of which excludes delivery of locally sent messages
+ session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
+ #establish two consumers
+ self.subscribe(destination="local_included", queue="test-queue-1a")
+ self.subscribe(destination="local_excluded", queue="test-queue-1b")
+
+ #send a message
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me"))
+
+ #send a message from another session on the same connection to each queue
+ session2 = self.conn.session("my-local-session")
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well"))
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either"))
+
+ #send a message from a session on another connection to each queue
+ for q in ["test-queue-1a", "test-queue-1b"]:
+ session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key")
+ other = self.connect()
+ session3 = other.session("my-other-session")
+ session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local"))
+ other.close()
+
+ #check the queues of the two consumers
+ excluded = session.incoming("local_excluded")
+ included = session.incoming("local_included")
+ for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]:
+ msg = included.get(timeout=1)
+ self.assertEqual(b, msg.body)
+ msg = excluded.get(timeout=1)
+ self.assertEqual("i-am-not-local", msg.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+ def test_no_local_awkward(self):
+
+ """
+ NOTE: this is a test of a QPID specific feature
+
+ Check that messages which will be excluded through no-local
+ processing will not block subsequent deliveries
+ """
+
+ session = self.session
+ #setup:
+ session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
+ #establish consumer which excludes delivery of locally sent messages
+ self.subscribe(destination="local_excluded", queue="test-queue")
+
+ #send a 'local' message
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"), "local"))
+
+ #send a non local message
+ other = self.connect()
+ session2 = other.session("my-session", 1)
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue"), "foreign"))
+ session2.close()
+ other.close()
+
+ #check that the second message only is delivered
+ excluded = session.incoming("local_excluded")
+ msg = excluded.get(timeout=1)
+ self.assertEqual("foreign", msg.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received extra message")
+ except Empty: None
+ #check queue is empty
+ self.assertEqual(0, session.queue_query(queue="test-queue").message_count)
+
+ def test_no_local_exclusive_subscribe(self):
+ """
+ NOTE: this is a test of a QPID specific feature
+
+ Test that the no_local processing works on queues not declared
+ as exclusive, but with an exclusive subscription
+ """
+ session = self.session
+
+ #setup, declare two queues one of which excludes delivery of
+ #locally sent messages but is not declared as exclusive
+ session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'})
+ #establish two consumers
+ self.subscribe(destination="local_included", queue="test-queue-1a")
+ self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True)
+
+ #send a message from the same session to each queue
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1a"), "deliver-me"))
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me"))
+
+ #send a message from another session on the same connection to each queue
+ session2 = self.conn.session("my-session")
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1a"), "deliver-me-as-well"))
+ session2.message_transfer(message=Message(session2.delivery_properties(routing_key="test-queue-1b"), "dont-deliver-me-either"))
+
+ #send a message from a session on another connection to each queue
+ for q in ["test-queue-1a", "test-queue-1b"]:
+ session.exchange_bind(queue=q, exchange="amq.fanout", binding_key="my-key")
+ other = self.connect()
+ session3 = other.session("my-other-session")
+ session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local"))
+ other.close()
+
+ #check the queues of the two consumers
+ excluded = session.incoming("local_excluded")
+ included = session.incoming("local_included")
+ for b in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]:
+ msg = included.get(timeout=1)
+ self.assertEqual(b, msg.body)
+ msg = excluded.get(timeout=1)
+ self.assertEqual("i-am-not-local", msg.body)
+ try:
+ excluded.get(timeout=1)
+ self.fail("Received locally published message though no_local=true")
+ except Empty: None
+
+
+ def test_consume_exclusive(self):
+ """
+ Test an exclusive consumer prevents other consumer being created
+ """
+ session = self.session
+ session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+ session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
+ try:
+ session.message_subscribe(destination="second", queue="test-queue-2")
+ self.fail("Expected consume request to fail due to previous exclusive consumer")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+ def test_consume_exclusive2(self):
+ """
+ Check that an exclusive consumer cannot be created if a consumer already exists:
+ """
+ session = self.session
+ session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+ session.message_subscribe(destination="first", queue="test-queue-2")
+ try:
+ session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
+ self.fail("Expected exclusive consume request to fail due to previous consumer")
+ except SessionException, e:
+ self.assertEquals(405, e.args[0].error_code)
+
+ def test_consume_queue_not_found(self):
+ """
+ Test error conditions associated with the queue field of the consume method:
+ """
+ session = self.session
+ try:
+ #queue specified but doesn't exist:
+ session.message_subscribe(queue="invalid-queue", destination="a")
+ self.fail("Expected failure when consuming from non-existent queue")
+ except SessionException, e:
+ self.assertEquals(404, e.args[0].error_code)
+
+ def test_consume_queue_not_specified(self):
+ session = self.session
+ try:
+ #queue not specified and none previously declared for channel:
+ session.message_subscribe(destination="a")
+ self.fail("Expected failure when consuming from unspecified queue")
+ except SessionException, e:
+ self.assertEquals(531, e.args[0].error_code)
+
+ def test_consume_unique_consumers(self):
+ """
+ Ensure unique consumer tags are enforced
+ """
+ session = self.session
+ #setup, declare a queue:
+ session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)
+
+ #check that attempts to use duplicate tags are detected and prevented:
+ session.message_subscribe(destination="first", queue="test-queue-3")
+ try:
+ session.message_subscribe(destination="first", queue="test-queue-3")
+ self.fail("Expected consume request to fail due to non-unique tag")
+ except SessionException, e:
+ self.assertEquals(530, e.args[0].error_code)
+
+ def test_cancel(self):
+ """
+ Test compliance of the basic.cancel method
+ """
+ session = self.session
+ #setup, declare a queue:
+ session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One"))
+
+ session.message_subscribe(destination="my-consumer", queue="test-queue-4")
+ myqueue = session.incoming("my-consumer")
+ session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+
+ #should flush here
+
+ #cancel should stop messages being delivered
+ session.message_cancel(destination="my-consumer")
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two"))
+ msg = myqueue.get(timeout=1)
+ self.assertEqual("One", msg.body)
+ try:
+ msg = myqueue.get(timeout=1)
+ self.fail("Got message after cancellation: " + msg)
+ except Empty: None
+
+ #cancellation of non-existant consumers should be handled without error
+ session.message_cancel(destination="my-consumer")
+ session.message_cancel(destination="this-never-existed")
+
+
+ def test_ack(self):
+ """
+ Test basic ack/recover behaviour
+ """
+ session = self.conn.session("alternate-session", timeout=10)
+ session.queue_declare(queue="test-ack-queue", auto_delete=True)
+
+ session.message_subscribe(queue = "test-ack-queue", destination = "consumer")
+ session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("consumer")
+
+ delivery_properties = session.delivery_properties(routing_key="test-ack-queue")
+ for i in ["One", "Two", "Three", "Four", "Five"]:
+ session.message_transfer(message=Message(delivery_properties, i))
+
+ msg1 = queue.get(timeout=1)
+ msg2 = queue.get(timeout=1)
+ msg3 = queue.get(timeout=1)
+ msg4 = queue.get(timeout=1)
+ msg5 = queue.get(timeout=1)
+
+ self.assertEqual("One", msg1.body)
+ self.assertEqual("Two", msg2.body)
+ self.assertEqual("Three", msg3.body)
+ self.assertEqual("Four", msg4.body)
+ self.assertEqual("Five", msg5.body)
+
+ session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
+
+ #subscribe from second session here to ensure queue is not
+ #auto-deleted when alternate session closes (no need to ack on these):
+ self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
+
+ #now close the session, and see that the unacked messages are
+ #then redelivered to another subscriber:
+ session.close(timeout=10)
+
+ session = self.session
+ session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("checker")
+
+ msg3b = queue.get(timeout=1)
+ msg5b = queue.get(timeout=1)
+
+ self.assertEqual("Three", msg3b.body)
+ self.assertEqual("Five", msg5b.body)
+
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Got unexpected message: " + extra.body)
+ except Empty: None
+
+ def test_reject(self):
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
+ session.queue_declare(queue = "r", exclusive=True, auto_delete=True)
+ session.exchange_bind(queue = "r", exchange = "amq.fanout")
+
+ session.message_subscribe(queue = "q", destination = "consumer")
+ session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah"))
+ msg = session.incoming("consumer").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+ session.message_reject(RangedSet(msg.id))
+
+ session.message_subscribe(queue = "r", destination = "checker")
+ session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ msg = session.incoming("checker").get(timeout = 1)
+ self.assertEquals(msg.body, "blah, blah")
+
+ def test_credit_flow_messages(self):
+ """
+ Test basic credit based flow control with unit = message
+ """
+ #declare an exclusive queue
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 0, destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
+
+ #set message credit to finite amount (less than enough for all messages)
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
+ #set infinite byte credit
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+ #check that expected number were received
+ q = session.incoming("c")
+ for i in range(1, 6):
+ self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(6, 11):
+ session.message_flow(unit = session.credit_unit.message, value = 1, destination = "c")
+ self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+ def test_credit_flow_bytes(self):
+ """
+ Test basic credit based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 0, destination = "c")
+ #send batch of messages to queue
+ for i in range(10):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
+
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 19
+
+ #set byte credit to finite amount (less than enough for all messages)
+ session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c")
+ #set infinite message credit
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c")
+ #check that expected number were received
+ q = session.incoming("c")
+ for i in range(5):
+ self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+ #increase credit again and check more are received
+ for i in range(5):
+ session.message_flow(unit = session.credit_unit.byte, value = msg_size, destination = "c")
+ self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+
+ def test_window_flow_messages(self):
+ """
+ Test basic window based flow control with unit = message
+ """
+ #declare an exclusive queue
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
+ #send batch of messages to queue
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
+
+ #set message credit to finite amount (less than enough for all messages)
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
+ #set infinite byte credit
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+ #check that expected number were received
+ q = session.incoming("c")
+ for i in range(1, 6):
+ msg = q.get(timeout = 1)
+ session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+ self.assertDataEquals(session, msg, "Message %d" % i)
+ self.assertEmpty(q)
+
+ #acknowledge messages and check more are received
+ #TODO: there may be a nicer way of doing this
+ session.channel.session_completed(session.receiver._completed)
+
+ for i in range(6, 11):
+ self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
+ self.assertEmpty(q)
+
+
+ def test_window_flow_bytes(self):
+ """
+ Test basic window based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer (for now that defaults to infinite credit)
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_set_flow_mode(flow_mode = 1, destination = "c")
+ #send batch of messages to queue
+ for i in range(10):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
+
+ #each message is currently interpreted as requiring msg_size bytes of credit
+ msg_size = 19
+
+ #set byte credit to finite amount (less than enough for all messages)
+ session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c")
+ #set infinite message credit
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c")
+ #check that expected number were received
+ q = session.incoming("c")
+ msgs = []
+ for i in range(5):
+ msg = q.get(timeout = 1)
+ msgs.append(msg)
+ self.assertDataEquals(session, msg, "abcdefgh")
+ self.assertEmpty(q)
+
+ #ack each message individually and check more are received
+ for i in range(5):
+ msg = msgs.pop()
+ #TODO: there may be a nicer way of doing this
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
+ self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
+ self.assertEmpty(q)
+
+ def test_window_flush_ack_flow(self):
+ """
+ Test basic window based flow control with unit = bytes
+ """
+ #declare an exclusive queue
+ ssn = self.session
+ ssn.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ #create consumer
+ ssn.message_subscribe(queue = "q", destination = "c",
+ accept_mode=ssn.accept_mode.explicit)
+ ssn.message_set_flow_mode(flow_mode = ssn.flow_mode.window, destination = "c")
+
+ #send message A
+ ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A"))
+
+ for unit in ssn.credit_unit.VALUES:
+ ssn.message_flow("c", unit, 0xFFFFFFFFL)
+
+ q = ssn.incoming("c")
+ msgA = q.get(timeout=10)
+
+ ssn.message_flush(destination="c")
+
+ # XXX
+ ssn.receiver._completed.add(msgA.id)
+ ssn.channel.session_completed(ssn.receiver._completed)
+ ssn.message_accept(RangedSet(msgA.id))
+
+ for unit in ssn.credit_unit.VALUES:
+ ssn.message_flow("c", unit, 0xFFFFFFFFL)
+
+ #send message B
+ ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "B"))
+
+ msgB = q.get(timeout=10)
+
+ def test_subscribe_not_acquired(self):
+ """
+ Test the not-acquired modes works as expected for a simple case
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 6):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
+
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "b")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+
+ for i in range(6, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
+
+ #both subscribers should see all messages
+ qA = session.incoming("a")
+ qB = session.incoming("b")
+ for i in range(1, 11):
+ for q in [qA, qB]:
+ msg = q.get(timeout = 1)
+ self.assertEquals("Message %s" % i, msg.body)
+ #TODO: tidy up completion
+ session.receiver._completed.add(msg.id)
+
+ #TODO: tidy up completion
+ session.channel.session_completed(session.receiver._completed)
+ #messages should still be on the queue:
+ self.assertEquals(10, session.queue_query(queue = "q").message_count)
+
+ def test_acquire_with_no_accept_and_credit_flow(self):
+ """
+ Test that messages recieved unacquired, with accept not
+ required in windowing mode can be acquired.
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1)
+ session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("acquire me", msg.body)
+ #message should still be on the queue:
+ self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+ transfers = RangedSet(msg.id)
+ response = session.message_acquire(transfers)
+ #check that we get notification (i.e. message_acquired)
+ self.assert_(msg.id in response.transfers)
+ #message should have been removed from the queue:
+ self.assertEquals(0, session.queue_query(queue = "q").message_count)
+
+ def test_acquire(self):
+ """
+ Test explicit acquire function
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("acquire me", msg.body)
+ #message should still be on the queue:
+ self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+ transfers = RangedSet(msg.id)
+ response = session.message_acquire(transfers)
+ #check that we get notification (i.e. message_acquired)
+ self.assert_(msg.id in response.transfers)
+ #message should have been removed from the queue:
+ self.assertEquals(0, session.queue_query(queue = "q").message_count)
+ session.message_accept(transfers)
+
+
+ def test_release(self):
+ """
+ Test explicit release function
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me"))
+
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ msg = session.incoming("a").get(timeout = 1)
+ self.assertEquals("release me", msg.body)
+ session.message_cancel(destination = "a")
+ session.message_release(RangedSet(msg.id))
+
+ #message should not have been removed from the queue:
+ self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+ def test_release_ordering(self):
+ """
+ Test order of released messages is as expected
+ """
+ session = self.session
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range (1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i)))
+
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ queue = session.incoming("a")
+ first = queue.get(timeout = 1)
+ for i in range(2, 10):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("released message %s" % (i), msg.body)
+
+ last = queue.get(timeout = 1)
+ self.assertEmpty(queue)
+ released = RangedSet()
+ released.add(first.id, last.id)
+ session.message_release(released)
+
+ #TODO: may want to clean this up...
+ session.receiver._completed.add(first.id, last.id)
+ session.channel.session_completed(session.receiver._completed)
+
+ for i in range(1, 11):
+ self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body)
+
+ def test_ranged_ack(self):
+ """
+ Test acking of messages ranges
+ """
+ session = self.conn.session("alternate-session", timeout=10)
+
+ session.queue_declare(queue = "q", auto_delete=True)
+ delivery_properties = session.delivery_properties(routing_key="q")
+ for i in range (1, 11):
+ session.message_transfer(message=Message(delivery_properties, "message %s" % (i)))
+
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ queue = session.incoming("a")
+ ids = []
+ for i in range (1, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message %s" % (i), msg.body)
+ ids.append(msg.id)
+
+ self.assertEmpty(queue)
+
+ #ack all but the fourth message (command id 2)
+ accepted = RangedSet()
+ accepted.add(ids[0], ids[2])
+ accepted.add(ids[4], ids[9])
+ session.message_accept(accepted)
+
+ #subscribe from second session here to ensure queue is not
+ #auto-deleted when alternate session closes (no need to ack on these):
+ self.session.message_subscribe(queue = "q", destination = "checker")
+
+ #now close the session, and see that the unacked messages are
+ #then redelivered to another subscriber:
+ session.close(timeout=10)
+
+ session = self.session
+ session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+ session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+ queue = session.incoming("checker")
+
+ self.assertEquals("message 4", queue.get(timeout = 1).body)
+ self.assertEmpty(queue)
+
+ def test_subscribe_not_acquired_2(self):
+ session = self.session
+
+ #publish some messages
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+ #consume some of them
+ session.message_subscribe(queue = "q", destination = "a")
+ session.message_set_flow_mode(flow_mode = 0, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+
+ queue = session.incoming("a")
+ for i in range(1, 6):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ #complete and accept
+ session.message_accept(RangedSet(msg.id))
+ #TODO: tidy up completion
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
+ self.assertEmpty(queue)
+
+ #now create a not-acquired subscriber
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+
+ #check it gets those not consumed
+ queue = session.incoming("b")
+ session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+ for i in range(6, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ session.message_release(RangedSet(msg.id))
+ #TODO: tidy up completion
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
+ session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+ self.assertEmpty(queue)
+
+ #check all 'browsed' messages are still on the queue
+ self.assertEqual(5, session.queue_query(queue="q").message_count)
+
+ def test_subscribe_not_acquired_3(self):
+ session = self.session
+
+ #publish some messages
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+ #create a not-acquired subscriber
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+
+ #browse through messages
+ queue = session.incoming("a")
+ for i in range(1, 11):
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ if (i % 2):
+ #try to acquire every second message
+ response = session.message_acquire(RangedSet(msg.id))
+ #check that acquire succeeds
+ self.assert_(msg.id in response.transfers)
+ session.message_accept(RangedSet(msg.id))
+ else:
+ session.message_release(RangedSet(msg.id))
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
+ self.assertEmpty(queue)
+
+ #create a second not-acquired subscriber
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+ #check it gets those not consumed
+ queue = session.incoming("b")
+ for i in [2,4,6,8,10]:
+ msg = queue.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ session.message_release(RangedSet(msg.id))
+ session.receiver._completed.add(msg.id)
+ session.channel.session_completed(session.receiver._completed)
+ session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+ self.assertEmpty(queue)
+
+ #check all 'browsed' messages are still on the queue
+ self.assertEqual(5, session.queue_query(queue="q").message_count)
+
+ def test_release_unacquired(self):
+ session = self.session
+
+ #create queue
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ #send message
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message"))
+
+ #create two 'browsers'
+ session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+ queueA = session.incoming("a")
+
+ session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b")
+ queueB = session.incoming("b")
+
+ #have each browser release the message
+ msgA = queueA.get(timeout = 1)
+ session.message_release(RangedSet(msgA.id))
+
+ msgB = queueB.get(timeout = 1)
+ session.message_release(RangedSet(msgB.id))
+
+ #cancel browsers
+ session.message_cancel(destination = "a")
+ session.message_cancel(destination = "b")
+
+ #create consumer
+ session.message_subscribe(queue = "q", destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c")
+ queueC = session.incoming("c")
+ #consume the message then ack it
+ msgC = queueC.get(timeout = 1)
+ session.message_accept(RangedSet(msgC.id))
+ #ensure there are no other messages
+ self.assertEmpty(queueC)
+
+ def test_release_order(self):
+ session = self.session
+
+ #create queue
+ session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+ #send messages
+ for i in range(1, 11):
+ session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+ #subscribe:
+ session.message_subscribe(queue="q", destination="a")
+ a = session.incoming("a")
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+
+ for i in range(1, 11):
+ msg = a.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+ if (i % 2):
+ #accept all odd messages
+ session.message_accept(RangedSet(msg.id))
+ else:
+ #release all even messages
+ session.message_release(RangedSet(msg.id))
+
+ #browse:
+ session.message_subscribe(queue="q", destination="b", acquire_mode=1)
+ b = session.incoming("b")
+ b.start()
+ for i in [2, 4, 6, 8, 10]:
+ msg = b.get(timeout = 1)
+ self.assertEquals("message-%d" % (i), msg.body)
+
+
+ def test_empty_body(self):
+ session = self.session
+ session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)
+ props = session.delivery_properties(routing_key="xyz")
+ session.message_transfer(message=Message(props, ""))
+
+ consumer_tag = "tag1"
+ session.message_subscribe(queue="xyz", destination=consumer_tag)
+ session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+ queue = session.incoming(consumer_tag)
+ msg = queue.get(timeout=1)
+ self.assertEquals("", msg.body)
+ session.message_accept(RangedSet(msg.id))
+
+ def test_incoming_start(self):
+ q = "test_incoming_start"
+ session = self.session
+
+ session.queue_declare(queue=q, exclusive=True, auto_delete=True)
+ session.message_subscribe(queue=q, destination="msgs")
+ messages = session.incoming("msgs")
+ assert messages.destination == "msgs"
+
+ dp = session.delivery_properties(routing_key=q)
+ session.message_transfer(message=Message(dp, "test"))
+
+ messages.start()
+ msg = messages.get()
+ assert msg.body == "test"
+
+ def test_ttl(self):
+ q = "test_ttl"
+ session = self.session
+
+ session.queue_declare(queue=q, exclusive=True, auto_delete=True)
+
+ dp = session.delivery_properties(routing_key=q, ttl=500)#expire in half a second
+ session.message_transfer(message=Message(dp, "first"))
+
+ dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in fives minutes
+ session.message_transfer(message=Message(dp, "second"))
+
+ d = "msgs"
+ session.message_subscribe(queue=q, destination=d)
+ messages = session.incoming(d)
+ sleep(1)
+ session.message_flow(unit = session.credit_unit.message, value=2, destination=d)
+ session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFFL, destination=d)
+ assert messages.get(timeout=1).body == "second"
+ self.assertEmpty(messages)
+
+
+ def assertDataEquals(self, session, msg, expected):
+ self.assertEquals(expected, msg.body)
+
+ def assertEmpty(self, queue):
+ try:
+ extra = queue.get(timeout=1)
+ self.fail("Queue not empty, contains: " + extra.body)
+ except Empty: None
+
+class SizelessContent(Content):
+
+ def size(self):
+ return None
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.datatypes import Message, RangedSet
+#from qpid.testlib import testrunner, TestBase010
+from qpid.testlib import TestBase010
+
+class PersistenceTests(TestBase010):
+ def test_delete_queue_after_publish(self):
+ session = self.session
+ session.auto_sync = False
+
+ #create queue
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ for i in range(1, 10):
+ dp = session.delivery_properties(routing_key="q", delivery_mode=2)
+ session.message_transfer(message=Message(dp, "my-message"))
+
+ session.auto_sync = True
+ #explicitly delete queue
+ session.queue_delete(queue = "q")
+
+ def test_ack_message_from_deleted_queue(self):
+ session = self.session
+ session.auto_sync = False
+
+ #create queue
+ session.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+ #send message
+ dp = session.delivery_properties(routing_key="q", delivery_mode=2)
+ session.message_transfer(message=Message(dp, "my-message"))
+
+ #create consumer
+ session.message_subscribe(queue = "q", destination = "a", accept_mode = 1, acquire_mode=0)
+ session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+ queue = session.incoming("a")
+
+ #consume the message, cancel subscription (triggering auto-delete), then ack it
+ msg = queue.get(timeout = 5)
+ session.message_cancel(destination = "a")
+ session.message_accept(RangedSet(msg.id))
+
+ def test_queue_deletion(self):
+ session = self.session
+ session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+ session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
+ dp = session.delivery_properties(routing_key="xyz", delivery_mode=2)
+ session.message_transfer(destination="amq.topic", message=Message(dp, "my-message"))
+ session.queue_delete(queue = "durable-subscriber-queue")
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py
------------------------------------------------------------------------------
svn:eol-style = native
Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py Fri Feb 5 15:08:44 2010
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase010
+
+class QueryTests(TestBase010):
+ """Tests for various query methods"""
+
+ def test_queue_query(self):
+ session = self.session
+ session.queue_declare(queue="my-queue", exclusive=True)
+ result = session.queue_query(queue="my-queue")
+ self.assertEqual("my-queue", result.queue)
+
+ def test_queue_query_unknown(self):
+ session = self.session
+ result = session.queue_query(queue="I don't exist")
+ self.assert_(not result.queue)
+
+ def test_exchange_query(self):
+ """
+ Test that the exchange_query method works as expected
+ """
+ session = self.session
+ #check returned type for the standard exchanges
+ self.assertEqual("direct", session.exchange_query(name="amq.direct").type)
+ self.assertEqual("topic", session.exchange_query(name="amq.topic").type)
+ self.assertEqual("fanout", session.exchange_query(name="amq.fanout").type)
+ self.assertEqual("headers", session.exchange_query(name="amq.match").type)
+ self.assertEqual("direct", session.exchange_query(name="").type)
+ #declare an exchange
+ session.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+ #check that the result of a query is as expected
+ response = session.exchange_query(name="my-test-exchange")
+ self.assertEqual("direct", response.type)
+ self.assert_(not response.durable)
+ self.assert_(not response.not_found)
+ #delete the exchange
+ session.exchange_delete(exchange="my-test-exchange")
+ #check that the query now reports not-found
+ self.assert_(session.exchange_query(name="my-test-exchange").not_found)
+
+ def test_exchange_bound_direct(self):
+ """
+ Test that the exchange_bound method works as expected with the direct exchange
+ """
+ self.exchange_bound_with_key("amq.direct")
+
+ def test_exchange_bound_topic(self):
+ """
+ Test that the exchange_bound method works as expected with the direct exchange
+ """
+ self.exchange_bound_with_key("amq.topic")
+
+ def exchange_bound_with_key(self, exchange_name):
+ session = self.session
+ #setup: create two queues
+ session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+
+ session.exchange_bind(exchange=exchange_name, queue="used-queue", binding_key="used-key")
+
+ # test detection of any binding to specific queue
+ response = session.exchange_bound(exchange=exchange_name, queue="used-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = session.exchange_bound(exchange=exchange_name, binding_key="used-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.key_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="used-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+ self.assert_(not response.key_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = session.exchange_bound(exchange=exchange_name, queue="unused-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = session.exchange_bound(exchange=exchange_name, binding_key="unused-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test matched queue, unmatched binding
+ response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="unused-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ # test unmatched queue, matched binding
+ response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="used-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assert_(not response.key_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="unused-key")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.key_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
+
+ #test exchange found, queue not found
+ response = session.exchange_bound(exchange=exchange_name, queue="unknown-queue")
+ self.assertEqual(False, response.exchange_not_found)
+ self.assertEqual(True, response.queue_not_found)
+
+ #test exchange not found, queue found
+ response = session.exchange_bound(exchange="unknown-exchange", queue="used-queue")
+ self.assertEqual(True, response.exchange_not_found)
+ self.assertEqual(False, response.queue_not_found)
+
+ #test not exchange found, queue not found
+ response = session.exchange_bound(exchange="unknown-exchange", queue="unknown-queue")
+ self.assertEqual(True, response.exchange_not_found)
+ self.assertEqual(True, response.queue_not_found)
+
+
+ def test_exchange_bound_fanout(self):
+ """
+ Test that the exchange_bound method works as expected with fanout exchange
+ """
+ session = self.session
+ #setup
+ session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="amq.fanout", queue="used-queue")
+
+ # test detection of any binding to specific queue
+ response = session.exchange_bound(exchange="amq.fanout", queue="used-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = session.exchange_bound(exchange="amq.fanout", queue="unused-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, session.exchange_bound(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
+
+ def test_exchange_bound_header(self):
+ """
+ Test that the exchange_bound method works as expected with headers exchanges
+ """
+ session = self.session
+ #setup
+ session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+ session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+ session.exchange_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+
+ # test detection of any binding to specific queue
+ response = session.exchange_bound(exchange="amq.match", queue="used-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+
+ # test detection of specific binding to any queue
+ response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.args_not_matched)
+
+ # test detection of specific binding to specific queue
+ response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+ self.assert_(not response.args_not_matched)
+
+ # test unmatched queue, unspecified binding
+ response = session.exchange_bound(exchange="amq.match", queue="unused-queue")
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+
+ # test unspecified queue, unmatched binding
+ response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.args_not_matched)
+
+ # test matched queue, unmatched binding
+ response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assert_(not response.queue_not_matched)
+ self.assertEqual(True, response.args_not_matched)
+
+ # test unmatched queue, matched binding
+ response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assert_(not response.args_not_matched)
+
+ # test unmatched queue, unmatched binding
+ response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
+ self.assert_(not response.exchange_not_found)
+ self.assert_(not response.queue_not_found)
+ self.assertEqual(True, response.queue_not_matched)
+ self.assertEqual(True, response.args_not_matched)
+
+ #test exchange not found
+ self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
+
+ #test queue not found
+ self.assertEqual(True, session.exchange_bound(exchange="amq.match", queue="unknown-queue").queue_not_found)
+
Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py
------------------------------------------------------------------------------
svn:eol-style = native
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org