You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2010/02/05 16:08:46 UTC

svn commit: r906961 [2/4] - in /qpid/trunk/qpid: cpp/src/tests/ python/ python/tests_0-10/ python/tests_0-8/ python/tests_0-9/ tests/ tests/src/ tests/src/py/ tests/src/py/qpid_tests/ tests/src/py/qpid_tests/broker_0_10/ tests/src/py/qpid_tests/broker_...

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,467 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from qpid.datatypes import Message, RangedSet
+from qpid.testlib import TestBase010
+from qpid.management import managementChannel, managementClient
+from threading import Condition
+from time import sleep
+import qmf.console
+
+class ManagementTest (TestBase010):
+    """
+    Tests for the management hooks
+    """
+
+    def test_broker_connectivity_oldAPI (self):
+        """
+        Echo round trip through the old managementClient API: every reply must
+        return the sequence number and body that were sent.
+        """
+        client  = managementClient ()
+        channel = client.addChannel (self.session)
+
+        client.syncWaitForStable (channel)
+        found = client.syncGetObjects (channel, "broker")
+        self.assertEqual (len (found), 1)
+        target = found[0]
+
+        payload = "Echo Message Body"
+        request = {"body": payload}
+
+        for number in range (1, 5):
+            request["sequence"] = number
+            reply = client.syncCallMethod (channel, target.id, target.classKey,
+                                           "echo", request)
+            self.assertEqual (reply.status,     0)
+            self.assertEqual (reply.statusText, "OK")
+            self.assertEqual (reply.sequence,   number)
+            self.assertEqual (reply.body,       payload)
+        client.removeChannel (channel)
+
+    def test_methods_sync (self):
+        """
+        Invoke the broker's "echo" method synchronously through the QMF
+        console and check that every reply mirrors its request.
+        """
+        self.startQmf()
+
+        found = self.qmf.getObjects(_class="broker")
+        self.assertEqual(len(found), 1)
+        target = found[0]
+
+        payload = "Echo Message Body"
+        for number in range(1, 20):
+            reply = target.echo(number, payload)
+            self.assertEqual(reply.status, 0)
+            self.assertEqual(reply.text, "OK")
+            self.assertEqual(reply.sequence, number)
+            self.assertEqual(reply.body, payload)
+
+    def test_get_objects(self):
+        """
+        Check that the broker package, its "broker" class and "brokerLinkDown"
+        event are in the schema, and that the single broker object can be
+        fetched both by class name and by schema class key.
+        """
+        self.startQmf()
+
+        # get the package list, verify that the qpid broker package is there
+        packages = self.qmf.getPackages()
+        self.assertTrue('org.apache.qpid.broker' in packages)
+
+        # index the schema class keys by class name (a later duplicate wins)
+        keys = self.qmf.getClasses('org.apache.qpid.broker')
+        byName = dict([(key.getClassName(), key) for key in keys])
+        broker = byName.get("broker")
+        self.assertTrue(broker is not None, "no schema key for class 'broker'")
+        self.assertTrue("brokerLinkDown" in byName, "no schema key for 'brokerLinkDown'")
+
+        for query in ({"_class": "broker"}, {"_key": broker}):
+            self.assertEqual(len(self.qmf.getObjects(**query)), 1)
+
+    def test_self_session_id (self):
+        """
+        The session id reported by self.qmf_broker must be the name of one of
+        the session objects that the broker publishes through QMF.
+        """
+        self.startQmf()
+        ownId = self.qmf_broker.getSessionId()
+        names = [entry.name for entry in self.qmf.getObjects(_class="session")]
+
+        self.assertEqual (ownId in names, True)
+
+    def test_standard_exchanges (self):
+        """
+        Each standard exchange must be reported through QMF with the
+        expected exchange type.
+        """
+        self.startQmf()
+        exchanges = self.qmf.getObjects(_class="exchange")
+
+        expected = (("",                "direct"),
+                    ("amq.direct",      "direct"),
+                    ("amq.topic",       "topic"),
+                    ("amq.fanout",      "fanout"),
+                    ("amq.match",       "headers"),
+                    ("qpid.management", "topic"))
+        for name, kind in expected:
+            self.assertEqual (self.findExchange (exchanges, name).type, kind)
+
+    def findExchange (self, exchanges, name):
+        """Return the first exchange whose name equals `name`, or None if there is none."""
+        for candidate in exchanges:
+            if candidate.name == name: return candidate
+        return None
+
+    def test_move_queued_messages(self):
+        """
+        Exercise the broker's queueMoveMessages method between two queues:
+
+          - move a fixed number (10) of messages
+          - move everything that remains (qty 0)
+          - name a source or destination queue that does not exist (status 4)
+          - ask for more messages (40) than are queued; all of them are moved
+
+        Finally the messages are consumed to confirm that they kept their order.
+        """
+        self.startQmf()
+        session = self.session
+
+        def move(source, target, qty):
+            # The broker object is looked up afresh for every call.
+            agent = self.qmf.getObjects(_class="broker")[0]
+            return agent.queueMoveMessages(source, target, qty).status
+
+        def checkDepths(inSource, inTarget):
+            # Re-query both queues (source first) and compare their depths.
+            source = self.qmf.getObjects(_class="queue", name="src-queue")[0]
+            target = self.qmf.getObjects(_class="queue", name="dest-queue")[0]
+            self.assertEqual (source.msgDepth, inSource)
+            self.assertEqual (target.msgDepth, inTarget)
+
+        # Set up source queue
+        session.queue_declare(queue="src-queue", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="src-queue", exchange="amq.direct", binding_key="routing_key")
+
+        # Publish twenty numbered messages using the routing key bound above
+        numbers = range(1,21)
+        props = session.delivery_properties(routing_key="routing_key")
+        for number in numbers:
+            text = "Move Message %d" % number
+            session.message_transfer(destination="amq.direct", message=Message(props, text))
+
+        # Set up destination queue
+        session.queue_declare(queue="dest-queue", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="dest-queue", exchange="amq.direct")
+
+        # The returned list is not used; only the query itself is retained.
+        self.qmf.getObjects(_class="queue")
+
+        # Move 10 messages from src-queue to dest-queue
+        self.assertEqual (move("src-queue", "dest-queue", 10), 0)
+        checkDepths(10, 10)
+
+        # Move all remaining messages to destination
+        self.assertEqual (move("src-queue", "dest-queue", 0), 0)
+        checkDepths(0, 20)
+
+        # Use a bad source queue name
+        self.assertEqual (move("bad-src-queue", "dest-queue", 0), 4)
+
+        # Use a bad destination queue name
+        self.assertEqual (move("src-queue", "bad-dest-queue", 0), 4)
+
+        # Use a large qty (40) to move from dest-queue back to
+        # src-queue - should move all
+        self.assertEqual (move("dest-queue", "src-queue", 40), 0)
+        checkDepths(20, 0)
+
+        # Consume the messages of the queue and check they are all there in order
+        session.message_subscribe(queue="src-queue", destination="tag")
+        session.message_flow(destination="tag", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="tag", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        incoming = session.incoming("tag")
+        for number in numbers:
+            received = incoming.get(timeout=1)
+            expected = "Move Message %d" % number
+            self.assertEqual(expected, received.body)
+
+    def test_purge_queue(self):
+        """
+        Test the purge method of the QMF queue object.
+
+        Twenty messages are queued and then removed in three steps:
+          - purge(1) removes the top message         (19 left)
+          - purge(9) removes nine more               (10 left)
+          - purge(0) removes everything that is left  (0 left)
+
+        The queue's msgDepth is checked after every step.
+        """
+        self.startQmf()
+        session = self.session
+
+        # Set up purge queue
+        session.queue_declare(queue="purge-queue", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="purge-queue", exchange="amq.direct", binding_key="routing_key")
+
+        props = session.delivery_properties(routing_key="routing_key")
+        for number in range(1,21):
+            text = "Purge Message %d" % number
+            session.message_transfer(destination="amq.direct", message=Message(props, text))
+
+        def lookup():
+            # Fetched anew each time, presumably so that msgDepth is current.
+            return self.qmf.getObjects(_class="queue", name="purge-queue")[0]
+
+        # (number of messages to purge, depth expected afterwards)
+        steps = ((1, 19), (9, 10), (0, 0))
+
+        purgeQueue = lookup()
+        for request, remaining in steps:
+            # purge is invoked on the object fetched after the previous step
+            outcome = purgeQueue.purge(request)
+            self.assertEqual (outcome.status, 0)
+            purgeQueue = lookup()
+            self.assertEqual (purgeQueue.msgDepth, remaining)
+
+    def test_reroute_queue(self):
+        """
+        Test the reroute method of the QMF queue object.
+
+        Twenty messages are queued on reroute-queue, whose alternate exchange
+        is alt.direct1, and are then rerouted step by step:
+          - 1 message to the alternate exchange (arrives on alt-queue1)
+          - 9 messages to alt.direct2           (arrive on alt-queue2)
+          - all messages to an unknown exchange (fails with status 4)
+          - all remaining messages to alt.direct2
+        Twenty further messages are then rerouted to amq.direct, which puts
+        them back on reroute-queue itself.
+        """
+        self.startQmf()
+        session = self.session
+
+        def publishTwenty():
+            props = session.delivery_properties(routing_key="routing_key")
+            for number in range(1,21):
+                text = "Reroute Message %d" % number
+                session.message_transfer(destination="amq.direct", message=Message(props, text))
+
+        def depthOf(name):
+            return self.qmf.getObjects(_class="queue", name=name)[0].msgDepth
+
+        # Set up test queue
+        for exchange, queue in (("alt.direct1", "alt-queue1"), ("alt.direct2", "alt-queue2")):
+            session.exchange_declare(exchange=exchange, type="direct")
+            session.queue_declare(queue=queue, exclusive=True, auto_delete=True)
+            session.exchange_bind(queue=queue, exchange=exchange, binding_key="routing_key")
+        session.queue_declare(queue="reroute-queue", exclusive=True, auto_delete=True, alternate_exchange="alt.direct1")
+        session.exchange_bind(queue="reroute-queue", exchange="amq.direct", binding_key="routing_key")
+
+        publishTwenty()
+        source = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
+
+        # Reroute top message from reroute-queue to alternate exchange
+        self.assertEqual(source.reroute(1, True, "").status, 0)
+        source.update()
+        arrived = depthOf("alt-queue1")
+        self.assertEqual(source.msgDepth,19)
+        self.assertEqual(arrived,1)
+
+        # Reroute top 9 messages from reroute-queue to alt.direct2
+        self.assertEqual(source.reroute(9, False, "alt.direct2").status, 0)
+        source.update()
+        arrived = depthOf("alt-queue2")
+        self.assertEqual(source.msgDepth,10)
+        self.assertEqual(arrived,9)
+
+        # Reroute using a non-existent exchange
+        self.assertEqual(source.reroute(0, False, "amq.nosuchexchange").status, 4)
+
+        # Reroute all messages from reroute-queue
+        self.assertEqual(source.reroute(0, False, "alt.direct2").status, 0)
+        source.update()
+        arrived = depthOf("alt-queue2")
+        self.assertEqual(source.msgDepth,0)
+        self.assertEqual(arrived,19)
+
+        # Make more messages
+        publishTwenty()
+
+        # Reroute onto the same queue
+        self.assertEqual(source.reroute(0, False, "amq.direct").status, 0)
+        source.update()
+        self.assertEqual(source.msgDepth,20)
+        
+
+    def test_methods_async (self):
+        """
+        Issue twenty asynchronous "echo" calls and check that every call is
+        answered with the value it sent and that no unrequested responses arrive.
+        """
+        class Handler (qmf.console.Console):
+            # Tracks requests sent and responses received, keyed by sequence number.
+            def __init__(self):
+                self.cv = Condition()
+                self.xmtList = {}
+                self.rcvList = {}
+
+            def methodResponse(self, broker, seq, response):
+                self.cv.acquire()
+                try:
+                    self.rcvList[seq] = response
+                finally:
+                    self.cv.release()
+
+            def request(self, broker, count):
+                # Send `count` echoes, remembering the value sent under each seq.
+                self.count = count
+                for value in range(count):
+                    self.cv.acquire()
+                    try:
+                        seq = broker.echo(value, "Echo Message", _async = True)
+                        self.xmtList[seq] = value
+                    finally:
+                        self.cv.release()
+
+            def check(self):
+                # Returns "pass", or a "fail (...)" string describing the problem.
+                sent = len(self.xmtList)
+                if self.count != sent:
+                    return "fail (attempted send=%d, actual sent=%d)" % (self.count, sent)
+                lost = mismatched = 0
+                for seq, value in self.xmtList.items():
+                    if seq not in self.rcvList:
+                        lost += 1
+                    elif self.rcvList.pop(seq).sequence != value:
+                        mismatched += 1
+                spurious = len(self.rcvList)
+                if lost or mismatched or spurious:
+                    return "fail (lost=%d, mismatch=%d, spurious=%d)" % (lost, mismatched, spurious)
+                return "pass"
+
+        handler = Handler()
+        self.startQmf(handler)
+        found = self.qmf.getObjects(_class="broker")
+        self.assertEqual(len(found), 1)
+        handler.request(found[0], 20)
+        sleep(1)
+        self.assertEqual(handler.check(), "pass")
+
+    def test_connection_close(self):
+        """
+        Closing a connection through QMF must break it and free its session name.
+        """
+        self.startQmf()
+        conn = self.connect()
+        session = conn.session("my-named-session")
+
+        #using qmf find named session and close the corresponding connection:
+        self.qmf.getObjects(_class="session", name="my-named-session")[0]._connectionRef_.close()
+
+        #check that connection is closed (fail() sits in else: so it is not swallowed)
+        try:
+            conn.session("another-session")
+        except Exception: pass
+        else:
+            self.fail("Expected failure from closed connection")
+
+        #make sure that the named session has been closed and the name can be re-used
+        conn = self.connect()
+        session = conn.session("my-named-session")
+        session.queue_declare(queue="whatever", exclusive=True, auto_delete=True)
+
+    def test_binding_count_on_queue(self):
+        """
+        Check that a queue's QMF bindingCount follows bindings as they are
+        added, removed one by one, and lost when their exchanges are deleted.
+        """
+        self.startQmf()
+        session = self.session
+
+        QUEUE = "binding_test_queue"
+        EX_DIR = "binding_test_exchange_direct"
+        EX_FAN = "binding_test_exchange_fanout"
+        EX_TOPIC = "binding_test_exchange_topic"
+        EX_HDR = "binding_test_exchange_headers"
+
+        #
+        # Create a test queue; check the lookup result before indexing into it
+        #
+        session.queue_declare(queue=QUEUE, exclusive=True, auto_delete=True)
+        found = self.qmf.getObjects(_class="queue", name=QUEUE)
+        if not found:
+            self.fail("Queue not found")
+        queue = found[0]
+        self.assertEqual(queue.bindingCount, 1, "wrong initial binding count")
+
+        #
+        # Create an exchange of each supported type
+        #
+        session.exchange_declare(exchange=EX_DIR, type="direct")
+        session.exchange_declare(exchange=EX_FAN, type="fanout")
+        session.exchange_declare(exchange=EX_TOPIC, type="topic")
+        session.exchange_declare(exchange=EX_HDR, type="headers")
+
+        #
+        # Bind each exchange to the test queue
+        #
+        match = {'x-match': "all", 'key': "value"}
+        session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key1")
+        session.exchange_bind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
+        session.exchange_bind(exchange=EX_FAN, queue=QUEUE)
+        session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key1.#")
+        session.exchange_bind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
+        session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key1", arguments=match)
+        match['key2'] = "value2"
+        session.exchange_bind(exchange=EX_HDR, queue=QUEUE, binding_key="key2", arguments=match)
+
+        #
+        # Verify that the queue's binding count accounts for the new bindings
+        #
+        queue.update()
+        self.assertEqual(queue.bindingCount, 8,
+                         "added bindings not accounted for (expected 8, got %d)" % queue.bindingCount)
+
+        #
+        # Remove some of the bindings
+        #
+        session.exchange_unbind(exchange=EX_DIR, queue=QUEUE, binding_key="key2")
+        session.exchange_unbind(exchange=EX_TOPIC, queue=QUEUE, binding_key="key2.#")
+        session.exchange_unbind(exchange=EX_HDR, queue=QUEUE, binding_key="key2")
+
+        #
+        # Verify that the queue's binding count accounts for the deleted bindings
+        #
+        queue.update()
+        self.assertEqual(queue.bindingCount, 5,
+                         "deleted bindings not accounted for (expected 5, got %d)" % queue.bindingCount)
+        #
+        # Delete the exchanges
+        #
+        for exchange in (EX_DIR, EX_FAN, EX_TOPIC, EX_HDR):
+            session.exchange_delete(exchange=exchange)
+
+        #
+        # Verify that the queue's binding count accounts for the lost bindings
+        #
+        queue.update()
+        self.assertEqual(queue.bindingCount, 1,
+                         "deleted bindings not accounted for (expected 1, got %d)" % queue.bindingCount)
+

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/management.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,918 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.testlib import TestBase010
+from qpid.datatypes import Message, RangedSet
+from qpid.session import SessionException
+
+from qpid.content import Content
+from time import sleep
+
+class MessageTests(TestBase010):
+    """Tests for 'methods' on the amqp message 'class'"""
+
+    def test_no_local(self):
+        """
+        NOTE: this is a test of a QPID specific feature
+
+        A 'no-local' queue must only deliver messages published over another connection.
+        """
+        session = self.session
+        def send(ssn, queue, text):
+            ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key=queue), text))
+
+        #setup, declare two queues one of which excludes delivery of locally sent messages
+        session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+        session.queue_declare(queue="test-queue-1b", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
+        #establish two consumers
+        self.subscribe(destination="local_included", queue="test-queue-1a")
+        self.subscribe(destination="local_excluded", queue="test-queue-1b")
+
+        #send a message from this session to each queue
+        send(session, "test-queue-1a", "deliver-me")
+        send(session, "test-queue-1b", "dont-deliver-me")
+
+        #send a message from another session on the same connection to each queue
+        session2 = self.conn.session("my-local-session")
+        send(session2, "test-queue-1a", "deliver-me-as-well")
+        send(session2, "test-queue-1b", "dont-deliver-me-either")
+
+        #send a message from a session on another connection to each queue
+        for name in ["test-queue-1a", "test-queue-1b"]:
+            session.exchange_bind(queue=name, exchange="amq.fanout", binding_key="my-key")
+        other = self.connect()
+        session3 = other.session("my-other-session")
+        session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local"))
+        other.close()
+
+        #check the queues of the two consumers
+        excluded, included = session.incoming("local_excluded"), session.incoming("local_included")
+        for expected in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]:
+            self.assertEqual(expected, included.get(timeout=1).body)
+        self.assertEqual("i-am-not-local", excluded.get(timeout=1).body)
+        try:
+            excluded.get(timeout=1)
+            self.fail("Received locally published message though no_local=true")
+        except Empty: pass
+
+    def test_no_local_awkward(self):
+        """
+        NOTE: this is a test of a QPID specific feature
+
+        A message that no-local processing excludes must not hold up the
+        delivery of the messages queued behind it.
+        """
+        session = self.session
+        #setup:
+        session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'no-local':'true'})
+        #establish consumer which excludes delivery of locally sent messages
+        self.subscribe(destination="local_excluded", queue="test-queue")
+
+        #send a 'local' message
+        props = session.delivery_properties(routing_key="test-queue")
+        session.message_transfer(message=Message(props, "local"))
+
+        #send a non local message
+        other = self.connect()
+        foreign = other.session("my-session", 1)
+        props = foreign.delivery_properties(routing_key="test-queue")
+        foreign.message_transfer(message=Message(props, "foreign"))
+        foreign.close()
+        other.close()
+
+        #check that the second message only is delivered
+        excluded = session.incoming("local_excluded")
+        self.assertEqual("foreign", excluded.get(timeout=1).body)
+        try:
+            excluded.get(timeout=1)
+        except Empty: pass
+        else:
+            self.fail("Received extra message")
+        #check queue is empty
+        self.assertEqual(0, session.queue_query(queue="test-queue").message_count)
+
+    def test_no_local_exclusive_subscribe(self):
+        """
+        NOTE: this is a test of a QPID specific feature
+
+        no-local processing must also apply to a queue that is not declared
+        exclusive but is consumed through an exclusive subscription.
+        """
+        session = self.session
+        def send(ssn, queue, text):
+            ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key=queue), text))
+
+        #setup, declare two queues one of which excludes delivery of
+        #locally sent messages but is not declared as exclusive
+        session.queue_declare(queue="test-queue-1a", exclusive=True, auto_delete=True)
+        session.queue_declare(queue="test-queue-1b", auto_delete=True, arguments={'no-local':'true'})
+        #establish two consumers
+        self.subscribe(destination="local_included", queue="test-queue-1a")
+        self.subscribe(destination="local_excluded", queue="test-queue-1b", exclusive=True)
+
+        #send a message from the same session to each queue
+        send(session, "test-queue-1a", "deliver-me")
+        send(session, "test-queue-1b", "dont-deliver-me")
+
+        #send a message from another session on the same connection to each queue
+        session2 = self.conn.session("my-session")
+        send(session2, "test-queue-1a", "deliver-me-as-well")
+        send(session2, "test-queue-1b", "dont-deliver-me-either")
+
+        #send a message from a session on another connection to each queue
+        for name in ["test-queue-1a", "test-queue-1b"]:
+            session.exchange_bind(queue=name, exchange="amq.fanout", binding_key="my-key")
+        other = self.connect()
+        session3 = other.session("my-other-session")
+        session3.message_transfer(destination="amq.fanout", message=Message("i-am-not-local"))
+        other.close()
+
+        #check the queues of the two consumers
+        excluded, included = session.incoming("local_excluded"), session.incoming("local_included")
+        for expected in ["deliver-me", "deliver-me-as-well", "i-am-not-local"]:
+            self.assertEqual(expected, included.get(timeout=1).body)
+        self.assertEqual("i-am-not-local", excluded.get(timeout=1).body)
+        try:
+            excluded.get(timeout=1)
+        except Empty: pass
+        else:
+            self.fail("Received locally published message though no_local=true")
+
+
+    def test_consume_exclusive(self):
+        """
+        An exclusive consumer must stop a second consumer subscribing (error 405)
+        """
+        self.session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+        self.session.message_subscribe(destination="first", queue="test-queue-2", exclusive=True)
+        try:
+            self.session.message_subscribe(destination="second", queue="test-queue-2")
+        except SessionException, refusal:
+            self.assertEqual(405, refusal.args[0].error_code)
+        else:
+            self.fail("Expected consume request to fail due to previous exclusive consumer")
+
+    def test_consume_exclusive2(self):
+        """
+        An exclusive subscription must be refused (error 405) once a consumer exists
+        """
+        self.session.queue_declare(queue="test-queue-2", exclusive=True, auto_delete=True)
+        self.session.message_subscribe(destination="first", queue="test-queue-2")
+        try:
+            self.session.message_subscribe(destination="second", queue="test-queue-2", exclusive=True)
+        except SessionException, refusal:
+            self.assertEqual(405, refusal.args[0].error_code)
+        else:
+            self.fail("Expected exclusive consume request to fail due to previous consumer")
+
+    def test_consume_queue_not_found(self):
+        """
+        Subscribing to a queue that does not exist must fail with error 404.
+        """
+        try:
+            #queue specified but doesn't exist:
+            self.session.message_subscribe(queue="invalid-queue", destination="a")
+        except SessionException, refusal:
+            self.assertEqual(404, refusal.args[0].error_code)
+        else:
+            self.fail("Expected failure when consuming from non-existent queue")
+
+    def test_consume_queue_not_specified(self):
+        """Subscribing with no queue given (none declared earlier) must fail with 531."""
+        try:
+            self.session.message_subscribe(destination="a")
+        except SessionException, refusal:
+            self.assertEqual(531, refusal.args[0].error_code)
+        else:
+            self.fail("Expected failure when consuming from unspecified queue")
+
+    def test_consume_unique_consumers(self):
+        """
+        Re-using the destination of an existing subscription must fail (error 530)
+        """
+        #setup, declare a queue:
+        self.session.queue_declare(queue="test-queue-3", exclusive=True, auto_delete=True)
+        #check that attempts to use duplicate tags are detected and prevented:
+        subscription = dict(destination="first", queue="test-queue-3")
+        self.session.message_subscribe(**subscription)
+        try:
+            self.session.message_subscribe(**subscription)
+        except SessionException, refusal:
+            self.assertEqual(530, refusal.args[0].error_code)
+        else:
+            self.fail("Expected consume request to fail due to non-unique tag")
+
+    def test_cancel(self):
+        """
+        After message_cancel no further messages may arrive; repeated cancels are harmless.
+        """
+        session = self.session
+        #setup, declare a queue:
+        session.queue_declare(queue="test-queue-4", exclusive=True, auto_delete=True)
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "One"))
+
+        session.message_subscribe(destination="my-consumer", queue="test-queue-4")
+        myqueue = session.incoming("my-consumer")
+        session.message_flow(destination="my-consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="my-consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        #should flush here
+
+        #cancel should stop messages being delivered
+        session.message_cancel(destination="my-consumer")
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"), "Two"))
+        msg = myqueue.get(timeout=1)
+        self.assertEqual("One", msg.body)
+        try:
+            msg = myqueue.get(timeout=1)
+            #msg is not a str: convert it, concatenating it directly raises TypeError
+            self.fail("Got message after cancellation: " + str(msg))
+        except Empty: pass
+
+        #cancellation of non-existent consumers should be handled without error
+        session.message_cancel(destination="my-consumer")
+        session.message_cancel(destination="this-never-existed")
+
+
+    def test_ack(self):
+        """
+        Test basic ack/recover behaviour
+        """
+        session = self.conn.session("alternate-session", timeout=10)
+        session.queue_declare(queue="test-ack-queue", auto_delete=True)
+
+        session.message_subscribe(queue = "test-ack-queue", destination = "consumer")
+        session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        queue = session.incoming("consumer")
+
+        delivery_properties = session.delivery_properties(routing_key="test-ack-queue")
+        for i in ["One", "Two", "Three", "Four", "Five"]:
+            session.message_transfer(message=Message(delivery_properties, i))
+
+        msg1 = queue.get(timeout=1)
+        msg2 = queue.get(timeout=1)
+        msg3 = queue.get(timeout=1)
+        msg4 = queue.get(timeout=1)
+        msg5 = queue.get(timeout=1)
+
+        self.assertEqual("One", msg1.body)
+        self.assertEqual("Two", msg2.body)
+        self.assertEqual("Three", msg3.body)
+        self.assertEqual("Four", msg4.body)
+        self.assertEqual("Five", msg5.body)
+
+        session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
+
+        #subscribe from second session here to ensure queue is not
+        #auto-deleted when alternate session closes (no need to ack on these):
+        self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
+
+        #now close the session, and see that the unacked messages are
+        #then redelivered to another subscriber:
+        session.close(timeout=10)
+
+        session = self.session
+        session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        queue = session.incoming("checker")
+
+        msg3b = queue.get(timeout=1)
+        msg5b = queue.get(timeout=1)
+
+        self.assertEqual("Three", msg3b.body)
+        self.assertEqual("Five", msg5b.body)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message: " + extra.body)
+        except Empty: None
+
+    def test_reject(self):
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
+        session.queue_declare(queue = "r", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue = "r", exchange = "amq.fanout")
+
+        session.message_subscribe(queue = "q", destination = "consumer")
+        session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "blah, blah"))
+        msg = session.incoming("consumer").get(timeout = 1)
+        self.assertEquals(msg.body, "blah, blah")
+        session.message_reject(RangedSet(msg.id))
+
+        session.message_subscribe(queue = "r", destination = "checker")
+        session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        msg = session.incoming("checker").get(timeout = 1)
+        self.assertEquals(msg.body, "blah, blah")
+
+    def test_credit_flow_messages(self):
+        """
+        Test basic credit based flow control with unit = message
+        """
+        #declare an exclusive queue
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        #create consumer (for now that defaults to infinite credit)
+        session.message_subscribe(queue = "q", destination = "c")
+        session.message_set_flow_mode(flow_mode = 0, destination = "c")
+        #send batch of messages to queue
+        for i in range(1, 11):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
+
+        #set message credit to finite amount (less than enough for all messages)
+        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
+        #set infinite byte credit
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+        #check that expected number were received
+        q = session.incoming("c")
+        for i in range(1, 6):
+            self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
+        self.assertEmpty(q)
+
+        #increase credit again and check more are received
+        for i in range(6, 11):
+            session.message_flow(unit = session.credit_unit.message, value = 1, destination = "c")
+            self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
+            self.assertEmpty(q)
+
+    def test_credit_flow_bytes(self):
+        """
+        Test basic credit based flow control with unit = bytes
+        """
+        #declare an exclusive queue
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        #create consumer (for now that defaults to infinite credit)
+        session.message_subscribe(queue = "q", destination = "c")
+        session.message_set_flow_mode(flow_mode = 0, destination = "c")
+        #send batch of messages to queue
+        for i in range(10):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
+
+        #each message is currently interpreted as requiring msg_size bytes of credit
+        msg_size = 19
+
+        #set byte credit to finite amount (less than enough for all messages)
+        session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c")
+        #set infinite message credit
+        session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c")
+        #check that expected number were received
+        q = session.incoming("c")
+        for i in range(5):
+            self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
+        self.assertEmpty(q)
+
+        #increase credit again and check more are received
+        for i in range(5):
+            session.message_flow(unit = session.credit_unit.byte, value = msg_size, destination = "c")
+            self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
+            self.assertEmpty(q)
+
+
+    def test_window_flow_messages(self):
+        """
+        Test basic window based flow control with unit = message
+        """
+        #declare an exclusive queue
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        #create consumer (for now that defaults to infinite credit)
+        session.message_subscribe(queue = "q", destination = "c")
+        session.message_set_flow_mode(flow_mode = 1, destination = "c")
+        #send batch of messages to queue
+        for i in range(1, 11):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % i))
+
+        #set message credit to finite amount (less than enough for all messages)
+        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
+        #set infinite byte credit
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+        #check that expected number were received
+        q = session.incoming("c")
+        for i in range(1, 6):            
+            msg = q.get(timeout = 1)
+            session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+            self.assertDataEquals(session, msg, "Message %d" % i)
+        self.assertEmpty(q)
+
+        #acknowledge messages and check more are received
+        #TODO: there may be a nicer way of doing this
+        session.channel.session_completed(session.receiver._completed)
+
+        for i in range(6, 11):
+            self.assertDataEquals(session, q.get(timeout = 1), "Message %d" % i)
+        self.assertEmpty(q)
+
+
+    def test_window_flow_bytes(self):
+        """
+        Test basic window based flow control with unit = bytes
+        """
+        #declare an exclusive queue
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        #create consumer (for now that defaults to infinite credit)
+        session.message_subscribe(queue = "q", destination = "c")
+        session.message_set_flow_mode(flow_mode = 1, destination = "c")
+        #send batch of messages to queue
+        for i in range(10):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "abcdefgh"))
+
+        #each message is currently interpreted as requiring msg_size bytes of credit
+        msg_size = 19
+
+        #set byte credit to finite amount (less than enough for all messages)
+        session.message_flow(unit = session.credit_unit.byte, value = msg_size*5, destination = "c")
+        #set infinite message credit
+        session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "c")
+        #check that expected number were received
+        q = session.incoming("c")
+        msgs = []
+        for i in range(5):
+            msg = q.get(timeout = 1)
+            msgs.append(msg)
+            self.assertDataEquals(session, msg, "abcdefgh")
+        self.assertEmpty(q)
+
+        #ack each message individually and check more are received
+        for i in range(5):
+            msg = msgs.pop()
+            #TODO: there may be a nicer way of doing this
+            session.receiver._completed.add(msg.id)
+            session.channel.session_completed(session.receiver._completed)
+            self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
+            self.assertEmpty(q)
+
+    def test_window_flush_ack_flow(self):
+        """
+        Test that a window mode subscription still delivers after flush, accept and renewed credit
+        """
+        #declare an exclusive queue
+        ssn = self.session
+        ssn.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        #create consumer
+        ssn.message_subscribe(queue = "q", destination = "c",
+                              accept_mode=ssn.accept_mode.explicit)
+        ssn.message_set_flow_mode(flow_mode = ssn.flow_mode.window, destination = "c")
+
+        #send message A
+        ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "A"))
+
+        for unit in ssn.credit_unit.VALUES:
+            ssn.message_flow("c", unit, 0xFFFFFFFFL)
+
+        q = ssn.incoming("c")
+        msgA = q.get(timeout=10)
+
+        ssn.message_flush(destination="c")
+
+        # XXX: completion of message A is signalled by hand through receiver/channel internals
+        ssn.receiver._completed.add(msgA.id)
+        ssn.channel.session_completed(ssn.receiver._completed)
+        ssn.message_accept(RangedSet(msgA.id))
+
+        for unit in ssn.credit_unit.VALUES:
+            ssn.message_flow("c", unit, 0xFFFFFFFFL)
+
+        #send message B
+        ssn.message_transfer(message=Message(ssn.delivery_properties(routing_key="q"), "B"))
+
+        msgB = q.get(timeout=10)
+
+    def test_subscribe_not_acquired(self):
+        """
+        Test that the not-acquired mode works as expected for a simple case
+        """
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        for i in range(1, 6):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
+
+        session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+        session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+        session.message_subscribe(queue = "q", destination = "b", acquire_mode = 1)
+        session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "b")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+
+        for i in range(6, 11):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %s" % i))
+
+        #both subscribers should see all messages
+        qA = session.incoming("a")
+        qB = session.incoming("b")
+        for i in range(1, 11):
+            for q in [qA, qB]:
+                msg = q.get(timeout = 1)
+                self.assertEquals("Message %s" % i, msg.body)
+                #TODO: tidy up completion
+                session.receiver._completed.add(msg.id)
+
+        #TODO: tidy up completion
+        session.channel.session_completed(session.receiver._completed)
+        #messages should still be on the queue:
+        self.assertEquals(10, session.queue_query(queue = "q").message_count)
+
+    def test_acquire_with_no_accept_and_credit_flow(self):
+        """
+        Test that messages received unacquired, with accept not
+        required in credit flow mode can be acquired.
+        """
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+
+        session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1, accept_mode = 1)
+        session.message_set_flow_mode(flow_mode = session.flow_mode.credit, destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = "a")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+        msg = session.incoming("a").get(timeout = 1)
+        self.assertEquals("acquire me", msg.body)
+        #message should still be on the queue:
+        self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+        transfers = RangedSet(msg.id)
+        response = session.message_acquire(transfers)
+        #check that we get notification (i.e. message_acquired)
+        self.assert_(msg.id in response.transfers)
+        #message should have been removed from the queue:
+        self.assertEquals(0, session.queue_query(queue = "q").message_count)
+
+    def test_acquire(self):
+        """
+        Test explicit acquire function
+        """
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "acquire me"))
+
+        session.message_subscribe(queue = "q", destination = "a", acquire_mode = 1)
+        session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        msg = session.incoming("a").get(timeout = 1)
+        self.assertEquals("acquire me", msg.body)
+        #message should still be on the queue:
+        self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+        transfers = RangedSet(msg.id)
+        response = session.message_acquire(transfers)
+        #check that we get notification (i.e. message_acquired)
+        self.assert_(msg.id in response.transfers)
+        #message should have been removed from the queue:
+        self.assertEquals(0, session.queue_query(queue = "q").message_count)
+        session.message_accept(transfers)
+
+
+    def test_release(self):
+        """
+        Test explicit release function
+        """
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "release me"))
+
+        session.message_subscribe(queue = "q", destination = "a")
+        session.message_flow(destination="a", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="a", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        msg = session.incoming("a").get(timeout = 1)
+        self.assertEquals("release me", msg.body)
+        session.message_cancel(destination = "a")
+        session.message_release(RangedSet(msg.id))
+
+        #message should not have been removed from the queue:
+        self.assertEquals(1, session.queue_query(queue = "q").message_count)
+
+    def test_release_ordering(self):
+        """
+        Test order of released messages is as expected
+        """
+        session = self.session
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        for i in range (1, 11):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "released message %s" % (i)))
+
+        session.message_subscribe(queue = "q", destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+        queue = session.incoming("a")
+        first = queue.get(timeout = 1)
+        for i in range(2, 10):
+            msg = queue.get(timeout = 1)
+            self.assertEquals("released message %s" % (i), msg.body)
+            
+        last = queue.get(timeout = 1)
+        self.assertEmpty(queue)
+        released = RangedSet()
+        released.add(first.id, last.id)
+        session.message_release(released)
+
+        #TODO: may want to clean this up...
+        session.receiver._completed.add(first.id, last.id)
+        session.channel.session_completed(session.receiver._completed)
+        
+        for i in range(1, 11):
+            self.assertEquals("released message %s" % (i), queue.get(timeout = 1).body)
+
+    def test_ranged_ack(self):
+        """
+        Test acking of message ranges
+        """
+        session = self.conn.session("alternate-session", timeout=10)
+
+        session.queue_declare(queue = "q", auto_delete=True)
+        delivery_properties = session.delivery_properties(routing_key="q")
+        for i in range (1, 11):
+            session.message_transfer(message=Message(delivery_properties, "message %s" % (i)))
+
+        session.message_subscribe(queue = "q", destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+        queue = session.incoming("a")
+        ids = []
+        for i in range (1, 11):
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message %s" % (i), msg.body)
+            ids.append(msg.id)
+            
+        self.assertEmpty(queue)
+
+        #ack all but the fourth message (ids[3] is left out of both ranges)
+        accepted = RangedSet()
+        accepted.add(ids[0], ids[2])
+        accepted.add(ids[4], ids[9])
+        session.message_accept(accepted)
+
+        #subscribe from second session here to ensure queue is not
+        #auto-deleted when alternate session closes (no need to ack on these):
+        self.session.message_subscribe(queue = "q", destination = "checker")
+
+        #now close the session, and see that the unacked messages are
+        #then redelivered to another subscriber:
+        session.close(timeout=10)
+
+        session = self.session
+        session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        queue = session.incoming("checker")
+
+        self.assertEquals("message 4", queue.get(timeout = 1).body)
+        self.assertEmpty(queue)
+
+    def test_subscribe_not_acquired_2(self):
+        session = self.session
+
+        #publish some messages
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        for i in range(1, 11):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+        #consume some of them
+        session.message_subscribe(queue = "q", destination = "a")
+        session.message_set_flow_mode(flow_mode = 0, destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "a")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+
+        queue = session.incoming("a")
+        for i in range(1, 6):
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+            #complete and accept
+            session.message_accept(RangedSet(msg.id))
+            #TODO: tidy up completion
+            session.receiver._completed.add(msg.id)
+            session.channel.session_completed(session.receiver._completed)
+        self.assertEmpty(queue)
+
+        #now create a not-acquired subscriber
+        session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+
+        #check it gets those not consumed
+        queue = session.incoming("b")
+        session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+        for i in range(6, 11):
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+            session.message_release(RangedSet(msg.id))
+            #TODO: tidy up completion
+            session.receiver._completed.add(msg.id)
+            session.channel.session_completed(session.receiver._completed)
+        session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+        self.assertEmpty(queue)
+
+        #check all 'browsed' messages are still on the queue
+        self.assertEqual(5, session.queue_query(queue="q").message_count)
+
+    def test_subscribe_not_acquired_3(self):
+        session = self.session
+
+        #publish some messages
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        for i in range(1, 11):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+        #create a not-acquired subscriber
+        session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+
+        #browse through messages
+        queue = session.incoming("a")
+        for i in range(1, 11):
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+            if (i % 2):
+                #try to acquire every second message
+                response = session.message_acquire(RangedSet(msg.id))
+                #check that acquire succeeds
+                self.assert_(msg.id in response.transfers)
+                session.message_accept(RangedSet(msg.id))
+            else:
+                session.message_release(RangedSet(msg.id))
+            session.receiver._completed.add(msg.id)
+            session.channel.session_completed(session.receiver._completed)
+        self.assertEmpty(queue)
+
+        #create a second not-acquired subscriber
+        session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+        session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+        #check it gets those not consumed
+        queue = session.incoming("b")
+        for i in [2,4,6,8,10]:
+            msg = queue.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+            session.message_release(RangedSet(msg.id))
+            session.receiver._completed.add(msg.id)
+            session.channel.session_completed(session.receiver._completed)
+        session.message_flow(unit = session.credit_unit.message, value = 1, destination = "b")
+        self.assertEmpty(queue)
+
+        #check all 'browsed' messages are still on the queue
+        self.assertEqual(5, session.queue_query(queue="q").message_count)
+
+    def test_release_unacquired(self):
+        session = self.session
+
+        #create queue
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+        #send message
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "my-message"))
+
+        #create two 'browsers'
+        session.message_subscribe(queue = "q", destination = "a", acquire_mode=1)
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+        queueA = session.incoming("a")
+
+        session.message_subscribe(queue = "q", destination = "b", acquire_mode=1)
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "b")
+        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "b")
+        queueB = session.incoming("b")
+        
+        #have each browser release the message
+        msgA = queueA.get(timeout = 1)
+        session.message_release(RangedSet(msgA.id))
+
+        msgB = queueB.get(timeout = 1)
+        session.message_release(RangedSet(msgB.id))
+        
+        #cancel browsers
+        session.message_cancel(destination = "a")
+        session.message_cancel(destination = "b")
+        
+        #create consumer
+        session.message_subscribe(queue = "q", destination = "c")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "c")
+        queueC = session.incoming("c")
+        #consume the message then ack it
+        msgC = queueC.get(timeout = 1)
+        session.message_accept(RangedSet(msgC.id))
+        #ensure there are no other messages
+        self.assertEmpty(queueC)
+
+    def test_release_order(self):
+        session = self.session
+
+        #create queue
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+
+        #send messages
+        for i in range(1, 11):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "message-%d" % (i)))
+
+        #subscribe:
+        session.message_subscribe(queue="q", destination="a")
+        a = session.incoming("a")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+
+        for i in range(1, 11):
+            msg = a.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+            if (i % 2):
+                #accept all odd messages
+                session.message_accept(RangedSet(msg.id))
+            else:
+                #release all even messages
+                session.message_release(RangedSet(msg.id))
+
+        #browse:
+        session.message_subscribe(queue="q", destination="b", acquire_mode=1)
+        b = session.incoming("b")
+        b.start()
+        for i in [2, 4, 6, 8, 10]:
+            msg = b.get(timeout = 1)
+            self.assertEquals("message-%d" % (i), msg.body)
+
+
+    def test_empty_body(self):
+        session = self.session
+        session.queue_declare(queue="xyz", exclusive=True, auto_delete=True)
+        props = session.delivery_properties(routing_key="xyz")
+        session.message_transfer(message=Message(props, ""))
+
+        consumer_tag = "tag1"
+        session.message_subscribe(queue="xyz", destination=consumer_tag)
+        session.message_flow(unit = session.credit_unit.message, value = 0xFFFFFFFFL, destination = consumer_tag)
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+        queue = session.incoming(consumer_tag)
+        msg = queue.get(timeout=1)
+        self.assertEquals("", msg.body)
+        session.message_accept(RangedSet(msg.id))
+
+    def test_incoming_start(self):
+        q = "test_incoming_start"
+        session = self.session
+
+        session.queue_declare(queue=q, exclusive=True, auto_delete=True)
+        session.message_subscribe(queue=q, destination="msgs")
+        messages = session.incoming("msgs")
+        assert messages.destination == "msgs"
+
+        dp = session.delivery_properties(routing_key=q)
+        session.message_transfer(message=Message(dp, "test"))
+
+        messages.start()
+        msg = messages.get()
+        assert msg.body == "test"
+
+    def test_ttl(self):
+        q = "test_ttl"
+        session = self.session
+
+        session.queue_declare(queue=q, exclusive=True, auto_delete=True)
+
+        dp = session.delivery_properties(routing_key=q, ttl=500)#expire in half a second
+        session.message_transfer(message=Message(dp, "first"))
+
+        dp = session.delivery_properties(routing_key=q, ttl=300000)#expire in five minutes
+        session.message_transfer(message=Message(dp, "second"))
+
+        d = "msgs"
+        session.message_subscribe(queue=q, destination=d)
+        messages = session.incoming(d)
+        sleep(1)
+        session.message_flow(unit = session.credit_unit.message, value=2, destination=d)
+        session.message_flow(unit = session.credit_unit.byte, value=0xFFFFFFFFL, destination=d)
+        assert messages.get(timeout=1).body == "second"
+        self.assertEmpty(messages)
+
+
+    def assertDataEquals(self, session, msg, expected):
+        self.assertEquals(expected, msg.body)
+
+    def assertEmpty(self, queue):
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Queue not empty, contains: " + extra.body)
+        except Empty: None
+
+class SizelessContent(Content):
+
+    def size(self):
+        return None

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/message.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,68 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.datatypes import Message, RangedSet
+#from qpid.testlib import testrunner, TestBase010
+from qpid.testlib import TestBase010
+
+class PersistenceTests(TestBase010):
+    def test_delete_queue_after_publish(self):
+        session = self.session
+        session.auto_sync = False
+
+        #create queue
+        session.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+        #send messages
+        for i in range(1, 10):
+            dp = session.delivery_properties(routing_key="q", delivery_mode=2)
+            session.message_transfer(message=Message(dp, "my-message"))
+
+        session.auto_sync = True
+        #explicitly delete queue
+        session.queue_delete(queue = "q")
+
+    def test_ack_message_from_deleted_queue(self):
+        session = self.session
+        session.auto_sync = False
+
+        #create queue
+        session.queue_declare(queue = "q", auto_delete=True, durable=True)
+
+        #send message
+        dp = session.delivery_properties(routing_key="q", delivery_mode=2)
+        session.message_transfer(message=Message(dp, "my-message"))
+
+        #create consumer
+        session.message_subscribe(queue = "q", destination = "a", accept_mode = 1, acquire_mode=0)
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "a")
+        session.message_flow(unit = session.credit_unit.message, value = 10, destination = "a")
+        queue = session.incoming("a")
+
+        #consume the message, cancel subscription (triggering auto-delete), then ack it
+        msg = queue.get(timeout = 5)
+        session.message_cancel(destination = "a")
+        session.message_accept(RangedSet(msg.id))
+
+    def test_queue_deletion(self):
+        session = self.session
+        session.queue_declare(queue = "durable-subscriber-queue", exclusive=True, durable=True)
+        session.exchange_bind(exchange="amq.topic", queue="durable-subscriber-queue", binding_key="xyz")
+        dp = session.delivery_properties(routing_key="xyz", delivery_mode=2)
+        session.message_transfer(destination="amq.topic", message=Message(dp, "my-message"))
+        session.queue_delete(queue = "durable-subscriber-queue")

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/persistence.py
------------------------------------------------------------------------------
    svn:eol-style = native

Added: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py?rev=906961&view=auto
==============================================================================
--- qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py (added)
+++ qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py Fri Feb  5 15:08:44 2010
@@ -0,0 +1,247 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+# 
+#   http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+from qpid.client import Client, Closed
+from qpid.queue import Empty
+from qpid.content import Content
+from qpid.testlib import TestBase010
+
+class QueryTests(TestBase010):
+    """Tests for various query methods"""
+
+    def test_queue_query(self):
+        session = self.session
+        session.queue_declare(queue="my-queue", exclusive=True)
+        result = session.queue_query(queue="my-queue")
+        self.assertEqual("my-queue", result.queue)
+
+    def test_queue_query_unknown(self):
+        session = self.session
+        result = session.queue_query(queue="I don't exist")
+        self.assert_(not result.queue)
+
+    def test_exchange_query(self):
+        """
+        Test that the exchange_query method works as expected
+        """
+        session = self.session
+        #check returned type for the standard exchanges
+        self.assertEqual("direct", session.exchange_query(name="amq.direct").type)
+        self.assertEqual("topic", session.exchange_query(name="amq.topic").type)
+        self.assertEqual("fanout", session.exchange_query(name="amq.fanout").type)
+        self.assertEqual("headers", session.exchange_query(name="amq.match").type)
+        self.assertEqual("direct", session.exchange_query(name="").type)        
+        #declare an exchange
+        session.exchange_declare(exchange="my-test-exchange", type= "direct", durable=False)
+        #check that the result of a query is as expected
+        response = session.exchange_query(name="my-test-exchange")
+        self.assertEqual("direct", response.type)
+        self.assert_(not response.durable)
+        self.assert_(not response.not_found)
+        #delete the exchange
+        session.exchange_delete(exchange="my-test-exchange")
+        #check that the query now reports not-found
+        self.assert_(session.exchange_query(name="my-test-exchange").not_found)
+
+    def test_exchange_bound_direct(self):
+        """
+        Test that the exchange_bound method works as expected with the direct exchange
+        """
+        self.exchange_bound_with_key("amq.direct")
+
+    def test_exchange_bound_topic(self):
+        """
+        Test that the exchange_bound method works as expected with the topic exchange
+        """
+        self.exchange_bound_with_key("amq.topic")
+
+    def exchange_bound_with_key(self, exchange_name):
+        session = self.session
+        #setup: create two queues
+        session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+        session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+        
+        session.exchange_bind(exchange=exchange_name, queue="used-queue", binding_key="used-key")
+
+        # test detection of any binding to specific queue
+        response = session.exchange_bound(exchange=exchange_name, queue="used-queue")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assert_(not response.queue_not_matched)        
+
+        # test detection of specific binding to any queue
+        response = session.exchange_bound(exchange=exchange_name, binding_key="used-key")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assert_(not response.key_not_matched)        
+
+        # test detection of specific binding to specific queue
+        response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="used-key")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assert_(not response.queue_not_matched)        
+        self.assert_(not response.key_not_matched)        
+
+        # test unmatched queue, unspecified binding
+        response = session.exchange_bound(exchange=exchange_name, queue="unused-queue")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+
+        # test unspecified queue, unmatched binding
+        response = session.exchange_bound(exchange=exchange_name, binding_key="unused-key")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assertEqual(True, response.key_not_matched)        
+
+        # test matched queue, unmatched binding
+        response = session.exchange_bound(exchange=exchange_name, queue="used-queue", binding_key="unused-key")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assert_(not response.queue_not_matched)        
+        self.assertEqual(True, response.key_not_matched)        
+
+        # test unmatched queue, matched binding
+        response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="used-key")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+        self.assert_(not response.key_not_matched)        
+
+        # test unmatched queue, unmatched binding
+        response = session.exchange_bound(exchange=exchange_name, queue="unused-queue", binding_key="unused-key")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+        self.assertEqual(True, response.key_not_matched)        
+
+        #test exchange not found
+        self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
+
+        #test exchange found, queue not found
+        response = session.exchange_bound(exchange=exchange_name, queue="unknown-queue")
+        self.assertEqual(False, response.exchange_not_found)
+        self.assertEqual(True, response.queue_not_found)
+
+        #test exchange not found, queue found
+        response = session.exchange_bound(exchange="unknown-exchange", queue="used-queue")
+        self.assertEqual(True, response.exchange_not_found)
+        self.assertEqual(False, response.queue_not_found)
+
+        #test exchange not found, queue not found
+        response = session.exchange_bound(exchange="unknown-exchange", queue="unknown-queue")
+        self.assertEqual(True, response.exchange_not_found)
+        self.assertEqual(True, response.queue_not_found)
+
+
+    def test_exchange_bound_fanout(self):
+        """
+        Test that the exchange_bound method works as expected with the fanout exchange
+        """
+        session = self.session
+        #setup
+        session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+        session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+        session.exchange_bind(exchange="amq.fanout", queue="used-queue")
+
+        # test detection of any binding to specific queue
+        response = session.exchange_bound(exchange="amq.fanout", queue="used-queue")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assert_(not response.queue_not_matched)        
+
+        # test unmatched queue, unspecified binding
+        response = session.exchange_bound(exchange="amq.fanout", queue="unused-queue")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+
+        #test exchange not found
+        self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
+
+        #test queue not found
+        self.assertEqual(True, session.exchange_bound(exchange="amq.fanout", queue="unknown-queue").queue_not_found)
+
+    def test_exchange_bound_header(self):
+        """
+        Test that the exchange_bound method works as expected with the headers exchange
+        """
+        session = self.session
+        #setup
+        session.queue_declare(queue="used-queue", exclusive=True, auto_delete=True)
+        session.queue_declare(queue="unused-queue", exclusive=True, auto_delete=True)
+        session.exchange_bind(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"} )
+
+        # test detection of any binding to specific queue
+        response = session.exchange_bound(exchange="amq.match", queue="used-queue")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assert_(not response.queue_not_matched)        
+
+        # test detection of specific binding to any queue
+        response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "a":"A"})
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assert_(not response.args_not_matched)        
+
+        # test detection of specific binding to specific queue
+        response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "a":"A"})
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assert_(not response.queue_not_matched)        
+        self.assert_(not response.args_not_matched)        
+
+        # test unmatched queue, unspecified binding
+        response = session.exchange_bound(exchange="amq.match", queue="unused-queue")
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+
+        # test unspecified queue, unmatched binding
+        response = session.exchange_bound(exchange="amq.match", arguments={"x-match":"all", "b":"B"})
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assertEqual(True, response.args_not_matched)        
+
+        # test matched queue, unmatched binding
+        response = session.exchange_bound(exchange="amq.match", queue="used-queue", arguments={"x-match":"all", "b":"B"})
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assert_(not response.queue_not_matched)        
+        self.assertEqual(True, response.args_not_matched)        
+
+        # test unmatched queue, matched binding
+        response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "a":"A"})
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+        self.assert_(not response.args_not_matched)        
+
+        # test unmatched queue, unmatched binding
+        response = session.exchange_bound(exchange="amq.match", queue="unused-queue", arguments={"x-match":"all", "b":"B"})
+        self.assert_(not response.exchange_not_found)
+        self.assert_(not response.queue_not_found)
+        self.assertEqual(True, response.queue_not_matched)        
+        self.assertEqual(True, response.args_not_matched)        
+
+        #test exchange not found
+        self.assertEqual(True, session.exchange_bound(exchange="unknown-exchange").exchange_not_found)
+
+        #test queue not found
+        self.assertEqual(True, session.exchange_bound(exchange="amq.match", queue="unknown-queue").queue_not_found)
+        

Propchange: qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_10/query.py
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org