You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC

svn commit: r1186990 [43/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/exchange.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/exchange.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/exchange.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/exchange.py Thu Oct 20 18:42:46 2011
@@ -226,8 +226,6 @@ class DefaultExchangeRuleTests(TestHelpe
         # Test automatic binding by queue name.
         self.queue_declare(queue="d")
         self.assertPublishConsume(queue="d", routing_key="d")
-        # Test explicit bind to default queue
-        self.verifyDirectExchange("")
 
 
 # TODO aconway 2006-09-27: Fill in empty tests:
@@ -448,9 +446,9 @@ class MiscellaneousErrorsTests(TestHelpe
     def testTypeNotKnown(self):
         try:
             self.session.exchange_declare(exchange="test_type_not_known_exchange", type="invalid_type")
-            self.fail("Expected 503 for declaration of unknown exchange type.")
+            self.fail("Expected 404 for declaration of unknown exchange type.")
         except SessionException, e:
-            self.assertEquals(503, e.args[0].error_code)
+            self.assertEquals(404, e.args[0].error_code)
 
     def testDifferentDeclaredType(self):
         self.exchange_declare(exchange="test_different_declared_type_exchange", type="direct")
@@ -460,7 +458,30 @@ class MiscellaneousErrorsTests(TestHelpe
             self.fail("Expected 530 for redeclaration of exchange with different type.")
         except SessionException, e:
             self.assertEquals(530, e.args[0].error_code)
-    
+
+    def testDefaultAccessBind(self):
+        try:
+            self.session.queue_declare(queue="my-queue", auto_delete=True, exclusive=True)
+            self.session.exchange_bind(exchange="", queue="my-queue", binding_key="another-key")
+            self.fail("Expected 542 (invalid-argument) code for bind to default exchange.")
+        except SessionException, e:
+            self.assertEquals(542, e.args[0].error_code)
+
+    def testDefaultAccessUnbind(self):
+        try:
+            self.session.queue_declare(queue="my-queue", auto_delete=True, exclusive=True)
+            self.session.exchange_unbind(exchange="", queue="my-queue", binding_key="my-queue")
+            self.fail("Expected 542 (invalid-argument) code for unbind from default exchange.")
+        except SessionException, e:
+            self.assertEquals(542, e.args[0].error_code)
+
+    def testDefaultAccessDelete(self):
+        try:
+            self.session.exchange_delete(exchange="")
+            self.fail("Expected 542 (invalid-argument) code for delete of default exchange.")
+        except SessionException, e:
+            self.assertEquals(542, e.args[0].error_code)
+
 class ExchangeTests(TestHelper):
     def testHeadersBindNoMatchArg(self):
         self.session.queue_declare(queue="q", exclusive=True, auto_delete=True)

Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/extensions.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/extensions.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/extensions.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/extensions.py Thu Oct 20 18:42:46 2011
@@ -20,6 +20,8 @@ from qpid.client import Client, Closed
 from qpid.queue import Empty
 from qpid.content import Content
 from qpid.testlib import TestBase010
+from qpid.session import SessionException
+from qpid.datatypes import uuid4
 from time import sleep
 
 class ExtensionTests(TestBase010):
@@ -28,10 +30,58 @@ class ExtensionTests(TestBase010):
     def test_timed_autodelete(self):
         session = self.session
         session2 = self.conn.session("another-session")
-        session2.queue_declare(queue="my-queue", exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":5})
+        name=str(uuid4())
+        session2.queue_declare(queue=name, exclusive=True, auto_delete=True, arguments={"qpid.auto_delete_timeout":3})
         session2.close()
-        result = session.queue_query(queue="my-queue")
-        self.assertEqual("my-queue", result.queue)
+        result = session.queue_query(queue=name)
+        self.assertEqual(name, result.queue)
         sleep(5)
-        result = session.queue_query(queue="my-queue")
+        result = session.queue_query(queue=name)
         self.assert_(not result.queue)
+
+    def valid_policy_args(self, args, name="test-queue"):
+        try:
+            self.session.queue_declare(queue=name, arguments=args)
+            self.session.queue_delete(queue=name) # cleanup
+        except SessionException, e:
+            self.fail("declare with valid policy args failed: %s" % (args))
+            self.session = self.conn.session("replacement", 2)
+
+    def invalid_policy_args(self, args, name="test-queue"):
+        # go through invalid declare attempts twice to make sure that
+        # the queue doesn't actually get created first time around
+        # even if exception is thrown
+        for i in range(1, 3):
+            try:
+                self.session.queue_declare(queue=name, arguments=args)
+                self.session.queue_delete(queue=name) # cleanup
+                self.fail("declare with invalid policy args suceeded: %s (iteration %d)" % (args, i))
+            except SessionException, e:
+                self.session = self.conn.session(str(uuid4()))
+
+    def test_policy_max_size_as_valid_string(self):
+        self.valid_policy_args({"qpid.max_size":"3"})
+
+    def test_policy_max_count_as_valid_string(self):
+        self.valid_policy_args({"qpid.max_count":"3"})
+
+    def test_policy_max_count_and_size_as_valid_strings(self):
+        self.valid_policy_args({"qpid.max_count":"3","qpid.max_size":"0"})
+
+    def test_policy_negative_count(self):
+        self.invalid_policy_args({"qpid.max_count":-1})
+
+    def test_policy_negative_size(self):
+        self.invalid_policy_args({"qpid.max_size":-1})
+
+    def test_policy_size_as_invalid_string(self):
+        self.invalid_policy_args({"qpid.max_size":"foo"})
+
+    def test_policy_count_as_invalid_string(self):
+        self.invalid_policy_args({"qpid.max_count":"foo"})
+
+    def test_policy_size_as_float(self):
+        self.invalid_policy_args({"qpid.max_size":3.14159})
+
+    def test_policy_count_as_float(self):
+        self.invalid_policy_args({"qpid.max_count":"2222222.22222"})

Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/management.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/management.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/management.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/management.py Thu Oct 20 18:42:46 2011
@@ -156,7 +156,7 @@ class ManagementTest (TestBase010):
         queues = self.qmf.getObjects(_class="queue")
 
         "Move 10 messages from src-queue to dest-queue"
-        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 10, {})
         self.assertEqual (result.status, 0) 
 
         sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -166,7 +166,7 @@ class ManagementTest (TestBase010):
         self.assertEqual (dq.msgDepth,10)
 
         "Move all remaining messages to destination"
-        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "dest-queue", 0, {})
         self.assertEqual (result.status,0)
 
         sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -176,16 +176,16 @@ class ManagementTest (TestBase010):
         self.assertEqual (dq.msgDepth,20)
 
         "Use a bad source queue name"
-        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("bad-src-queue", "dest-queue", 0, {})
         self.assertEqual (result.status,4)
 
         "Use a bad destination queue name"
-        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("src-queue", "bad-dest-queue", 0, {})
         self.assertEqual (result.status,4)
 
         " Use a large qty (40) to move from dest-queue back to "
         " src-queue- should move all "
-        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40)
+        result = self.qmf.getObjects(_class="broker")[0].queueMoveMessages("dest-queue", "src-queue", 40, {})
         self.assertEqual (result.status,0)
 
         sq = self.qmf.getObjects(_class="queue", name="src-queue")[0]
@@ -225,23 +225,55 @@ class ManagementTest (TestBase010):
         pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
 
         "Purge top message from purge-queue"
-        result = pq.purge(1)
+        result = pq.purge(1, {})
         self.assertEqual (result.status, 0) 
         pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
         self.assertEqual (pq.msgDepth,19)
 
         "Purge top 9 messages from purge-queue"
-        result = pq.purge(9)
+        result = pq.purge(9, {})
         self.assertEqual (result.status, 0) 
         pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
         self.assertEqual (pq.msgDepth,10)
 
         "Purge all messages from purge-queue"
-        result = pq.purge(0)
+        result = pq.purge(0, {})
         self.assertEqual (result.status, 0) 
         pq = self.qmf.getObjects(_class="queue", name="purge-queue")[0]
         self.assertEqual (pq.msgDepth,0)
 
+    def test_reroute_priority_queue(self):
+        self.startQmf()
+        session = self.session
+
+        #setup test queue supporting multiple priority levels
+        session.queue_declare(queue="test-queue", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10})
+
+        #send some messages of varying priority to that queue:
+        for i in range(0, 5):
+            deliveryProps = session.delivery_properties(routing_key="test-queue", priority=i+5)
+            session.message_transfer(message=Message(deliveryProps, "Message %d" % (i+1)))
+
+
+        #declare and bind a queue to amq.fanout through which rerouted
+        #messages can be verified:
+        session.queue_declare(queue="rerouted", exclusive=True, auto_delete=True, arguments={'x-qpid-priorities':10})
+        session.exchange_bind(queue="rerouted", exchange="amq.fanout")
+
+        #reroute messages from test queue to amq.fanout (and hence to
+        #rerouted queue):
+        pq = self.qmf.getObjects(_class="queue", name="test-queue")[0]
+        result = pq.reroute(0, False, "amq.fanout", {})
+        self.assertEqual(result.status, 0) 
+
+        #verify messages are all rerouted:
+        self.subscribe(destination="incoming", queue="rerouted")
+        incoming = session.incoming("incoming")
+        for i in range(0, 5):
+            msg = incoming.get(timeout=1)
+            self.assertEqual("Message %d" % (5-i), msg.body)
+
+
     def test_reroute_queue(self):
         """
         Test ability to reroute messages from the head of a queue.
@@ -269,7 +301,7 @@ class ManagementTest (TestBase010):
         pq = self.qmf.getObjects(_class="queue", name="reroute-queue")[0]
 
         "Reroute top message from reroute-queue to alternate exchange"
-        result = pq.reroute(1, True, "")
+        result = pq.reroute(1, True, "", {})
         self.assertEqual(result.status, 0) 
         pq.update()
         aq = self.qmf.getObjects(_class="queue", name="alt-queue1")[0]
@@ -277,7 +309,7 @@ class ManagementTest (TestBase010):
         self.assertEqual(aq.msgDepth,1)
 
         "Reroute top 9 messages from reroute-queue to alt.direct2"
-        result = pq.reroute(9, False, "alt.direct2")
+        result = pq.reroute(9, False, "alt.direct2", {})
         self.assertEqual(result.status, 0) 
         pq.update()
         aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
@@ -285,11 +317,11 @@ class ManagementTest (TestBase010):
         self.assertEqual(aq.msgDepth,9)
 
         "Reroute using a non-existent exchange"
-        result = pq.reroute(0, False, "amq.nosuchexchange")
+        result = pq.reroute(0, False, "amq.nosuchexchange", {})
         self.assertEqual(result.status, 4)
 
         "Reroute all messages from reroute-queue"
-        result = pq.reroute(0, False, "alt.direct2")
+        result = pq.reroute(0, False, "alt.direct2", {})
         self.assertEqual(result.status, 0) 
         pq.update()
         aq = self.qmf.getObjects(_class="queue", name="alt-queue2")[0]
@@ -305,11 +337,44 @@ class ManagementTest (TestBase010):
             session.message_transfer(destination="amq.direct", message=msg)
 
         "Reroute onto the same queue"
-        result = pq.reroute(0, False, "amq.direct")
+        result = pq.reroute(0, False, "amq.direct", {})
         self.assertEqual(result.status, 0) 
         pq.update()
         self.assertEqual(pq.msgDepth,20)
-        
+
+    def test_reroute_alternate_exchange(self):
+        """
+        Test that when rerouting, the alternate-exchange is considered if relevant
+        """
+        self.startQmf()
+        session = self.session
+        # 1. Create 2 exchanges A and B (fanout) where B is the
+        # alternate exchange for A
+        session.exchange_declare(exchange="B", type="fanout")
+        session.exchange_declare(exchange="A", type="fanout", alternate_exchange="B")
+
+        # 2. Bind queue X to B
+        session.queue_declare(queue="X", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="X", exchange="B")
+
+        # 3. Send 1 message to queue Y
+        session.queue_declare(queue="Y", exclusive=True, auto_delete=True)
+        props = session.delivery_properties(routing_key="Y")
+        session.message_transfer(message=Message(props, "reroute me!"))
+
+        # 4. Call reroute on queue Y and specify that messages should
+        # be sent to exchange A
+        y = self.qmf.getObjects(_class="queue", name="Y")[0]
+        result = y.reroute(1, False, "A", {})
+        self.assertEqual(result.status, 0)
+
+        # 5. verify that the message is rerouted through B (as A has
+        # no matching bindings) to X
+        self.subscribe(destination="x", queue="X")
+        self.assertEqual("reroute me!", session.incoming("x").get(timeout=1).body)
+
+        # Cleanup
+        for e in ["A", "B"]: session.exchange_delete(exchange=e)
 
     def test_methods_async (self):
         """
@@ -519,4 +584,63 @@ class ManagementTest (TestBase010):
         conn_qmf.update()
         self.assertEqual(conn_qmf.msgsToClient, 1)
 
-        
+    def test_timestamp_config(self):
+        """
+        Test message timestamping control.
+        """
+        self.startQmf()
+        conn = self.connect()
+        session = conn.session("timestamp-session")
+
+        #verify that receive message timestamping is OFF by default
+        broker = self.qmf.getObjects(_class="broker")[0]
+        rc = broker.getTimestampConfig()
+        self.assertEqual(rc.status, 0)
+        self.assertEqual(rc.text, "OK")
+        #self.assertEqual(rc.receive, False)
+
+        #try to enable it
+        rc = broker.setTimestampConfig(True)
+        self.assertEqual(rc.status, 0)
+        self.assertEqual(rc.text, "OK")
+
+        rc = broker.getTimestampConfig()
+        self.assertEqual(rc.status, 0)
+        self.assertEqual(rc.text, "OK")
+        self.assertEqual(rc.receive, True)
+
+        #send a message to a queue
+        session.queue_declare(queue="ts-q", exclusive=True, auto_delete=True)
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "abc"))
+
+        #receive message from queue, and verify timestamp is present
+        session.message_subscribe(destination="d", queue="ts-q")
+        session.message_flow(destination="d", unit=session.credit_unit.message, value=0xFFFFFFFFL)
+        session.message_flow(destination="d", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
+        incoming = session.incoming("d")
+        msg = incoming.get(timeout=1)
+        self.assertEqual("abc", msg.body)
+        self.assertEqual(msg.has("delivery_properties"), True)
+        dp = msg.get("delivery_properties")
+        assert(dp.timestamp)
+
+        #try to disable it
+        rc = broker.setTimestampConfig(False)
+        self.assertEqual(rc.status, 0)
+        self.assertEqual(rc.text, "OK")
+
+        rc = broker.getTimestampConfig()
+        self.assertEqual(rc.status, 0)
+        self.assertEqual(rc.text, "OK")
+        self.assertEqual(rc.receive, False)
+
+        #send another message to the queue
+        session.message_transfer(message=Message(session.delivery_properties(routing_key="ts-q"), "def"))
+
+        #receive message from queue, and verify timestamp is NOT PRESENT
+        msg = incoming.get(timeout=1)
+        self.assertEqual("def", msg.body)
+        self.assertEqual(msg.has("delivery_properties"), True)
+        dp = msg.get("delivery_properties")
+        self.assertEqual(dp.timestamp, None)
+

Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/message.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/message.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/message.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/message.py Thu Oct 20 18:42:46 2011
@@ -245,26 +245,46 @@ class MessageTests(TestBase010):
             self.fail("Got message after cancellation: " + msg)
         except Empty: None
 
-        #cancellation of non-existant consumers should be handled without error
-        session.message_cancel(destination="my-consumer")
-        session.message_cancel(destination="this-never-existed")
+        #cancellation of non-existant consumers should be result in 404s
+        try:
+            session.message_cancel(destination="my-consumer")
+            self.fail("Expected 404 for recancellation of subscription.")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
+
+        session = self.conn.session("alternate-session", timeout=10)
+        try:
+            session.message_cancel(destination="this-never-existed")
+            self.fail("Expected 404 for cancellation of unknown subscription.")
+        except SessionException, e:
+            self.assertEquals(404, e.args[0].error_code)
 
 
     def test_ack(self):
         """
-        Test basic ack/recover behaviour
+        Test basic ack/recover behaviour using a combination of implicit and
+        explicit accept subscriptions.
         """
-        session = self.conn.session("alternate-session", timeout=10)
-        session.queue_declare(queue="test-ack-queue", auto_delete=True)
+        self.startQmf()
+        session1 = self.conn.session("alternate-session", timeout=10)
+        session1.queue_declare(queue="test-ack-queue", auto_delete=True)
 
-        session.message_subscribe(queue = "test-ack-queue", destination = "consumer")
-        session.message_flow(destination="consumer", unit=session.credit_unit.message, value=0xFFFFFFFFL)
-        session.message_flow(destination="consumer", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
-        queue = session.incoming("consumer")
-
-        delivery_properties = session.delivery_properties(routing_key="test-ack-queue")
+        delivery_properties = session1.delivery_properties(routing_key="test-ack-queue")
         for i in ["One", "Two", "Three", "Four", "Five"]:
-            session.message_transfer(message=Message(delivery_properties, i))
+            session1.message_transfer(message=Message(delivery_properties, i))
+
+        # verify enqueued message count, use both QMF and session query to verify consistency
+        self.assertEqual(5, session1.queue_query(queue="test-ack-queue").message_count)
+        queueObj = self.qmf.getObjects(_class="queue", name="test-ack-queue")[0]
+        self.assertEquals(queueObj.msgDepth, 5)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 0)
+
+        # subscribe with implied acquire, explicit accept:
+        session1.message_subscribe(queue = "test-ack-queue", destination = "consumer")
+        session1.message_flow(destination="consumer", unit=session1.credit_unit.message, value=0xFFFFFFFFL)
+        session1.message_flow(destination="consumer", unit=session1.credit_unit.byte, value=0xFFFFFFFFL)
+        queue = session1.incoming("consumer")
 
         msg1 = queue.get(timeout=1)
         msg2 = queue.get(timeout=1)
@@ -278,20 +298,46 @@ class MessageTests(TestBase010):
         self.assertEqual("Four", msg4.body)
         self.assertEqual("Five", msg5.body)
 
-        session.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
-
-        #subscribe from second session here to ensure queue is not
-        #auto-deleted when alternate session closes (no need to ack on these):
-        self.session.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
+        # messages should not be on the queue:
+        self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
+        # QMF shows the dequeues as not having happened yet, since they are have
+        # not been accepted
+        queueObj.update()
+        self.assertEquals(queueObj.msgDepth, 5)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 0)
+
+        session1.message_accept(RangedSet(msg1.id, msg2.id, msg4.id))#One, Two and Four
+
+        # QMF should now reflect the accepted messages as being dequeued
+        self.assertEqual(0, session1.queue_query(queue="test-ack-queue").message_count)
+        queueObj.update()
+        self.assertEquals(queueObj.msgDepth, 2)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 3)
+
+        #subscribe from second session here to ensure queue is not auto-deleted
+        #when alternate session closes.  Use implicit accept mode to test that
+        #we don't need to explicitly accept
+        session2 = self.conn.session("alternate-session-2", timeout=10)
+        session2.message_subscribe(queue = "test-ack-queue", destination = "checker", accept_mode=1)
 
-        #now close the session, and see that the unacked messages are
+        #now close the first session, and see that the unaccepted messages are
         #then redelivered to another subscriber:
-        session.close(timeout=10)
+        session1.close(timeout=10)
 
-        session = self.session
-        session.message_flow(destination="checker", unit=session.credit_unit.message, value=0xFFFFFFFFL)
-        session.message_flow(destination="checker", unit=session.credit_unit.byte, value=0xFFFFFFFFL)
-        queue = session.incoming("checker")
+        # check the statistics - the queue_query will show the non-accepted
+        # messages have been released. QMF never considered them dequeued, so
+        # those counts won't change
+        self.assertEqual(2, session2.queue_query(queue="test-ack-queue").message_count)
+        queueObj.update()
+        self.assertEquals(queueObj.msgDepth, 2)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 3)
+
+        session2.message_flow(destination="checker", unit=session2.credit_unit.message, value=0xFFFFFFFFL)
+        session2.message_flow(destination="checker", unit=session2.credit_unit.byte, value=0xFFFFFFFFL)
+        queue = session2.incoming("checker")
 
         msg3b = queue.get(timeout=1)
         msg5b = queue.get(timeout=1)
@@ -304,6 +350,33 @@ class MessageTests(TestBase010):
             self.fail("Got unexpected message: " + extra.body)
         except Empty: None
 
+        self.assertEqual(0, session2.queue_query(queue="test-ack-queue").message_count)
+        queueObj.update()
+        self.assertEquals(queueObj.msgDepth, 0)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 5)
+
+        # Subscribe one last time to keep the queue available, and to verify
+        # that the implied accept worked by verifying no messages have been
+        # returned when session2 is closed.
+        self.session.message_subscribe(queue = "test-ack-queue", destination = "final-checker")
+
+        session2.close(timeout=10)
+
+        # check the statistics - they should not have changed
+        self.assertEqual(0, self.session.queue_query(queue="test-ack-queue").message_count)
+        queueObj.update()
+        self.assertEquals(queueObj.msgDepth, 0)
+        self.assertEquals(queueObj.msgTotalEnqueues, 5)
+        self.assertEquals(queueObj.msgTotalDequeues, 5)
+
+        self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.message, value=0xFFFFFFFFL)
+        self.session.message_flow(destination="final-checker", unit=self.session.credit_unit.byte, value=0xFFFFFFFFL)
+        try:
+            extra = self.session.incoming("final-checker").get(timeout=1)
+            self.fail("Got unexpected message: " + extra.body)
+        except Empty: None
+
     def test_reject(self):
         session = self.session
         session.queue_declare(queue = "q", exclusive=True, auto_delete=True, alternate_exchange="amq.fanout")
@@ -498,6 +571,47 @@ class MessageTests(TestBase010):
 
         msgB = q.get(timeout=10)
 
+    def test_window_stop(self):
+        """
+        Ensure window based flow control reacts to stop correctly
+        """
+        session = self.session
+        #setup subscriber on a test queue
+        session.queue_declare(queue = "q", exclusive=True, auto_delete=True)
+        session.message_subscribe(queue = "q", destination = "c")
+        session.message_set_flow_mode(flow_mode = 1, destination = "c")
+        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+
+
+        #send batch of messages to queue
+        for i in range(0, 10):
+            session.message_transfer(message=Message(session.delivery_properties(routing_key="q"), "Message %d" % (i+1)))
+
+        #retrieve all delivered messages
+        q = session.incoming("c")
+        for i in range(0, 5):
+            msg = q.get(timeout = 1)
+            session.receiver._completed.add(msg.id)#TODO: this may be done automatically
+            self.assertDataEquals(session, msg, "Message %d" % (i+1))
+
+        session.message_stop(destination = "c")
+
+        #now send completions, normally used to move window forward,
+        #but after a stop should not do so
+        session.channel.session_completed(session.receiver._completed)
+
+        #check no more messages are sent
+        self.assertEmpty(q)
+
+        #re-establish window and check remaining messages
+        session.message_flow(unit = session.credit_unit.message, value = 5, destination = "c")
+        session.message_flow(unit = session.credit_unit.byte, value = 0xFFFFFFFFL, destination = "c")
+        for i in range(0, 5):
+            msg = q.get(timeout = 1)
+            self.assertDataEquals(session, msg, "Message %d" % (i+6))
+
+
     def test_subscribe_not_acquired(self):
         """
         Test the not-acquired modes works as expected for a simple case
@@ -902,7 +1016,6 @@ class MessageTests(TestBase010):
         assert messages.get(timeout=1).body == "second"
         self.assertEmpty(messages)
 
-
     def assertDataEquals(self, session, msg, expected):
         self.assertEquals(expected, msg.body)
 

Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/priority.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/priority.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/priority.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/priority.py Thu Oct 20 18:42:46 2011
@@ -33,13 +33,13 @@ class PriorityTests (Base):
     def setup_session(self):
         return self.conn.session()
 
-    def prioritised_delivery(self, priorities, levels=10):
+    def prioritised_delivery(self, priorities, levels=10, key="x-qpid-priorities"):
         """
         Test that message on a queue are delivered in priority order.
         """
         msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
 
-        snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s}}}}" % levels,
+        snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s}}}}" % (key, levels),
                               durable=self.durable())
         for m in msgs: snd.send(m)
 
@@ -50,16 +50,16 @@ class PriorityTests (Base):
             assert msg.content == expected.content
             self.ssn.acknowledge(msg)
 
-    def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10):
+    def fairshare_delivery(self, priorities, default_limit=5, limits=None, levels=10, level_key="x-qpid-priorities", fairshare_key="x-qpid-fairshare"):
         msgs = [Message(content=str(uuid4()), priority = p) for p in priorities]
 
-        limit_policy = "x-qpid-fairshare:%s" % default_limit
+        limit_policy = "'%s':%s" % (fairshare_key, default_limit)
         if limits:
             for k, v in limits.items():
-                limit_policy += ", x-qpid-fairshare-%s:%s" % (k, v)
+                limit_policy += ", '%s-%s':%s" % (fairshare_key, k, v)
 
-        snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{x-qpid-priorities:%s, %s}}}}"
-                              % (levels, limit_policy),
+        snd = self.ssn.sender("priority-queue; {create: sender, delete: receiver, node: {x-declare:{arguments:{'%s':%s, %s}}}}"
+                              % (level_key, levels, limit_policy),
                               durable=self.durable())
         for m in msgs: snd.send(m)
 
@@ -79,12 +79,18 @@ class PriorityTests (Base):
     def test_prioritised_delivery_1(self):
         self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10)
 
+    def test_prioritised_delivery_with_alias(self):
+        self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 10, key="qpid.priorities")
+
     def test_prioritised_delivery_2(self):
         self.prioritised_delivery(priorities = [8,9,5,1,2,2,3,4,15,7,8,10,10,2], levels = 5)
 
     def test_fairshare_1(self):
         self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,3,10,3,10,10,10,10,10,10,2,3])
 
+    def test_fairshare_with_alias(self):
+        self.fairshare_delivery(priorities = [4,5,3,6,10,10,2,10,2,10,10,1,10,10,10,3,3,3,10,10,2,3], level_key="qpid.priorities", fairshare_key="qpid.fairshare")
+
     def test_fairshare_2(self):
         self.fairshare_delivery(priorities = [10 for i in range(30)])
 

Modified: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/threshold.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/threshold.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/threshold.py (original)
+++ qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_10/threshold.py Thu Oct 20 18:42:46 2011
@@ -60,3 +60,18 @@ class ThresholdTests (Base):
 
     def test_alert_size_alias(self):
         self.do_threshold_test("x-qpid-maximum-message-size", 15, [Message("msg-%s" % i) for i in range(3)])
+
+    def test_alert_on_alert_queue(self):
+        rcv = self.ssn.receiver("qmf.default.topic/agent.ind.event.org_apache_qpid_broker.queueThresholdExceeded.#; {link:{x-declare:{arguments:{'qpid.alert_count':1}}}}")
+        rcvQMFv1 = self.ssn.receiver("qpid.management/console.event.#; {link:{x-declare:{arguments:{'qpid.alert_count':1}}}}")
+        snd = self.ssn.sender("ttq; {create:always, node: {x-declare:{auto_delete:True,exclusive:True,arguments:{'qpid.alert_count':1}}}}")
+        snd.send(Message("my-message"))
+        queues = []
+        for i in range(2):
+            event = rcv.fetch()
+            schema = event.content[0]["_schema_id"]
+            assert schema["_class_name"] == "queueThresholdExceeded"
+            values = event.content[0]["_values"]
+            queues.append(values["qName"])
+        assert "ttq" in queues, "expected event for ttq (%s)" % (queues)
+

Propchange: qpid/branches/QPID-2519/tests/src/py/qpid_tests/broker_0_9/queue.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Oct 20 18:42:46 2011
@@ -1,2 +1,4 @@
 /qpid/branches/qmfv2/qpid/python/tests_0-9/queue.py:902858,902894
+/qpid/branches/qpid-2935/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py:1061302-1072333
 /qpid/branches/qpid.rnr/python/tests_0-9/queue.py:894071-896158
+/qpid/trunk/qpid/tests/src/py/qpid_tests/broker_0_9/queue.py:1072051-1185907

Modified: qpid/branches/QPID-2519/tools/setup.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tools/setup.py?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tools/setup.py (original)
+++ qpid/branches/QPID-2519/tools/setup.py Thu Oct 20 18:42:46 2011
@@ -20,7 +20,7 @@
 from distutils.core import setup
 
 setup(name="qpid-tools",
-      version="0.9",
+      version="0.13",
       author="Apache Qpid",
       author_email="dev@qpid.apache.org",
       scripts=["src/py/qpid-cluster",
@@ -30,7 +30,8 @@ setup(name="qpid-tools",
                "src/py/qpid-queue-stats",
                "src/py/qpid-route",
                "src/py/qpid-stat",
-               "src/py/qpid-tool"],
+               "src/py/qpid-tool",
+               "src/py/qmf-tool"],
       url="http://qpid.apache.org/",
       license="Apache Software License",
       description="Diagnostic and management tools for Apache Qpid brokers.")

Modified: qpid/branches/QPID-2519/tools/src/py/qmf-tool
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tools/src/py/qmf-tool?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tools/src/py/qmf-tool (original)
+++ qpid/branches/QPID-2519/tools/src/py/qmf-tool Thu Oct 20 18:42:46 2011
@@ -31,6 +31,100 @@ from qpid.disp   import Display
 import cqpid
 import qmf2
 
+class OptsAndArgs(object):
+
+  def __init__(self, argv):
+    self.argv = argv
+    self.usage = """qmf-tool [OPTIONS] [<broker-host>[:<port>]]"""
+    self.option_parser = optparse.OptionParser(usage=self.usage)
+    self.conn_group = optparse.OptionGroup(self.option_parser, "Connection Options")
+    self.conn_group.add_option("-u", "--user", action="store", type="string", help="User name for authentication")
+    self.conn_group.add_option("-p", "--password", action="store", type="string", help="Password for authentication")
+    self.conn_group.add_option("-t", "--transport", action="store", type="string",  help="Transport type (tcp, ssl, rdma)")
+    self.conn_group.add_option("-m", "--mechanism", action="store", type="string", help="SASL Mechanism for security")
+    self.conn_group.add_option("-s", "--service", action="store", type="string", default="qpidd", help="SASL Service name")
+    self.conn_group.add_option("--min-ssf", action="store", type="int", metavar="<n>", help="Minimum acceptable security strength factor")
+    self.conn_group.add_option("--max-ssf", action="store", type="int", metavar="<n>", help="Maximum acceptable security strength factor")
+    self.conn_group.add_option("--conn-option", action="append", default=[], metavar="<NAME=VALUE>", help="Additional connection option(s)")
+    self.option_parser.add_option_group(self.conn_group)
+
+    self.qmf_group = optparse.OptionGroup(self.option_parser, "QMF Session Options")
+    self.qmf_group.add_option("--domain", action="store", type="string", help="QMF Domain")
+    self.qmf_group.add_option("--agent-age", action="store", type="int", metavar="<n>", help="Time, in minutes, to age out non-communicating agents")
+    self.qmf_group.add_option("--qmf-option", action="append", default=[], metavar="<NAME=VALUE>", help="Additional QMF session option(s)")
+    self.option_parser.add_option_group(self.qmf_group)
+
+  def parse(self):
+    host = "localhost"
+    conn_options = []
+    qmf_options = []
+
+    options, encArgs = self.option_parser.parse_args(args=self.argv)
+    try:
+      encoding = locale.getpreferredencoding()
+      args = [a.decode(encoding) for a in encArgs]
+    except:
+      args = encArgs
+
+    if len(args) > 1:
+      host = args[1]
+
+    if options.user:
+      conn_options.append("username:'%s'" % options.user)
+    if options.password:
+      conn_options.append("password:'%s'" % options.password)
+    if options.transport:
+      conn_options.append("transport:'%s'" % options.transport)
+    if options.mechanism:
+      conn_options.append("sasl_mechanisms:'%s'" % options.mechanism)
+    if options.service:
+      conn_options.append("sasl_service:'%s'" % options.service)
+    if options.min_ssf:
+      conn_options.append("sasl_min_ssf:%d" % options.min_ssf)
+    if options.max_ssf:
+      conn_options.append("sasl_max_ssf:%d" % options.max_ssf)
+    for x in options.conn_option:
+      try:
+        key, val = x.split('=')
+        conn_options.append("%s:%s" % (key, val))
+      except:
+        raise Exception("Improperly formatted text for --conn-option: '%s'" % x)
+
+    if options.domain:
+      qmf_options.append("domain:'%s'" % options.domain)
+    if options.agent_age:
+      qmf_options.append("max-agent-age:%d" % options.agent_age)
+    for x in options.qmf_option:
+      try:
+        key, val = x.split('=')
+        qmf_options.append("%s:%s" % (key, val))
+      except:
+        raise Exception("Improperly formatted text for --qmf-option: '%s'" % x)
+
+    conn_string = '{'
+    first = True
+    for x in conn_options:
+      if first:
+        first = None
+      else:
+        conn_string += ','
+      conn_string += x
+    conn_string += '}'
+
+    qmf_string = '{'
+    first = True
+    for x in qmf_options:
+      if first:
+        first = None
+      else:
+        qmf_string += ','
+      qmf_string += x
+    qmf_string += '}'
+
+    return host, conn_string, qmf_string
+
+
+
 class Mcli(Cmd):
   """ Management Command Interpreter """
 
@@ -55,10 +149,11 @@ class Mcli(Cmd):
     print
     print "Agent Commands:"
     print "    set filter <filter-string> - Filter the list of agents"
-    print "    show filter                - Show the agent filter currently in effect"
     print "    list agents                - Print a list of the known Agents"
-    print "    show agent <item-number>   - Print detailed information about an Agent"
     print "    set default <item-number>  - Set the default agent for operations"
+    print "    show filter                - Show the agent filter currently in effect"
+    print "    show agent <item-number>   - Print detailed information about an Agent"
+    print "    show options               - Show option strings used in the QMF session"
     print
     print "Schema Commands:"
     print "    list packages                            - Print a list of packages supported by the default agent"
@@ -112,7 +207,7 @@ class Mcli(Cmd):
   def complete_show(self, text, line, begidx, endidx):
     tokens = split(line[:begidx])
     if len(tokens) == 1:
-      return [i for i in ('filter', 'agent ', 'class ') if i.startswith(text)]
+      return [i for i in ('options', 'filter', 'agent ', 'class ') if i.startswith(text)]
     return []
 
   def do_show(self, data):
@@ -175,13 +270,15 @@ class Mcli(Cmd):
 class QmfData:
   """
   """
-  def __init__(self, disp, url):
+  def __init__(self, disp, url, conn_options, qmf_options):
     self.disp = disp
     self.url = url
+    self.conn_options = conn_options
+    self.qmf_options = qmf_options
     self.agent_filter = '[]'
-    self.connection = cqpid.Connection(self.url)
+    self.connection = cqpid.Connection(self.url, self.conn_options)
     self.connection.open()
-    self.session = qmf2.ConsoleSession(self.connection)
+    self.session = qmf2.ConsoleSession(self.connection, self.qmf_options)
     self.session.setAgentFilter(self.agent_filter)
     self.session.open()
     self.lock = Lock()
@@ -239,6 +336,12 @@ class QmfData:
       print "What do you want to show?  Type 'help' for more information."
       return
 
+    if tokens[0] == 'options':
+      print "Options used in this session:"
+      print "  Connection Options : %s" % self.scrubConnOptions()
+      print "  QMF Session Options: %s" % self.qmf_options
+      return
+
     if tokens[0] == 'agent':
       self.showAgent(tokens[1:])
       return
@@ -636,32 +739,33 @@ class QmfData:
         first = None
     return result
 
-def Usage():
-  print "Usage:  qpid-tool [[<username>/<password>@]<target-host>[:<tcp-port>]]"
-  print
+  def scrubConnOptions(self):
+    pw = self.conn_options.find('password:')
+    if pw < 0:
+      return self.conn_options
+    scrubbed = self.conn_options[:pw + 9] + "***"
+    delim = self.conn_options[pw:].find(',')
+    if delim < 0:
+      delim = self.conn_options[pw:].find('}')
+    scrubbed += self.conn_options[pw + delim:]
+    return scrubbed
+
 
 #=========================================================
 # Main Program
 #=========================================================
-
-# Get host name and port if specified on the command line
-cargs = sys.argv[1:]
-_host = "localhost"
-
-if len(cargs) > 0:
-  _host = cargs[0]
-
-if _host[0] == '-':
-  Usage()
-  if _host != '-h' and _host != "--help":
-    print "qpid-tool: error: no such option:", _host
+try:
+  oa = OptsAndArgs(sys.argv)
+  host, conn_options, qmf_options = oa.parse()
+except Exception, e:
+  print "Parse Error: %s" % e
   sys.exit(1)
 
 disp = Display()
 
 # Attempt to make a connection to the target broker
 try:
-  data = QmfData(disp, _host)
+  data = QmfData(disp, host, conn_options, qmf_options)
 except Exception, e:
   if str(e).find("Exchange not found") != -1:
     print "Management not enabled on broker:  Use '-m yes' option on broker startup."

Modified: qpid/branches/QPID-2519/tools/src/py/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tools/src/py/qpid-cluster?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tools/src/py/qpid-cluster (original)
+++ qpid/branches/QPID-2519/tools/src/py/qpid-cluster Thu Oct 20 18:42:46 2011
@@ -247,7 +247,7 @@ def main(argv=None):
         parser.add_option("-k", "--all-stop", action="store_true", default=False, help="Shut down the whole cluster")
         parser.add_option("-f", "--force", action="store_true", default=False, help="Suppress the 'are you sure' prompt")
         parser.add_option("-n", "--numeric", action="store_true", default=False, help="Don't resolve names")
-	
+
         opts, args = parser.parse_args(args=argv)
 
         if args:
@@ -268,12 +268,12 @@ def main(argv=None):
 
         if opts.del_connection:
             config._delConn = opts.del_connection
-            if len(config._delConn.split(":")) != 2: 
+            if len(config._delConn.split(":")) != 2:
                 parser.error("Member ID must be of form: <host or ip>:<number>")
 
         if opts.stop:
-            config._stopID = opts.stop
-            if len(config._stopId.split(":")) != 2: 
+            config._stopId = opts.stop
+            if len(config._stopId.split(":")) != 2:
                 parser.error("Member ID must be of form: <host or ip>:<number>")
 
         config._stopAll = opts.all_stop
@@ -303,6 +303,7 @@ def main(argv=None):
 
         bm.Disconnect()
     except Exception, e:
+        raise
         print str(e)
         return 1
 

Modified: qpid/branches/QPID-2519/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tools/src/py/qpid-config?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tools/src/py/qpid-config (original)
+++ qpid/branches/QPID-2519/tools/src/py/qpid-config Thu Oct 20 18:42:46 2011
@@ -39,17 +39,12 @@ Usage:  qpid-config [OPTIONS]
         qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"""
 
 description = """
-ADDRESS syntax:
-
-      [username/password@] hostname
-      ip-address [:<port>]
-
 Examples:
 
 $ qpid-config add queue q
-$ qpid-config add exchange direct d localhost:5672
-$ qpid-config exchanges 10.1.1.7:10000
-$ qpid-config queues guest/guest@broker-host:10000
+$ qpid-config add exchange direct d -a localhost:5672
+$ qpid-config exchanges -a 10.1.1.7:10000
+$ qpid-config queues -a guest/guest@broker-host:10000
 
 Add Exchange <type> values:
 
@@ -80,6 +75,7 @@ class Config:
         self._recursive         = False
         self._host              = "localhost"
         self._connTimeout       = 10
+        self._ignoreDefault     = False
         self._altern_ex         = None
         self._passive           = False
         self._durable           = False
@@ -97,6 +93,14 @@ class Config:
         self._eventGeneration   = None
         self._file              = None
         self._sasl_mechanism    = None
+        self._flowStopCount     = None
+        self._flowResumeCount   = None
+        self._flowStopSize      = None
+        self._flowResumeSize    = None
+        self._msgGroupHeader    = None
+        self._sharedMsgGroup    = False
+        self._extra_arguments   = []
+        self._returnCode        = 0
 
 config = Config()
 
@@ -111,6 +115,20 @@ LVQNB = "qpid.last_value_queue_no_browse
 MSG_SEQUENCE = "qpid.msg_sequence"
 IVE = "qpid.ive"
 QUEUE_EVENT_GENERATION = "qpid.queue_event_generation"
+FLOW_STOP_COUNT   = "qpid.flow_stop_count"
+FLOW_RESUME_COUNT = "qpid.flow_resume_count"
+FLOW_STOP_SIZE    = "qpid.flow_stop_size"
+FLOW_RESUME_SIZE  = "qpid.flow_resume_size"
+MSG_GROUP_HDR_KEY = "qpid.group_header_key"
+SHARED_MSG_GROUP  = "qpid.shared_msg_group"
+#There are various arguments to declare that have specific program
+#options in this utility. However there is now a generic mechanism for
+#passing arguments as well. The SPECIAL_ARGS list contains the
+#arguments for which there are specific program options defined
+#i.e. the arguments for which there is special processing on add and
+#list
+SPECIAL_ARGS=[FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,LVQ,LVQNB,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
+              MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP]
 
 class JHelpFormatter(IndentedHelpFormatter):
     """Format usage and description without stripping newlines from usage strings
@@ -143,10 +161,14 @@ def OptionsAndArguments(argv):
     group1 = OptionGroup(parser, "General Options")
     group1.add_option("-t", "--timeout", action="store", type="int", default=10, metavar="<secs>", help="Maximum time to wait for broker connection (in seconds)")
     group1.add_option("-b", "--bindings", action="store_true", help="Show bindings in queue or exchange list")
-    group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Maximum time to wait for broker connection (in seconds)")
+    group1.add_option("-a", "--broker-addr", action="store", type="string", default="localhost:5672", metavar="<address>", help="Address of qpidd broker with syntax: [username/password@] hostname | ip-address [:<port>]")
     group1.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). SASL automatically picks the most secure available mechanism - use this option to override.")
     parser.add_option_group(group1)
 
+    group_ls = OptionGroup(parser, "Options for Listing Exchanges and Queues")
+    group_ls.add_option("--ignore-default", action="store_true", help="Ignore the default exchange in exchange or queue list")
+    parser.add_option_group(group_ls)
+
     group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
     group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
     group2.add_option("--passive", "--dry-run", action="store_true", help="Do not actually add the exchange or queue, ensure that all parameters and permissions are correct and would allow it to be created.")
@@ -156,12 +178,26 @@ def OptionsAndArguments(argv):
     group3 = OptionGroup(parser, "Options for Adding Queues")
     group3.add_option("--cluster-durable", action="store_true", help="The new queue becomes durable if there is only one functioning cluster node")
     group3.add_option("--file-count", action="store", type="int", default=8, metavar="<n>", help="Number of files in queue's persistence journal")
-    group3.add_option("--file-size", action="store", type="int", default=24, metavar="<n>", help="File size in pages (64Kib/page)")
-    group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Number of files in queue's persistence journal")
-    group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Number of files in queue's persistence journal")
+    group3.add_option("--file-size", action="store", type="int", default=24, metavar="<n>", help="File size in pages (64KiB/page)")
+    group3.add_option("--max-queue-size", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as bytes")
+    group3.add_option("--max-queue-count", action="store", type="int", metavar="<n>", help="Maximum in-memory queue size as a number of messages")
     group3.add_option("--limit-policy", action="store", choices=["none", "reject", "flow-to-disk", "ring", "ring-strict"], metavar="<policy>", help="Action to take when queue limit is reached")
     group3.add_option("--order", action="store", choices=["fifo", "lvq", "lvq-no-browse"], metavar="<ordering>", help="Queue ordering policy")
     group3.add_option("--generate-queue-events", action="store", type="int", metavar="<n>", help="If set to 1, every enqueue will generate an event that can be processed by registered listeners (e.g. for replication). If set to 2, events will be generated for enqueues and dequeues.")
+    group3.add_option("--flow-stop-size", action="store", type="int", metavar="<n>",
+                      help="Turn on sender flow control when the number of queued bytes exceeds this value.")
+    group3.add_option("--flow-resume-size", action="store", type="int", metavar="<n>",
+                      help="Turn off sender flow control when the number of queued bytes drops below this value.")
+    group3.add_option("--flow-stop-count", action="store", type="int", metavar="<n>",
+                      help="Turn on sender flow control when the number of queued messages exceeds this value.")
+    group3.add_option("--flow-resume-count", action="store", type="int", metavar="<n>",
+                      help="Turn off sender flow control when the number of queued messages drops below this value.")
+    group3.add_option("--group-header", action="store", type="string", metavar="<header-name>",
+                      help="Enable message groups. Specify name of header that holds group identifier.")
+    group3.add_option("--shared-groups", action="store_true",
+                      help="Allow message group consumption across multiple consumers.")
+    group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
+                      metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
     # no option for declaring an exclusive queue - which can only be used by the session that creates it.
     parser.add_option_group(group3)
 
@@ -173,7 +209,7 @@ def OptionsAndArguments(argv):
     group5 = OptionGroup(parser, "Options for Deleting Queues")
     group5.add_option("--force", action="store_true", help="Force delete of queue even if it's currently used or it's not empty")
     group5.add_option("--force-if-not-empty", action="store_true", help="Force delete of queue even if it's not empty")
-    group5.add_option("--force-if-not-used", action="store_true", help="Force delete of queue even if it's currently used")
+    group5.add_option("--force-if-used", action="store_true", help="Force delete of queue even if it's currently used")
     parser.add_option_group(group5)
 
     group6 = OptionGroup(parser, "Options for Declaring Bindings")
@@ -196,6 +232,8 @@ def OptionsAndArguments(argv):
         config._connTimeout = opts.timeout
         if config._connTimeout == 0:
             config._connTimeout = None
+    if opts.ignore_default:
+        config._ignoreDefault = True
     if opts.alternate_exchange:
         config._altern_ex = opts.alternate_exchange
     if opts.passive:
@@ -210,7 +248,7 @@ def OptionsAndArguments(argv):
         config._fileCount = opts.file_count
     if opts.file_size:
         config._fileSize = opts.file_size
-    if opts.max_queue_size:
+    if opts.max_queue_size != None:
         config._maxQueueSize = opts.max_queue_size
     if opts.max_queue_count:
         config._maxQueueCount = opts.max_queue_count
@@ -229,10 +267,24 @@ def OptionsAndArguments(argv):
         config._if_unused = False
     if opts.force_if_not_empty:
         config._if_empty = False
-    if opts.force_if_not_used:
+    if opts.force_if_used:
         config._if_unused = False
     if opts.sasl_mechanism:
         config._sasl_mechanism = opts.sasl_mechanism
+    if opts.flow_stop_size:
+        config._flowStopSize = opts.flow_stop_size
+    if opts.flow_resume_size:
+        config._flowResumeSize = opts.flow_resume_size
+    if opts.flow_stop_count:
+        config._flowStopCount = opts.flow_stop_count
+    if opts.flow_resume_count:
+        config._flowResumeCount = opts.flow_resume_count
+    if opts.group_header:
+        config._msgGroupHeader = opts.group_header
+    if opts.shared_groups:
+        config._sharedMsgGroup = True
+    if opts.extra_arguments:
+        config._extra_arguments = opts.extra_arguments
     return args
 
 
@@ -323,9 +375,16 @@ class BrokerManager:
         caption1 = "Type      "
         caption2 = "Exchange Name"
         maxNameLen = len(caption2)
+        found = False
         for ex in exchanges:
             if self.match(ex.name, filter):
                 if len(ex.name) > maxNameLen:  maxNameLen = len(ex.name)
+                found = True
+        if not found:
+            global config
+            config._returnCode = 1
+            return
+
         print "%s%-*s  Attributes" % (caption1, maxNameLen, caption2)
         line = ""
         for i in range(((maxNameLen + len(caption1)) / 5) + 5):
@@ -333,9 +392,11 @@ class BrokerManager:
         print line
 
         for ex in exchanges:
+            if config._ignoreDefault and not ex.name: continue
             if self.match(ex.name, filter):
                 print "%-10s%-*s " % (ex.type, maxNameLen, ex.name),
                 args = ex.arguments
+                if not args: args = {}
                 if ex.durable:    print "--durable",
                 if MSG_SEQUENCE in args and args[MSG_SEQUENCE] == 1: print "--sequence",
                 if IVE in args and args[IVE] == 1: print "--ive",
@@ -348,6 +409,7 @@ class BrokerManager:
         bindings  = self.qmf.getObjects(_class="binding", _agent=self.brokerAgent)
         queues    = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
         for ex in exchanges:
+            if config._ignoreDefault and not ex.name: continue
             if self.match(ex.name, filter):
                 print "Exchange '%s' (%s)" % (ex.name, ex.type)
                 for bind in bindings:
@@ -361,12 +423,18 @@ class BrokerManager:
 
     def QueueList(self, filter):
         queues = self.qmf.getObjects(_class="queue", _agent=self.brokerAgent)
-
         caption = "Queue Name"
         maxNameLen = len(caption)
+        found = False
         for q in queues:
             if self.match(q.name, filter):
                 if len(q.name) > maxNameLen:  maxNameLen = len(q.name)
+                found = True
+        if not found:
+            global config
+            config._returnCode = 1
+            return
+
         print "%-*s  Attributes" % (maxNameLen, caption)
         line = ""
         for i in range((maxNameLen / 5) + 5):
@@ -377,21 +445,28 @@ class BrokerManager:
             if self.match(q.name, filter):
                 print "%-*s " % (maxNameLen, q.name),
                 args = q.arguments
+                if not args: args = {}
                 if q.durable:    print "--durable",
                 if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
                 if q.autoDelete: print "auto-del",
                 if q.exclusive:  print "excl",
-                if FILESIZE in args: print "--file-size=%d" % args[FILESIZE],
-                if FILECOUNT in args: print "--file-count=%d" % args[FILECOUNT],
-                if MAX_QUEUE_SIZE in args: print "--max-queue-size=%d" % args[MAX_QUEUE_SIZE],
-                if MAX_QUEUE_COUNT in args: print "--max-queue-count=%d" % args[MAX_QUEUE_COUNT],
+                if FILESIZE in args: print "--file-size=%s" % args[FILESIZE],
+                if FILECOUNT in args: print "--file-count=%s" % args[FILECOUNT],
+                if MAX_QUEUE_SIZE in args: print "--max-queue-size=%s" % args[MAX_QUEUE_SIZE],
+                if MAX_QUEUE_COUNT in args: print "--max-queue-count=%s" % args[MAX_QUEUE_COUNT],
                 if POLICY_TYPE in args: print "--limit-policy=%s" % args[POLICY_TYPE].replace("_", "-"),
                 if LVQ in args and args[LVQ] == 1: print "--order lvq",
                 if LVQNB in args and args[LVQNB] == 1: print "--order lvq-no-browse",
-                if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%d" % args[QUEUE_EVENT_GENERATION],
+                if QUEUE_EVENT_GENERATION in args: print "--generate-queue-events=%s" % args[QUEUE_EVENT_GENERATION],
                 if q.altExchange:
                     print "--alternate-exchange=%s" % q._altExchange_.name,
-                print
+                if FLOW_STOP_SIZE in args: print "--flow-stop-size=%s" % args[FLOW_STOP_SIZE],
+                if FLOW_RESUME_SIZE in args: print "--flow-resume-size=%s" % args[FLOW_RESUME_SIZE],
+                if FLOW_STOP_COUNT in args: print "--flow-stop-count=%s" % args[FLOW_STOP_COUNT],
+                if FLOW_RESUME_COUNT in args: print "--flow-resume-count=%s" % args[FLOW_RESUME_COUNT],
+                if MSG_GROUP_HDR_KEY in args: print "--group-header=%s" % args[MSG_GROUP_HDR_KEY],
+                if SHARED_MSG_GROUP in args and args[SHARED_MSG_GROUP] == 1: print "--shared-groups",
+                print " ".join(["--argument %s=%s" % (k, v) for k,v in args.iteritems() if not k in SPECIAL_ARGS])
 
     def QueueListRecurse(self, filter):
         exchanges = self.qmf.getObjects(_class="exchange", _agent=self.brokerAgent)
@@ -407,6 +482,7 @@ class BrokerManager:
                         if ex != None:
                             ename = ex.name
                             if ename == "":
+                                if config._ignoreDefault: continue
                                 ename = "''"
                         print "    bind [%s] => %s" % (bind.bindingKey, ename)
 
@@ -436,11 +512,17 @@ class BrokerManager:
             Usage()
         qname    = args[0]
         declArgs = {}
+        for a in config._extra_arguments:
+            r = a.split("=", 1)
+            if len(r) == 2: value = r[1]
+            else: value = None
+            declArgs[r[0]] = value
+
         if config._durable:
             declArgs[FILECOUNT] = config._fileCount
             declArgs[FILESIZE]  = config._fileSize
 
-        if config._maxQueueSize:
+        if config._maxQueueSize != None:
             declArgs[MAX_QUEUE_SIZE]  = config._maxQueueSize
         if config._maxQueueCount:
             declArgs[MAX_QUEUE_COUNT]  = config._maxQueueCount
@@ -468,11 +550,26 @@ class BrokerManager:
         if config._eventGeneration:
             declArgs[QUEUE_EVENT_GENERATION]  = config._eventGeneration
 
+        if config._flowStopSize:
+            declArgs[FLOW_STOP_SIZE]  = config._flowStopSize
+        if config._flowResumeSize:
+            declArgs[FLOW_RESUME_SIZE]  = config._flowResumeSize
+        if config._flowStopCount:
+            declArgs[FLOW_STOP_COUNT]  = config._flowStopCount
+        if config._flowResumeCount:
+            declArgs[FLOW_RESUME_COUNT]  = config._flowResumeCount
+
+        if config._msgGroupHeader:
+            declArgs[MSG_GROUP_HDR_KEY] = config._msgGroupHeader
+        if config._sharedMsgGroup:
+            declArgs[SHARED_MSG_GROUP] = 1
+
         if config._altern_ex != None:
             self.broker.getAmqpSession().queue_declare(queue=qname, alternate_exchange=config._altern_ex, passive=config._passive, durable=config._durable, arguments=declArgs)
         else:
             self.broker.getAmqpSession().queue_declare(queue=qname, passive=config._passive, durable=config._durable, arguments=declArgs)
 
+
     def DelQueue(self, args):
         if len(args) < 1:
             Usage()
@@ -617,7 +714,7 @@ def main(argv=None):
                 print "Failed: %s: %s" % (e.__class__.__name__, e)
                 return 1
 
-    return 0
+    return config._returnCode
 
 if __name__ == "__main__":
         sys.exit(main())

Modified: qpid/branches/QPID-2519/tools/src/py/qpid-printevents
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tools/src/py/qpid-printevents?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tools/src/py/qpid-printevents (original)
+++ qpid/branches/QPID-2519/tools/src/py/qpid-printevents Thu Oct 20 18:42:46 2011
@@ -20,7 +20,7 @@
 #
 
 import os
-import optparse 
+import optparse
 from optparse import IndentedHelpFormatter
 import sys
 import socket
@@ -62,11 +62,11 @@ _usage = "%prog [options] [broker-addr].
 
 _description = \
 """
-Collect and print events from one or more Qpid message brokers.  
+Collect and print events from one or more Qpid message brokers.
 
 If no broker-addr is supplied, %prog connects to 'localhost:5672'.
 
-[broker-addr] syntax:   
+[broker-addr] syntax:
 
           [username/password@] hostname
           ip-address [:<port>]
@@ -91,20 +91,20 @@ def main(argv=None):
   session = Session(console, rcvObjects=False, rcvHeartbeats=options.heartbeats, manageConnections=True)
   brokers = []
   try:
-    for host in arguments:
-      brokers.append(session.addBroker(host, None, options.sasl_mechanism))
-
-      while (True):
-        sleep(10)
-
-  except KeyboardInterrupt:
-      print
-      return 0
-
-  except Exception, e:
-      print "Failed: %s - %s" % (e.__class__.__name__, e)
-      return 1
-
+    try:
+      for host in arguments:
+        brokers.append(session.addBroker(host, None, options.sasl_mechanism))
+
+        while (True):
+          sleep(10)
+
+    except KeyboardInterrupt:
+        print
+        return 0
+
+    except Exception, e:
+        print "Failed: %s - %s" % (e.__class__.__name__, e)
+        return 1
   finally:
     while len(brokers):
       b = brokers.pop()

Modified: qpid/branches/QPID-2519/tools/src/py/qpid-route
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tools/src/py/qpid-route?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tools/src/py/qpid-route (original)
+++ qpid/branches/QPID-2519/tools/src/py/qpid-route Thu Oct 20 18:42:46 2011
@@ -27,18 +27,18 @@ import locale
 from qmf.console import Session, BrokerURL
 
 usage = """
-Usage:  qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]
+Usage:  qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list] [mechanism]
         qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>
 
         qpid-route [OPTIONS] route add   <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list] [mechanism]
         qpid-route [OPTIONS] route del   <dest-broker> <src-broker> <exchange> <routing-key>
-        qpid-route [OPTIONS] queue add   <dest-broker> <src-broker> <exchange> <queue>
+        qpid-route [OPTIONS] queue add   <dest-broker> <src-broker> <exchange> <queue> [mechanism]
         qpid-route [OPTIONS] queue del   <dest-broker> <src-broker> <exchange> <queue>
         qpid-route [OPTIONS] route list  [<dest-broker>]
         qpid-route [OPTIONS] route flush [<dest-broker>]
         qpid-route [OPTIONS] route map   [<broker>]
 
-        qpid-route [OPTIONS] link add  <dest-broker> <src-broker>
+        qpid-route [OPTIONS] link add  <dest-broker> <src-broker> [mechanism]
         qpid-route [OPTIONS] link del  <dest-broker> <src-broker>
         qpid-route [OPTIONS] link list [<dest-broker>]"""
 
@@ -61,7 +61,7 @@ class Config:
         self._transport = "tcp"
         self._ack       = 0
         self._connTimeout = 10
-        self._sasl_mechanism = None
+        self._client_sasl_mechanism = None
 
 config = Config()
 
@@ -95,7 +95,7 @@ def OptionsAndArguments(argv):
     parser.add_option("--ack", action="store", type="int", metavar="<n>", help="Acknowledge transfers over the bridge in batches of N")
     parser.add_option("-t", "--transport", action="store", type="string", default="tcp", metavar="<transport>", help="Transport to use for links, defaults to tcp")
 
-    parser.add_option("--sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")
+    parser.add_option("--client-sasl-mechanism", action="store", type="string", metavar="<mech>", help="SASL mechanism for authentication (e.g. EXTERNAL, ANONYMOUS, PLAIN, CRAM-MD, DIGEST-MD5, GSSAPI). Used when the client connects to the destination broker (not for authentication between the source and destination brokers - that is specified using the [mechanisms] argument to 'add route'). SASL automatically picks the most secure available mechanism - use this option to override.")
 
     opts, encArgs = parser.parse_args(args=argv)
 
@@ -131,8 +131,8 @@ def OptionsAndArguments(argv):
     if opts.ack:
         config._ack = opts.ack
 
-    if opts.sasl_mechanism:
-        config._sasl_mechanism = opts.sasl_mechanism
+    if opts.client_sasl_mechanism:
+        config._client_sasl_mechanism = opts.client_sasl_mechanism
 
     return args
 
@@ -143,7 +143,7 @@ class RouteManager:
         self.local = BrokerURL(localBroker)
         self.remote  = None
         self.qmf = Session()
-        self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._sasl_mechanism)
+        self.broker = self.qmf.addBroker(localBroker, config._connTimeout, config._client_sasl_mechanism)
         self.broker._waitForStable()
         self.agent = self.broker.getBrokerAgent()
 
@@ -166,7 +166,7 @@ class RouteManager:
                 return link
         return None
 
-    def addLink(self, remoteBroker, mech="PLAIN"):
+    def addLink(self, remoteBroker, interbroker_mechanism=""):
         self.remote = BrokerURL(remoteBroker)
         if self.local.match(self.remote.host, self.remote.port):
             raise Exception("Linking broker to itself is not permitted")
@@ -176,7 +176,7 @@ class RouteManager:
         link = self.getLink()
         if link == None:
             res = broker.connect(self.remote.host, self.remote.port, config._durable,
-                                 mech, self.remote.authName or "", self.remote.authPass or "",
+                                 interbroker_mechanism, self.remote.authName or "", self.remote.authPass or "",
                                  config._transport)
             if config._verbose:
                 print "Connect method returned:", res.status, res.text
@@ -217,11 +217,11 @@ class RouteManager:
             added = False
             links = self.qmf.getObjects(_class="link")
             for link in links:
-                url = BrokerURL("%s:%d" % (link.host, link.port))
+                url = BrokerURL(host=link.host, port=link.port)
                 if url.name() not in self.brokerList:
                     print "    %s..." % url.name(),
                     try:
-                        b = self.qmf.addBroker("%s:%d" % (link.host, link.port), config._connTimeout)
+                        b = self.qmf.addBroker(url, config._connTimeout)
                         self.brokerList[url.name()] = b
                         added = True
                         print "Ok"
@@ -245,7 +245,7 @@ class RouteManager:
             for bridge in bridges:
                 if bridge.src == ex:
                     link = bridge._linkRef_
-                    fromUrl = "%s:%s" % (link.host, link.port)
+                    fromUrl = BrokerURL(host=link.host, port=link.port)
                     toUrl = bridge.getBroker().getUrl()
                     found = False
                     for pair in pairs:
@@ -295,11 +295,11 @@ class RouteManager:
             if b[0] != self.local.name():
                 self.qmf.delBroker(b[1])
 
-    def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, mech="PLAIN", dynamic=False):
+    def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, interbroker_mechanism="", dynamic=False):
         if dynamic and config._srclocal:
             raise Exception("--src-local is not permitted on dynamic routes")
 
-        self.addLink(remoteBroker, mech)
+        self.addLink(remoteBroker, interbroker_mechanism)
         link = self.getLink()
         if link == None:
             raise Exception("Link failed to create")
@@ -320,8 +320,8 @@ class RouteManager:
         if config._verbose:
             print "Bridge method returned:", res.status, res.text
 
-    def addQueueRoute(self, remoteBroker, exchange, queue):
-        self.addLink(remoteBroker)
+    def addQueueRoute(self, remoteBroker, interbroker_mechanism, exchange, queue ):
+        self.addLink(remoteBroker, interbroker_mechanism)
         link = self.getLink()
         if link == None:
             raise Exception("Link failed to create")
@@ -504,10 +504,12 @@ def main(argv=None):
         rm = RouteManager(localBroker)
         if group == "link":
             if cmd == "add":
-                if nargs != 4:
+                if nargs < 3 or nargs > 5:
                     Usage()
                     return(-1)
-                rm.addLink(remoteBroker)
+                interbroker_mechanism = ""
+                if nargs > 4: interbroker_mechanism = args[4]
+                rm.addLink(remoteBroker, interbroker_mechanism)
             elif cmd == "del":
                 if nargs != 4:
                     Usage()
@@ -518,16 +520,17 @@ def main(argv=None):
 
         elif group == "dynamic":
             if cmd == "add":
-                if nargs < 5 or nargs > 7:
+                if nargs < 5 or nargs > 8:
                     Usage()
                     return(-1)
 
                 tag = ""
                 excludes = ""
-                mech = "PLAIN"
+                interbroker_mechanism = ""
                 if nargs > 5: tag = args[5]
                 if nargs > 6: excludes = args[6]
-                rm.addRoute(remoteBroker, args[4], "", tag, excludes, mech, dynamic=True)
+                if nargs > 7: interbroker_mechanism = args[7]
+                rm.addRoute(remoteBroker, args[4], "", tag, excludes, interbroker_mechanism, dynamic=True)
             elif cmd == "del":
                 if nargs != 5:
                     Usage()
@@ -543,11 +546,11 @@ def main(argv=None):
 
                 tag = ""
                 excludes = ""
-                mech = "PLAIN"
+                interbroker_mechanism = ""
                 if nargs > 6: tag = args[6]
                 if nargs > 7: excludes = args[7]
-                if nargs > 8: mech = args[8]
-                rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, mech, dynamic=False)
+                if nargs > 8: interbroker_mechanism = args[8]
+                rm.addRoute(remoteBroker, args[4], args[5], tag, excludes, interbroker_mechanism, dynamic=False)
             elif cmd == "del":
                 if nargs != 6:
                     Usage()
@@ -565,16 +568,21 @@ def main(argv=None):
                     return(-1)
 
         elif group == "queue":
-            if nargs != 6:
+            if nargs < 6 or nargs > 7:
                 Usage()
                 return(-1)
             if cmd == "add":
-                rm.addQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
+                interbroker_mechanism = ""
+                if nargs > 6: interbroker_mechanism = args[6]
+                rm.addQueueRoute(remoteBroker, interbroker_mechanism, exchange=args[4], queue=args[5] )
             elif cmd == "del":
                 rm.delQueueRoute(remoteBroker, exchange=args[4], queue=args[5])
             else:
                 Usage()
                 return(-1)
+        else:
+            Usage()
+            return(-1)
 
     except Exception,e:
         if rm:

Modified: qpid/branches/QPID-2519/tools/src/py/qpid-tool
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/tools/src/py/qpid-tool?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/tools/src/py/qpid-tool (original)
+++ qpid/branches/QPID-2519/tools/src/py/qpid-tool Thu Oct 20 18:42:46 2011
@@ -259,7 +259,24 @@ class QmfData(Console):
       return
     displayId = long(tokens[0])
     methodName = tokens[1]
-    args = tokens[2:]
+    args = []
+    for arg in tokens[2:]:
+      ##
+      ## If the argument is a map, list, boolean, integer, or floating (one decimal point),
+      ## run it through the Python evaluator so it is converted to the correct type.
+      ##
+      ## TODO: use a regex for this instead of this convoluted logic,
+      ## or even consider passing all args through eval() [which would
+      ## be a minor change to the interface as string args would then
+      ## always need to be quoted as strings within a map/list would
+      ## now]
+      if arg[0] == '{' or arg[0] == '[' or arg[0] == '"' or arg[0] == '\'' or arg == "True" or arg == "False" or \
+            ((arg.count('.') < 2 and (arg.count('-') == 0 or \
+            (arg.count('-') == 1 and  arg[0] == '-')) and \
+            arg.replace('.','').replace('-','').isdigit())):
+         args.append(eval(arg))
+      else:
+         args.append(arg)
 
     obj = None
     try:
@@ -333,7 +350,7 @@ class QmfData(Console):
              self.notNone(prop.unit), notes, self.notNone(prop.desc))
       rows.append(row)
     for stat in schema.getStatistics():
-      row = (stat.name, self.typeName(stat.type), "", self.notNone(prop.unit), "", self.notNone(prop.desc))
+      row = (stat.name, self.typeName(stat.type), "", self.notNone(stat.unit), "", self.notNone(stat.desc))
       rows.append(row)
     self.disp.table(title, heads, rows)
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org