You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC

svn commit: r1187375 [17/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/cluster_tests.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/cluster_tests.py Fri Oct 21 14:42:12 2011
@@ -18,12 +18,13 @@
 # under the License.
 #
 
-import os, signal, sys, time, imp, re, subprocess, glob, cluster_test_logs
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging
+import cluster_test_logs
 from qpid import datatypes, messaging
 from brokertest import *
 from qpid.harness import Skipped
-from qpid.messaging import Message, Empty
-from threading import Thread, Lock
+from qpid.messaging import Message, Empty, Disposition, REJECTED, util
+from threading import Thread, Lock, Condition
 from logging import getLogger
 from itertools import chain
 from tempfile import NamedTemporaryFile
@@ -96,9 +97,15 @@ class ShortTests(BrokerTest):
             destination="amq.direct",
             message=qpid.datatypes.Message(props, "content"))
 
+        # Try message with TTL and differnet headers/properties
+        cluster[0].send_message("q", Message(durable=True, ttl=100000))
+        cluster[0].send_message("q", Message(durable=True, properties={}, ttl=100000))
+        cluster[0].send_message("q", Message(durable=True, properties={"x":10}, ttl=100000))
+
         # Now update a new member and compare their dumps.
         cluster.start(args=["--test-store-dump", "updatee.dump"])
         assert readfile("direct.dump") == readfile("updatee.dump")
+
         os.remove("direct.dump")
         os.remove("updatee.dump")
 
@@ -108,19 +115,22 @@ class ShortTests(BrokerTest):
         acl=os.path.join(os.getcwd(), "policy.acl")
         aclf=file(acl,"w")
         aclf.write("""
-acl deny zag@QPID create queue
-acl allow all all
+acl allow zig@QPID all all
+acl deny all all
 """)
         aclf.close()
-        cluster = self.cluster(2, args=["--auth", "yes",
+        cluster = self.cluster(1, args=["--auth", "yes",
                                         "--sasl-config", sasl_config,
                                         "--load-module", os.getenv("ACL_LIB"),
                                         "--acl-file", acl])
 
         # Valid user/password, ensure queue is created.
         c = cluster[0].connect(username="zig", password="zig")
-        c.session().sender("ziggy;{create:always}")
+        c.session().sender("ziggy;{create:always,node:{x-declare:{exclusive:true}}}")
         c.close()
+        cluster.start()                 # Start second node.
+
+        # Check queue is created on second node.
         c = cluster[1].connect(username="zig", password="zig")
         c.session().receiver("ziggy;{assert:always}")
         c.close()
@@ -149,7 +159,7 @@ acl allow all all
             self.fail("Expected exception")
         except messaging.exceptions.UnauthorizedAccess: pass
         # make sure the queue was not created at the other node.
-        c = cluster[0].connect(username="zag", password="zag")
+        c = cluster[1].connect(username="zig", password="zig")
         try:
             s = c.session()
             s.sender("zaggy;{assert:always}")
@@ -157,6 +167,35 @@ acl allow all all
             self.fail("Expected exception")
         except messaging.exceptions.NotFound: pass
 
+    def test_sasl_join(self):
+        """Verify SASL authentication between brokers when joining a cluster."""
+        sasl_config=os.path.join(self.rootdir, "sasl_config")
+        # Test with a valid username/password
+        cluster = self.cluster(1, args=["--auth", "yes",
+                                        "--sasl-config", sasl_config,
+                                        "--load-module", os.getenv("ACL_LIB"),
+                                        "--cluster-username=zig",
+                                        "--cluster-password=zig",
+                                        "--cluster-mechanism=PLAIN"
+                                        ])
+        cluster.start()
+        cluster.ready()
+        c = cluster[1].connect(username="zag", password="zag")
+
+        # Test with an invalid username/password
+        cluster = self.cluster(1, args=["--auth", "yes",
+                                        "--sasl-config", sasl_config,
+                                        "--load-module", os.getenv("ACL_LIB"),
+                                        "--cluster-username=x",
+                                        "--cluster-password=y",
+                                        "--cluster-mechanism=PLAIN"
+                                        ])
+        try:
+            cluster.start(expect=EXPECT_EXIT_OK)
+            cluster[1].ready()
+            self.fail("Expected exception")
+        except: pass
+
     def test_user_id_update(self):
         """Ensure that user-id of an open session is updated to new cluster members"""
         sasl_config=os.path.join(self.rootdir, "sasl_config")
@@ -246,25 +285,6 @@ acl allow all all
         session1 = cluster[1].connect().session()
         for q in queues: self.assert_browse(session1, "q1", ["foo"])
 
-    def test_dr_no_message(self):
-        """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=655141
-        Joining broker crashes with 'error deliveryRecord no update message'
-        """
-
-        cluster = self.cluster(1)
-        session0 = cluster[0].connect().session()
-        s = session0.sender("q1;{create:always}")
-        s.send(Message("a", ttl=0.05), sync=False)
-        s.send(Message("b", ttl=0.05), sync=False)
-        r1 = session0.receiver("q1")
-        self.assertEqual("a", r1.fetch(timeout=0).content)
-        r2 = session0.receiver("q1;{mode:browse}")
-        self.assertEqual("b", r2.fetch(timeout=0).content)
-        # Leave messages un-acknowledged, let the expire, then start new broker.
-        time.sleep(.1)
-        cluster.start()
-        self.assertRaises(Empty, cluster[1].connect().session().receiver("q1").fetch,0)
-
     def test_route_update(self):
         """Regression test for https://issues.apache.org/jira/browse/QPID-2982
         Links and bridges associated with routes were not replicated on update.
@@ -272,6 +292,7 @@ acl allow all all
         client was attached.
         """
         args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
+        # First broker will be killed.
         cluster0 = self.cluster(1, args=args)
         cluster1 = self.cluster(1, args=args)
         assert 0 == subprocess.call(
@@ -301,9 +322,695 @@ acl allow all all
             qpid_tool.wait()
             scanner.join()
         assert scanner.found
+        # Regression test for https://issues.apache.org/jira/browse/QPID-3235
+        # Inconsistent stats when changing elder.
+
+        # Force a change of elder
+        cluster0.start()
+        cluster0[0].expect=EXPECT_EXIT_FAIL # About to die.
+        cluster0[0].kill()
+        time.sleep(2) # Allow a management interval to pass.
         # Verify logs are consistent
         cluster_test_logs.verify_logs()
 
+    def test_redelivered(self):
+        """Verify that redelivered flag is set correctly on replayed messages"""
+        cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
+        url = "amqp:tcp:%s,tcp:%s" % (cluster[0].host_port(), cluster[1].host_port())
+        queue = "my-queue"
+        cluster[0].declare_queue(queue)
+        self.sender = self.popen(
+            ["qpid-send",
+             "--broker", url,
+             "--address", queue,
+             "--sequence=true",
+             "--send-eos=1",
+             "--messages=100000",
+             "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS)
+             ])
+        self.receiver = self.popen(
+            ["qpid-receive",
+             "--broker", url,
+             "--address", queue,
+             "--ignore-duplicates",
+             "--check-redelivered",
+             "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+             "--forever"
+             ])
+        time.sleep(1)#give sender enough time to have some messages to replay
+        cluster[0].kill()
+        self.sender.wait()
+        self.receiver.wait()
+        cluster[1].kill()
+
+    class BlockedSend(Thread):
+        """Send a message, send is expected to block.
+        Verify that it does block (for a given timeout), then allow
+        waiting till it unblocks when it is expected to do so."""
+        def __init__(self, sender, msg):
+            self.sender, self.msg = sender, msg
+            self.blocked = True
+            self.condition = Condition()
+            self.timeout = 0.1    # Time to wait for expected results.
+            Thread.__init__(self)
+        def run(self):
+            try:
+                self.sender.send(self.msg, sync=True)
+                self.condition.acquire()
+                try:
+                    self.blocked = False
+                    self.condition.notify()
+                finally: self.condition.release()
+            except Exception,e: print "BlockedSend exception: %s"%e
+        def start(self):
+            Thread.start(self)
+            time.sleep(self.timeout)
+            assert self.blocked         # Expected to block
+        def assert_blocked(self): assert self.blocked
+        def wait(self):                 # Now expecting to unblock
+            self.condition.acquire()
+            try:
+                while self.blocked:
+                    self.condition.wait(self.timeout)
+                    if self.blocked: raise Exception("Timed out waiting for send to unblock")
+            finally: self.condition.release()
+            self.join()
+
+    def queue_flowlimit_test(self, brokers):
+        """Verify that the queue's flowlimit configuration and state are
+        correctly replicated.
+        The brokers argument allows this test to run on single broker,
+        cluster of 2 pre-startd brokers or cluster where second broker
+        starts after queue is in flow control.
+        """
+        # configure a queue with a specific flow limit on first broker
+        ssn0 = brokers.first().connect().session()
+        s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+        brokers.first().startQmf()
+        q1 = [q for q in brokers.first().qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+        oid = q1.getObjectId()
+        self.assertEqual(q1.name, "flq")
+        self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+        assert not q1.flowStopped
+        self.assertEqual(q1.flowStoppedCount, 0)
+
+        # fill the queue on one broker until flow control is active
+        for x in range(5): s0.send(Message(str(x)))
+        sender = ShortTests.BlockedSend(s0, Message(str(6)))
+        sender.start()                  # Tests that sender does block
+        # Verify the broker queue goes into a flowStopped state
+        deadline = time.time() + 1
+        while not q1.flowStopped and time.time() < deadline: q1.update()
+        assert q1.flowStopped
+        self.assertEqual(q1.flowStoppedCount, 1)
+        sender.assert_blocked()         # Still blocked
+
+        # Now verify the  both brokers in cluster have same configuration
+        brokers.second().startQmf()
+        qs = brokers.second().qmf_session.getObjects(_objectId=oid)
+        self.assertEqual(len(qs), 1)
+        q2 = qs[0]
+        self.assertEqual(q2.name, "flq")
+        self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+        assert q2.flowStopped
+        self.assertEqual(q2.flowStoppedCount, 1)
+
+        # now drain the queue using a session to the other broker
+        ssn1 = brokers.second().connect().session()
+        r1 = ssn1.receiver("flq", capacity=6)
+        for x in range(4):
+            r1.fetch(timeout=0)
+            ssn1.acknowledge()
+        sender.wait()                   # Verify no longer blocked.
+
+        # and re-verify state of queue on both brokers
+        q1.update()
+        assert not q1.flowStopped
+        q2.update()
+        assert not q2.flowStopped
+
+        ssn0.connection.close()
+        ssn1.connection.close()
+        cluster_test_logs.verify_logs()
+
+    def test_queue_flowlimit(self):
+        """Test flow limits on a standalone broker"""
+        broker = self.broker()
+        class Brokers:
+            def first(self): return broker
+            def second(self): return broker
+        self.queue_flowlimit_test(Brokers())
+
+    def test_queue_flowlimit_cluster(self):
+        cluster = self.cluster(2)
+        class Brokers:
+            def first(self): return cluster[0]
+            def second(self): return cluster[1]
+        self.queue_flowlimit_test(Brokers())
+
+    def test_queue_flowlimit_cluster_join(self):
+        cluster = self.cluster(1)
+        class Brokers:
+            def first(self): return cluster[0]
+            def second(self):
+                if len(cluster) == 1: cluster.start()
+                return cluster[1]
+        self.queue_flowlimit_test(Brokers())
+
+    def test_queue_flowlimit_replicate(self):
+        """ Verify that a queue which is in flow control BUT has drained BELOW
+        the flow control 'stop' threshold, is correctly replicated when a new
+        broker is added to the cluster.
+        """
+
+        class AsyncSender(Thread):
+            """Send a fixed number of msgs from a sender in a separate thread
+            so it may block without blocking the test.
+            """
+            def __init__(self, broker, address, count=1, size=4):
+                Thread.__init__(self)
+                self.daemon = True
+                self.broker = broker
+                self.queue = address
+                self.count = count
+                self.size = size
+                self.done = False
+
+            def run(self):
+                self.sender = subprocess.Popen(["qpid-send",
+                                                "--capacity=1",
+                                                "--content-size=%s" % self.size,
+                                                "--messages=%s" % self.count,
+                                                "--failover-updates",
+                                                "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+                                                "--address=%s" % self.queue,
+                                                "--broker=%s" % self.broker.host_port()])
+                self.sender.wait()
+                self.done = True
+
+        cluster = self.cluster(2)
+        # create a queue with rather draconian flow control settings
+        ssn0 = cluster[0].connect().session()
+        s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':100, 'qpid.flow_resume_count':20}}}}")
+
+        # fire off the sending thread to broker[0], and wait until the queue
+        # hits flow control on broker[1]
+        sender = AsyncSender(cluster[0], "flq", count=110);
+        sender.start();
+
+        cluster[1].startQmf()
+        q_obj = [q for q in cluster[1].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+        deadline = time.time() + 10
+        while not q_obj.flowStopped and time.time() < deadline:
+            q_obj.update()
+        assert q_obj.flowStopped
+        assert not sender.done
+        assert q_obj.msgDepth < 110
+
+        # Now drain enough messages on broker[1] to drop below the flow stop
+        # threshold, but not relieve flow control...
+        receiver = subprocess.Popen(["qpid-receive",
+                                     "--messages=15",
+                                     "--timeout=1",
+                                     "--print-content=no",
+                                     "--failover-updates",
+                                     "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+                                     "--ack-frequency=1",
+                                     "--address=flq",
+                                     "--broker=%s" % cluster[1].host_port()])
+        receiver.wait()
+        q_obj.update()
+        assert q_obj.flowStopped
+        assert not sender.done
+        current_depth = q_obj.msgDepth
+
+        # add a new broker to the cluster, and verify that the queue is in flow
+        # control on that broker
+        cluster.start()
+        cluster[2].startQmf()
+        q_obj = [q for q in cluster[2].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+        assert q_obj.flowStopped
+        assert q_obj.msgDepth == current_depth
+
+        # now drain the queue on broker[2], and verify that the sender becomes
+        # unblocked
+        receiver = subprocess.Popen(["qpid-receive",
+                                     "--messages=95",
+                                     "--timeout=1",
+                                     "--print-content=no",
+                                     "--failover-updates",
+                                     "--connection-options={%s}"%(Cluster.CONNECTION_OPTIONS),
+                                     "--ack-frequency=1",
+                                     "--address=flq",
+                                     "--broker=%s" % cluster[2].host_port()])
+        receiver.wait()
+        q_obj.update()
+        assert not q_obj.flowStopped
+        self.assertEqual(q_obj.msgDepth, 0)
+
+        # verify that the sender has become unblocked
+        sender.join(timeout=5)
+        assert not sender.isAlive()
+        assert sender.done
+
+    def test_blocked_queue_delete(self):
+        """Verify that producers which are blocked on a queue due to flow
+        control are unblocked when that queue is deleted.
+        """
+
+        cluster = self.cluster(2)
+        cluster[0].startQmf()
+        cluster[1].startQmf()
+
+        # configure a queue with a specific flow limit on first broker
+        ssn0 = cluster[0].connect().session()
+        s0 = ssn0.sender("flq; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':5, 'qpid.flow_resume_count':3}}}}")
+        q1 = [q for q in cluster[0].qmf_session.getObjects(_class="queue") if q.name == "flq"][0]
+        oid = q1.getObjectId()
+        self.assertEqual(q1.name, "flq")
+        self.assertEqual(q1.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+        assert not q1.flowStopped
+        self.assertEqual(q1.flowStoppedCount, 0)
+
+        # fill the queue on one broker until flow control is active
+        for x in range(5): s0.send(Message(str(x)))
+        sender = ShortTests.BlockedSend(s0, Message(str(6)))
+        sender.start()                  # Tests that sender does block
+        # Verify the broker queue goes into a flowStopped state
+        deadline = time.time() + 1
+        while not q1.flowStopped and time.time() < deadline: q1.update()
+        assert q1.flowStopped
+        self.assertEqual(q1.flowStoppedCount, 1)
+        sender.assert_blocked()         # Still blocked
+
+        # Now verify the  both brokers in cluster have same configuration
+        qs = cluster[1].qmf_session.getObjects(_objectId=oid)
+        self.assertEqual(len(qs), 1)
+        q2 = qs[0]
+        self.assertEqual(q2.name, "flq")
+        self.assertEqual(q2.arguments, {u'qpid.flow_stop_count': 5L, u'qpid.flow_resume_count': 3L})
+        assert q2.flowStopped
+        self.assertEqual(q2.flowStoppedCount, 1)
+
+        # now delete the blocked queue from other broker
+        ssn1 = cluster[1].connect().session()
+        self.evaluate_address(ssn1, "flq;{delete:always}")
+        sender.wait()                   # Verify no longer blocked.
+
+        ssn0.connection.close()
+        ssn1.connection.close()
+        cluster_test_logs.verify_logs()
+
+
+    def test_alternate_exchange_update(self):
+        """Verify that alternate-exchange on exchanges and queues is propagated to new members of a cluster. """
+        cluster = self.cluster(1)
+        s0 = cluster[0].connect().session()
+        # create alt queue bound to amq.fanout exchange, will be destination for alternate exchanges
+        self.evaluate_address(s0, "alt;{create:always,node:{x-bindings:[{exchange:'amq.fanout',queue:alt}]}}")
+        # create direct exchange ex with alternate-exchange amq.fanout and no queues bound
+        self.evaluate_address(s0, "ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'amq.fanout'}}}")
+        # create queue q with alternate-exchange amq.fanout
+        self.evaluate_address(s0, "q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'amq.fanout'}}}")
+
+        def verify(broker):
+            s = broker.connect().session()
+            # Verify unmatched message goes to ex's alternate.
+            s.sender("ex").send("foo")
+            self.assertEqual("foo", s.receiver("alt").fetch(timeout=0).content)
+            # Verify rejected message goes to q's alternate.
+            s.sender("q").send("bar")
+            msg = s.receiver("q").fetch(timeout=0)
+            self.assertEqual("bar", msg.content)
+            s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+            self.assertEqual("bar", s.receiver("alt").fetch(timeout=0).content)
+
+        verify(cluster[0])
+        cluster.start()
+        verify(cluster[1])
+
+    def test_binding_order(self):
+        """Regression test for binding order inconsistency in cluster"""
+        cluster = self.cluster(1)
+        c0 = cluster[0].connect()
+        s0 = c0.session()
+        # Declare multiple queues bound to same key on amq.topic
+        def declare(q,max=0):
+            if max: declare = 'x-declare:{arguments:{"qpid.max_count":%d, "qpid.flow_stop_count":0}}'%max
+            else: declare = 'x-declare:{}'
+            bind='x-bindings:[{queue:%s,key:key,exchange:"amq.topic"}]'%(q)
+            s0.sender("%s;{create:always,node:{%s,%s}}" % (q,declare,bind))
+        declare('d',max=4)              # Only one with a limit
+        for q in ['c', 'b','a']: declare(q)
+        # Add a cluster member, send enough messages to exceed the max count
+        cluster.start()
+        try:
+            s = s0.sender('amq.topic/key')
+            for m in xrange(1,6): s.send(Message(str(m)))
+            self.fail("Expected capacity exceeded exception")
+        except messaging.exceptions.TargetCapacityExceeded: pass
+        c1 = cluster[1].connect()
+        s1 = c1.session()
+        s0 = c0.session()        # Old session s0 is broken by exception.
+        # Verify queue contents are consistent.
+        for q in ['a','b','c','d']:
+            self.assertEqual(self.browse(s0, q), self.browse(s1, q))
+        # Verify queue contents are "best effort"
+        for q in ['a','b','c']: self.assert_browse(s1,q,[str(n) for n in xrange(1,6)])
+        self.assert_browse(s1,'d',[str(n) for n in xrange(1,5)])
+
+    def test_deleted_exchange(self):
+        """QPID-3215: cached exchange reference can cause cluster inconsistencies
+        if exchange is deleted/recreated
+        Verify stand-alone case
+        """
+        cluster = self.cluster()
+        # Verify we do not route message via an exchange that has been destroyed.
+        cluster.start()
+        s0 = cluster[0].connect().session()
+        self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
+        self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
+        send0 = s0.sender("ex/foo")
+        send0.send("foo")
+        self.assert_browse(s0, "q", ["foo"])
+        self.evaluate_address(s0, "ex;{delete:always}")
+        try:
+            send0.send("bar")     # Should fail, exchange is deleted.
+            self.fail("Expected not-found exception")
+        except qpid.messaging.NotFound: pass
+        self.assert_browse(cluster[0].connect().session(), "q", ["foo"])
+
+    def test_deleted_exchange_inconsistent(self):
+        """QPID-3215: cached exchange reference can cause cluster inconsistencies
+        if exchange is deleted/recreated
+
+        Verify cluster inconsistency.
+        """
+        cluster = self.cluster()
+        cluster.start()
+        s0 = cluster[0].connect().session()
+        self.evaluate_address(s0, "ex;{create:always,node:{type:topic}}")
+        self.evaluate_address(s0, "q;{create:always,node:{x-bindings:[{exchange:'ex',queue:q,key:foo}]}}")
+        send0 = s0.sender("ex/foo")
+        send0.send("foo")
+        self.assert_browse(s0, "q", ["foo"])
+
+        cluster.start()
+        s1 = cluster[1].connect().session()
+        self.evaluate_address(s0, "ex;{delete:always}")
+        try:
+            send0.send("bar")
+            self.fail("Expected not-found exception")
+        except qpid.messaging.NotFound: pass
+
+        self.assert_browse(s1, "q", ["foo"])
+
+
+    def test_ttl_consistent(self):
+        """Ensure we don't get inconsistent errors with message that have TTL very close together"""
+        messages = [ Message(str(i), ttl=i/1000.0) for i in xrange(0,1000)]
+        messages.append(Message("x"))
+        cluster = self.cluster(2)
+        sender = cluster[0].connect().session().sender("q;{create:always}")
+
+        def fetch(b):
+            receiver = b.connect().session().receiver("q;{create:always}")
+            while receiver.fetch().content != "x": pass
+
+        for m in messages: sender.send(m, sync=False)
+        for m in messages: sender.send(m, sync=False)
+        fetch(cluster[0])
+        fetch(cluster[1])
+        for m in messages: sender.send(m, sync=False)
+        cluster.start()
+        fetch(cluster[2])
+
+# Some utility code for transaction tests
+XA_RBROLLBACK = 1
+XA_RBTIMEOUT = 2
+XA_OK = 0
+dtx_branch_counter = 0
+
+class DtxStatusException(Exception):
+    def __init__(self, expect, actual):
+        self.expect = expect
+        self.actual = actual
+
+    def str(self):
+        return "DtxStatusException(expect=%s, actual=%s)"%(self.expect, self.actual)
+
+class DtxTestFixture:
+    """Bundle together some common requirements for dtx tests."""
+    def __init__(self, test, broker, name, exclusive=False):
+        self.test = test
+        self.broker = broker
+        self.name = name
+        # Use old API. DTX is not supported in messaging API.
+        self.connection = broker.connect_old()
+        self.session = self.connection.session(name, 1) # 1 second timeout
+        self.queue = self.session.queue_declare(name, exclusive=exclusive)
+        self.session.dtx_select()
+        self.consumer = None
+
+    def xid(self, id=None):
+        if id is None: id = self.name
+        return self.session.xid(format=0, global_id=id)
+
+    def check_status(self, expect, actual):
+        if expect != actual: raise DtxStatusException(expect, actual)
+
+    def start(self, id=None, resume=False):
+        self.check_status(XA_OK, self.session.dtx_start(xid=self.xid(id), resume=resume).status)
+
+    def end(self, id=None, suspend=False):
+        self.check_status(XA_OK, self.session.dtx_end(xid=self.xid(id), suspend=suspend).status)
+
+    def prepare(self, id=None):
+        self.check_status(XA_OK, self.session.dtx_prepare(xid=self.xid(id)).status)
+
+    def commit(self, id=None, one_phase=True):
+        self.check_status(
+            XA_OK, self.session.dtx_commit(xid=self.xid(id), one_phase=one_phase).status)
+
+    def rollback(self, id=None):
+        self.check_status(XA_OK, self.session.dtx_rollback(xid=self.xid(id)).status)
+
+    def set_timeout(self, timeout, id=None):
+        self.session.dtx_set_timeout(xid=self.xid(id),timeout=timeout)
+
+    def send(self, messages):
+       for m in messages:
+           dp=self.session.delivery_properties(routing_key=self.name)
+           mp=self.session.message_properties()
+           self.session.message_transfer(message=qpid.datatypes.Message(dp, mp, m))
+
+    def accept(self):
+        """Accept 1 message from queue"""
+        consumer_tag="%s-consumer"%(self.name)
+        self.session.message_subscribe(queue=self.name, destination=consumer_tag)
+        self.session.message_flow(unit = self.session.credit_unit.message, value = 1, destination = consumer_tag)
+        self.session.message_flow(unit = self.session.credit_unit.byte, value = 0xFFFFFFFFL, destination = consumer_tag)
+        msg = self.session.incoming(consumer_tag).get(timeout=1)
+        self.session.message_cancel(destination=consumer_tag)
+        self.session.message_accept(qpid.datatypes.RangedSet(msg.id))
+        return msg
+
+
+    def verify(self, sessions, messages):
+        for s in sessions:
+            self.test.assert_browse(s, self.name, messages)
+
+class DtxTests(BrokerTest):
+
+    def test_dtx_update(self):
+        """Verify that DTX transaction state is updated to a new broker.
+        Start a collection of transactions, then add a new cluster member,
+        then verify they commit/rollback correctly on the new broker."""
+
+        # Note: multiple test have been bundled into one to avoid the need to start/stop
+        # multiple brokers per test.
+
+        cluster=self.cluster(1)
+        sessions = [cluster[0].connect().session()] # For verify
+
+        # Transaction that will be open when new member joins, then committed.
+        t1 = DtxTestFixture(self, cluster[0], "t1")
+        t1.start()
+        t1.send(["1", "2"])
+        t1.verify(sessions, [])          # Not visible outside of transaction
+
+        # Transaction that will be open when  new member joins, then rolled back.
+        t2 = DtxTestFixture(self, cluster[0], "t2")
+        t2.start()
+        t2.send(["1", "2"])
+
+        # Transaction that will be prepared when new member joins, then committed.
+        t3 = DtxTestFixture(self, cluster[0], "t3")
+        t3.start()
+        t3.send(["1", "2"])
+        t3.end()
+        t3.prepare()
+        t1.verify(sessions, [])          # Not visible outside of transaction
+
+        # Transaction that will be prepared when  new member joins, then rolled back.
+        t4 = DtxTestFixture(self, cluster[0], "t4")
+        t4.start()
+        t4.send(["1", "2"])
+        t4.end()
+        t4.prepare()
+
+        # Transaction using an exclusive queue
+        t5 = DtxTestFixture(self, cluster[0], "t5", exclusive=True)
+        t5.start()
+        t5.send(["1", "2"])
+
+        # Accept messages in a transaction before/after join then commit
+        t6 = DtxTestFixture(self, cluster[0], "t6")
+        t6.send(["a","b","c"])
+        t6.start()
+        self.assertEqual(t6.accept().body, "a");
+
+        # Accept messages in a transaction before/after join then roll back
+        t7 = DtxTestFixture(self, cluster[0], "t7")
+        t7.send(["a","b","c"])
+        t7.start()
+        self.assertEqual(t7.accept().body, "a");
+
+        # Ended, suspended transactions across join.
+        t8 = DtxTestFixture(self, cluster[0], "t8")
+        t8.start(id="1")
+        t8.send(["x"])
+        t8.end(id="1", suspend=True)
+        t8.start(id="2")
+        t8.send(["y"])
+        t8.end(id="2")
+        t8.start()
+        t8.send("z")
+
+
+        # Start new cluster member
+        cluster.start()
+        sessions.append(cluster[1].connect().session())
+
+        # Commit t1
+        t1.send(["3","4"])
+        t1.verify(sessions, [])
+        t1.end()
+        t1.commit(one_phase=True)
+        t1.verify(sessions, ["1","2","3","4"])
+
+        # Rollback t2
+        t2.send(["3","4"])
+        t2.end()
+        t2.rollback()
+        t2.verify(sessions, [])
+
+        # Commit t3
+        t3.commit(one_phase=False)
+        t3.verify(sessions, ["1","2"])
+
+        # Rollback t4
+        t4.rollback()
+        t4.verify(sessions, [])
+
+        # Commit t5
+        t5.send(["3","4"])
+        t5.verify(sessions, [])
+        t5.end()
+        t5.commit(one_phase=True)
+        t5.verify(sessions, ["1","2","3","4"])
+
+        # Commit t6
+        self.assertEqual(t6.accept().body, "b");
+        t6.verify(sessions, ["c"])
+        t6.end()
+        t6.commit(one_phase=True)
+        t6.session.close()              # Make sure they're not requeued by the session.
+        t6.verify(sessions, ["c"])
+
+        # Rollback t7
+        self.assertEqual(t7.accept().body, "b");
+        t7.end()
+        t7.rollback()
+        t7.verify(sessions, ["a", "b", "c"])
+
+        # Resume t8
+        t8.end()
+        t8.commit(one_phase=True)
+        t8.start("1", resume=True)
+        t8.end("1")
+        t8.commit("1", one_phase=True)
+        t8.commit("2", one_phase=True)
+        t8.verify(sessions, ["z", "x","y"])
+
+
+    def test_dtx_failover_rollback(self):
+       """Kill a broker during a transaction, verify we roll back correctly"""
+       cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL)
+       cluster.start(expect=EXPECT_RUNNING)
+
+       # Test unprepared at crash
+       t1 = DtxTestFixture(self, cluster[0], "t1")
+       t1.send(["a"])                   # Not in transaction
+       t1.start()
+       t1.send(["b"])                   # In transaction
+
+       # Test prepared at crash
+       t2 = DtxTestFixture(self, cluster[0], "t2")
+       t2.send(["a"])                   # Not in transaction
+       t2.start()
+       t2.send(["b"])                   # In transaction
+       t2.end()
+       t2.prepare()
+
+       # Crash the broker
+       cluster[0].kill()
+
+       # Transactional changes should not appear
+       s = cluster[1].connect().session();
+       self.assert_browse(s, "t1", ["a"])
+       self.assert_browse(s, "t2", ["a"])
+
+    def test_dtx_timeout(self):
+        """Verify that dtx timeout works"""
+        cluster = self.cluster(1)
+        t1 = DtxTestFixture(self, cluster[0], "t1")
+        t1.start()
+        t1.set_timeout(1)
+        time.sleep(1.1)
+        try:
+            t1.end()
+            self.fail("Expected rollback timeout.")
+        except DtxStatusException, e:
+            self.assertEqual(e.actual, XA_RBTIMEOUT)
+
+class TxTests(BrokerTest):
+
+    def test_tx_update(self):
+        """Verify that transaction state is updated to a new broker"""
+
+        def make_message(session, body=None, key=None, id=None):
+            dp=session.delivery_properties(routing_key=key)
+            mp=session.message_properties(correlation_id=id)
+            return qpid.datatypes.Message(dp, mp, body)
+
+        cluster=self.cluster(1)
+        # Use old API. TX is not supported in messaging API.
+        c = cluster[0].connect_old()
+        s = c.session("tx-session", 1)
+        s.queue_declare(queue="q")
+        # Start transaction
+        s.tx_select()
+        s.message_transfer(message=make_message(s, "1", "q"))
+        # Start new member mid-transaction
+        cluster.start()
+        # Do more work
+        s.message_transfer(message=make_message(s, "2", "q"))
+        # Commit the transaction and verify the results.
+        s.tx_commit()
+        for b in cluster: self.assert_browse(b.connect().session(), "q", ["1","2"])
+
+
 class LongTests(BrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
     def duration(self):
@@ -316,22 +1023,28 @@ class LongTests(BrokerTest):
 
         # Original cluster will all be killed so expect exit with failure
         cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+        for b in cluster: b.ready()     # Wait for brokers to be ready
         for b in cluster: ErrorGenerator(b)
 
         # Start sender and receiver threads
         cluster[0].declare_queue("test-queue")
-        sender = NumberedSender(cluster[1], 1000) # Max queue depth
-        receiver = NumberedReceiver(cluster[2], sender)
+        sender = NumberedSender(cluster[0], 1000) # Max queue depth
+        receiver = NumberedReceiver(cluster[0], sender)
         receiver.start()
         sender.start()
+        # Wait for sender & receiver to get up and running
+        retry(lambda: receiver.received > 0)
 
         # Kill original brokers, start new ones for the duration.
         endtime = time.time() + self.duration()
         i = 0
         while time.time() < endtime:
+            sender.sender.assert_running()
+            receiver.receiver.assert_running()
             cluster[i].kill()
             i += 1
             b = cluster.start(expect=EXPECT_EXIT_FAIL)
+            for b in cluster[i:]: b.ready()
             ErrorGenerator(b)
             time.sleep(5)
         sender.stop()
@@ -362,24 +1075,24 @@ class LongTests(BrokerTest):
                             if self.stopped: break
                             self.process = self.broker.test.popen(
                                 self.cmd, expect=EXPECT_UNKNOWN)
-                        finally: self.lock.release()
-                        try: exit = self.process.wait()
+                        finally:
+                            self.lock.release()
+                        try:
+                            exit = self.process.wait()
                         except OSError, e:
-                            # Seems to be a race in wait(), it throws
-                            # "no such process" during test shutdown.
-                            # Doesn't indicate a test error, ignore.
-                            return
+                            # Process may already have been killed by self.stop()
+                            break
                         except Exception, e:
                             self.process.unexpected(
                                 "client of %s: %s"%(self.broker.name, e))
                         self.lock.acquire()
                         try:
-                            # Quit and ignore errors if stopped or expecting failure.
                             if self.stopped: break
                             if exit != 0:
                                 self.process.unexpected(
                                     "client of %s exit code %s"%(self.broker.name, exit))
-                        finally: self.lock.release()
+                        finally:
+                            self.lock.release()
                 except Exception, e:
                     self.error = RethrownException("Error in ClientLoop.run")
 
@@ -401,7 +1114,7 @@ class LongTests(BrokerTest):
         args += ["--log-enable=trace+:management"]
         # Use store if present.
         if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
-        cluster = self.cluster(3, args)
+        cluster = self.cluster(3, args, expect=EXPECT_EXIT_FAIL) # brokers will be killed
 
         clients = [] # Per-broker list of clients that only connect to one broker.
         mclients = [] # Management clients that connect to every broker in the cluster.
@@ -410,10 +1123,12 @@ class LongTests(BrokerTest):
             """Start ordinary clients for a broker."""
             cmds=[
                 ["qpid-tool", "localhost:%s"%(broker.port())],
-                ["qpid-perftest", "--count", 50000,
+                ["qpid-perftest", "--count=5000", "--durable=yes",
                  "--base-name", str(qpid.datatypes.uuid4()), "--port", broker.port()],
-                ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())],
-                ["testagent", "localhost", str(broker.port())] ]
+                ["qpid-txtest", "--queue-base-name", "tx-%s"%str(qpid.datatypes.uuid4()),
+                 "--port", broker.port()],
+                ["qpid-queue-stats", "-a", "localhost:%s" %(broker.port())]
+                 ]
             clients.append([ClientLoop(broker, cmd) for cmd in cmds])
 
         def start_mclients(broker):
@@ -422,7 +1137,8 @@ class LongTests(BrokerTest):
             mclients.append(ClientLoop(broker, cmd))
 
         endtime = time.time() + self.duration()
-        runtime = self.duration() / 4   # First run is longer, use quarter of duration.
+        # For long duration, first run is a quarter of the duration.
+        runtime = min(5.0, self.duration() / 3.0)
         alive = 0                       # First live cluster member
         for i in range(len(cluster)): start_clients(cluster[i])
         start_mclients(cluster[alive])
@@ -433,7 +1149,7 @@ class LongTests(BrokerTest):
             for b in cluster[alive:]: b.ready() # Check if a broker crashed.
             # Kill the first broker, expect the clients to fail.
             b = cluster[alive]
-            b.expect = EXPECT_EXIT_FAIL
+            b.ready()
             b.kill()
             # Stop the brokers clients and all the mclients.
             for c in clients[alive] + mclients:
@@ -443,26 +1159,251 @@ class LongTests(BrokerTest):
             mclients = []
             # Start another broker and clients
             alive += 1
-            cluster.start()
+            cluster.start(expect=EXPECT_EXIT_FAIL)
+            cluster[-1].ready()         # Wait till its ready
             start_clients(cluster[-1])
             start_mclients(cluster[alive])
         for c in chain(mclients, *clients):
             c.stop()
-
+        for b in cluster[alive:]:
+            b.ready() # Verify still alive
+            b.kill()
         # Verify that logs are consistent
         cluster_test_logs.verify_logs()
 
     def test_management_qmf2(self):
         self.test_management(args=["--mgmt-qmf2=yes"])
 
-    def test_connect_consistent(self):   # FIXME aconway 2011-01-18:
+    def test_connect_consistent(self):
         args=["--mgmt-pub-interval=1","--log-enable=trace+:management"]
         cluster = self.cluster(2, args=args)
         end = time.time() + self.duration()
         while (time.time() < end):  # Get a management interval
             for i in xrange(1000): cluster[0].connect().close()
+        cluster_test_logs.verify_logs()
+
+    def test_flowlimit_failover(self):
+        """Test fail-over during continuous send-receive with flow control
+        active.
+        """
+
+        # Original cluster will all be killed so expect exit with failure
+        cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL)
+        for b in cluster: b.ready()     # Wait for brokers to be ready
+
+        # create a queue with rather draconian flow control settings
+        ssn0 = cluster[0].connect().session()
+        s0 = ssn0.sender("test-queue; {create:always, node:{type:queue, x-declare:{arguments:{'qpid.flow_stop_count':2000, 'qpid.flow_resume_count':100}}}}")
+
+        receiver = NumberedReceiver(cluster[0])
+        receiver.start()
+        senders = [NumberedSender(cluster[0]) for i in range(1,3)]
+        for s in senders:
+            s.start()
+        # Wait for senders & receiver to get up and running
+        retry(lambda: receiver.received > 2*senders)
+
+        # Kill original brokers, start new ones for the duration.
+        endtime = time.time() + self.duration();
+        i = 0
+        while time.time() < endtime:
+            for s in senders: s.sender.assert_running()
+            receiver.receiver.assert_running()
+            for b in cluster[i:]: b.ready() # Check if any broker crashed.
+            cluster[i].kill()
+            i += 1
+            b = cluster.start(expect=EXPECT_EXIT_FAIL)
+            time.sleep(5)
+        for s in senders:
+            s.stop()
+        receiver.stop()
+        for i in range(i, len(cluster)): cluster[i].kill()
+
+    def test_ttl_failover(self):
+        """Test that messages with TTL don't cause problems in a cluster with failover"""
+
+        class Client(StoppableThread):
+
+            def __init__(self, broker):
+                StoppableThread.__init__(self)
+                self.connection = broker.connect(reconnect=True)
+                self.auto_fetch_reconnect_urls(self.connection)
+                self.session = self.connection.session()
+
+            def auto_fetch_reconnect_urls(self, conn):
+                """Replacment for qpid.messaging.util version which is noisy"""
+                ssn = conn.session("auto-fetch-reconnect-urls")
+                rcv = ssn.receiver("amq.failover")
+                rcv.capacity = 10
+
+                def main():
+                    while True:
+                        try:
+                            msg = rcv.fetch()
+                            qpid.messaging.util.set_reconnect_urls(conn, msg)
+                            ssn.acknowledge(msg, sync=False)
+                        except messaging.exceptions.LinkClosed: return
+                        except messaging.exceptions.ConnectionError: return
+
+                thread = Thread(name="auto-fetch-reconnect-urls", target=main)
+                thread.setDaemon(True)
+                thread.start()
+
+            def stop(self):
+                StoppableThread.stop(self)
+                self.connection.detach()
+
+        class Sender(Client):
+            def __init__(self, broker, address):
+                Client.__init__(self, broker)
+                self.sent = 0    # Number of messages _reliably_ sent.
+                self.sender = self.session.sender(address, capacity=1000)
+
+            def send_counted(self, ttl):
+                self.sender.send(Message(str(self.sent), ttl=ttl))
+                self.sent += 1
+
+            def run(self):
+                while not self.stopped:
+                    choice = random.randint(0,4)
+                    if choice == 0: self.send_counted(None) # No ttl
+                    elif choice == 1: self.send_counted(100000) # Large ttl
+                    else: # Small ttl, might expire
+                        self.sender.send(Message("", ttl=random.random()/10))
+                self.sender.send(Message("z"), sync=True) # Chaser.
+
+        class Receiver(Client):
+
+            def __init__(self, broker, address):
+                Client.__init__(self, broker)
+                self.received = 0 # Number of  non-empty (reliable) messages received.
+                self.receiver = self.session.receiver(address, capacity=1000)
+            def run(self):
+                try:
+                    while True:
+                        m = self.receiver.fetch(1)
+                        if m.content == "z": break
+                        if m.content:   # Ignore unreliable messages
+                            # Ignore duplicates
+                            if int(m.content) == self.received: self.received += 1
+                except Exception,e: self.error = e
+
+        # def test_ttl_failover
+
+        # Original cluster will all be killed so expect exit with failure
+        # Set small purge interval.
+        cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["--queue-purge-interval=1"])
+        for b in cluster: b.ready()     # Wait for brokers to be ready
+
+        # Python client failover produces noisy WARN logs, disable temporarily
+        logger = logging.getLogger()
+        log_level = logger.getEffectiveLevel()
+        logger.setLevel(logging.ERROR)
+        sender = None
+        receiver = None
+        try:
+            # Start sender and receiver threads
+            receiver = Receiver(cluster[0], "q;{create:always}")
+            receiver.start()
+            sender = Sender(cluster[0], "q;{create:always}")
+            sender.start()
+            # Wait for sender & receiver to get up and running
+            retry(lambda: receiver.received > 0)
+
+            # Kill brokers in a cycle.
+            endtime = time.time() + self.duration()
+            runtime = min(5.0, self.duration() / 4.0)
+            i = 0
+            while time.time() < endtime:
+                for b in cluster[i:]: b.ready() # Check if any broker crashed.
+                cluster[i].kill()
+                i += 1
+                b = cluster.start(expect=EXPECT_EXIT_FAIL)
+                b.ready()
+                time.sleep(runtime)
+            sender.stop()
+            receiver.stop()
+            for b in cluster[i:]:
+                b.ready()               # Check it didn't crash
+                b.kill()
+            self.assertEqual(sender.sent, receiver.received)
             cluster_test_logs.verify_logs()
 
+        finally:
+            # Detach to avoid slow reconnect attempts during shut-down if test fails.
+            if sender: sender.connection.detach()
+            if receiver: receiver.connection.detach()
+            logger.setLevel(log_level)
+
+    def test_msg_group_failover(self):
+        """Test fail-over during continuous send-receive of grouped messages.
+        """
+
+        class GroupedTrafficGenerator(Thread):
+            def __init__(self, url, queue, group_key):
+                Thread.__init__(self)
+                self.url = url
+                self.queue = queue
+                self.group_key = group_key
+                self.status = -1
+
+            def run(self):
+                # generate traffic for approx 10 seconds (2011msgs / 200 per-sec)
+                cmd = ["msg_group_test",
+                       "--broker=%s" % self.url,
+                       "--address=%s" % self.queue,
+                       "--connection-options={%s}" % (Cluster.CONNECTION_OPTIONS),
+                       "--group-key=%s" % self.group_key,
+                       "--receivers=2",
+                       "--senders=3",
+                       "--messages=2011",
+                       "--send-rate=200",
+                       "--capacity=11",
+                       "--ack-frequency=23",
+                       "--allow-duplicates",
+                       "--group-size=37",
+                       "--randomize-group-size",
+                       "--interleave=13"]
+                #      "--trace"]
+                self.generator = Popen( cmd );
+                self.status = self.generator.wait()
+                return self.status
+
+            def results(self):
+                self.join(timeout=30)  # 3x assumed duration
+                if self.isAlive(): return -1
+                return self.status
+
+        # Original cluster will all be killed so expect exit with failure
+        cluster = self.cluster(3, expect=EXPECT_EXIT_FAIL, args=["-t"])
+        for b in cluster: b.ready()     # Wait for brokers to be ready
+
+        # create a queue with rather draconian flow control settings
+        ssn0 = cluster[0].connect().session()
+        q_args = "{'qpid.group_header_key':'group-id', 'qpid.shared_msg_group':1}"
+        s0 = ssn0.sender("test-group-q; {create:always, node:{type:queue, x-declare:{arguments:%s}}}" % q_args)
+
+        # Kill original brokers, start new ones for the duration.
+        endtime = time.time() + self.duration();
+        i = 0
+        while time.time() < endtime:
+            traffic = GroupedTrafficGenerator( cluster[i].host_port(),
+                                               "test-group-q", "group-id" )
+            traffic.start()
+            time.sleep(1)
+
+            for x in range(2):
+                for b in cluster[i:]: b.ready() # Check if any broker crashed.
+                cluster[i].kill()
+                i += 1
+                b = cluster.start(expect=EXPECT_EXIT_FAIL)
+                time.sleep(1)
+
+            # wait for traffic to finish, verify success
+            self.assertEqual(0, traffic.results())
+
+        for i in range(i, len(cluster)): cluster[i].kill()
+
 
 class StoreTests(BrokerTest):
     """

Propchange: qpid/branches/QPID-2519/cpp/src/tests/cluster_tests.py
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Oct 21 14:42:12 2011
@@ -0,0 +1,3 @@
+/qpid/branches/qpid-2935/qpid/cpp/src/tests/cluster_tests.py:1061302-1072333
+/qpid/branches/qpid-3346/qpid/cpp/src/tests/cluster_tests.py:1144319-1179855
+/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py:1072051-1187351

Modified: qpid/branches/QPID-2519/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/exception_test.cpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/exception_test.cpp (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/exception_test.cpp Fri Oct 21 14:42:12 2011
@@ -92,32 +92,30 @@ QPID_AUTO_TEST_CASE(TestSessionBusy) {
 }
 
 QPID_AUTO_TEST_CASE(DisconnectedPop) {
-    ProxySessionFixture fix;
-    ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
+    SessionFixture fix;
     fix.session.queueDeclare(arg::queue="q");
     fix.subs.subscribe(fix.lq, "q");
     Catcher<TransportFailure> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
-    fix.connection.proxy.close();
+    fix.shutdownBroker();
     BOOST_CHECK(pop.join());
 }
 
 QPID_AUTO_TEST_CASE(DisconnectedListen) {
-    ProxySessionFixture fix;
+    SessionFixture fix;
     struct NullListener : public MessageListener {
         void received(Message&) { BOOST_FAIL("Unexpected message"); }
     } l;
-    ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
     fix.session.queueDeclare(arg::queue="q");
     fix.subs.subscribe(l, "q");
 
     Catcher<TransportFailure> runner(bind(&SubscriptionManager::run, boost::ref(fix.subs)));
-    fix.connection.proxy.close();
-    runner.join();    
+    fix.shutdownBroker();
+    runner.join();
     BOOST_CHECK_THROW(fix.session.queueDeclare(arg::queue="x"), TransportFailure);
 }
 
 QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
-    ProxySessionFixture fix;
+    SessionFixture fix;
     ScopedSuppressLogging sl; // Suppress messages for expected errors.
     BOOST_CHECK_THROW(fix.subs.subscribe(fix.lq, "no such queue"), NotFoundException);
 }

Modified: qpid/branches/QPID-2519/cpp/src/tests/federated_topic_test
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/federated_topic_test?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/federated_topic_test (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/federated_topic_test Fri Oct 21 14:42:12 2011
@@ -42,13 +42,12 @@ while getopts "s:m:b:" opt ; do
     esac
 done
 
-MY_DIR=$(dirname $(which $0))
 source ./test_env.sh
 
 trap stop_brokers EXIT
 
 start_broker() {
-    ${MY_DIR}/../qpidd --daemon --port 0 --no-module-dir --no-data-dir --auth no > qpidd.port
+    $QPIDD_EXEC --daemon --port 0 --no-module-dir --no-data-dir --auth no > qpidd.port
 }
 
 start_brokers() {
@@ -76,39 +75,39 @@ subscribe() {
 
     echo Subscriber $1 connecting on $MY_PORT
     LOG="subscriber_$1.log"
-    ${MY_DIR}/topic_listener -p $MY_PORT  > $LOG 2>&1 && rm -f $LOG 
+    ./qpid-topic-listener -p $MY_PORT  > $LOG 2>&1 && rm -f $LOG 
 }
 
 publish() {
-    ${MY_DIR}/topic_publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS -p $PORT_A
+    ./qpid-topic-publisher --messages $MESSAGES --batches $BATCHES --subscribers $SUBSCRIBERS -p $PORT_A
 }
 
 setup_routes() {
-    BROKER_A="localhost:$PORT_A"
-    BROKER_B="localhost:$PORT_B"
-    BROKER_C="localhost:$PORT_C"
+    BROKER_A="daffodil:$PORT_A"
+    BROKER_B="daffodil:$PORT_B"
+    BROKER_C="daffodil:$PORT_C"
     if (($VERBOSE)); then
         echo "Establishing routes for topic..."
     fi
-    $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_A amq.topic topic_control B B
-    $PYTHON_COMMANDS/qpid-route route add $BROKER_C $BROKER_B amq.topic topic_control C C
+    $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_A amq.topic topic_control B B
+    $QPID_ROUTE_EXEC route add $BROKER_C $BROKER_B amq.topic topic_control C C
     if (($VERBOSE)); then
         echo "linked A->B->C"        
     fi
-    $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_C amq.topic topic_control B B
-    $PYTHON_COMMANDS/qpid-route route add $BROKER_A $BROKER_B amq.topic topic_control A A
+    $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_C amq.topic topic_control B B
+    $QPID_ROUTE_EXEC route add $BROKER_A $BROKER_B amq.topic topic_control A A
     if (($VERBOSE)); then
         echo "linked C->B->A"        
         echo "Establishing routes for response queue..."
     fi
 
-    $PYTHON_COMMANDS/qpid-route route add $BROKER_B $BROKER_C amq.direct response B B
-    $PYTHON_COMMANDS/qpid-route route add $BROKER_A $BROKER_B amq.direct response A A
+    $QPID_ROUTE_EXEC route add $BROKER_B $BROKER_C amq.direct response B B
+    $QPID_ROUTE_EXEC route add $BROKER_A $BROKER_B amq.direct response A A
     if (($VERBOSE)); then
         echo "linked C->B->A"        
         for b in $BROKER_A $BROKER_B $BROKER_C; do 
             echo "Routes for $b"
-            $PYTHON_COMMANDS/qpid-route route list $b
+            $QPID_ROUTE_EXEC route list $b
         done
     fi
 }

Modified: qpid/branches/QPID-2519/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/federation.py?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/federation.py (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/federation.py Fri Oct 21 14:42:12 2011
@@ -23,7 +23,7 @@ from qpid.testlib import TestBase010
 from qpid.datatypes import Message
 from qpid.queue import Empty
 from qpid.util import URL
-from time import sleep
+from time import sleep, time
 
 
 class _FedBroker(object):
@@ -111,18 +111,18 @@ class FederationTests(TestBase010):
 
         broker = qmf.getObjects(_class="broker")[0]
         result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         link = qmf.getObjects(_class="link")[0]
         result = link.bridge(False, "amq.direct", "amq.direct", "my-key", "", "", False, False, False, 0)
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         bridge = qmf.getObjects(_class="bridge")[0]
         result = bridge.close()
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         result = link.close()
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         self.verify_cleanup()
 
@@ -133,11 +133,11 @@ class FederationTests(TestBase010):
         qmf = self.qmf
         broker = qmf.getObjects(_class="broker")[0]
         result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         link = qmf.getObjects(_class="link")[0]
         result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0)
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         bridge = qmf.getObjects(_class="bridge")[0]
 
@@ -165,9 +165,9 @@ class FederationTests(TestBase010):
         except Empty: None
 
         result = bridge.close()
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
         result = link.close()
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         self.verify_cleanup()
 
@@ -178,11 +178,11 @@ class FederationTests(TestBase010):
         qmf = self.qmf
         broker = qmf.getObjects(_class="broker")[0]
         result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         link = qmf.getObjects(_class="link")[0]
         result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, True, False, 0)
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         bridge = qmf.getObjects(_class="bridge")[0]
 
@@ -209,9 +209,9 @@ class FederationTests(TestBase010):
         except Empty: None
 
         result = bridge.close()
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
         result = link.close()
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         self.verify_cleanup()
 
@@ -236,11 +236,11 @@ class FederationTests(TestBase010):
         qmf = self.qmf
         broker = qmf.getObjects(_class="broker")[0]
         result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         link = qmf.getObjects(_class="link")[0]
         result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
-        self.assertEqual(result.status, 0)
+        self.assertEqual(result.status, 0, result)
 
         bridge = qmf.getObjects(_class="bridge")[0]
         sleep(3)
@@ -262,6 +262,63 @@ class FederationTests(TestBase010):
         except Empty: None
 
         result = bridge.close()
+        self.assertEqual(result.status, 0, result)
+        result = link.close()
+        self.assertEqual(result.status, 0, result)
+
+        self.verify_cleanup()
+
+    def test_pull_from_queue_recovery(self):
+        session = self.session
+
+        #setup queue on remote broker and add some messages
+        r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
+        r_session = r_conn.session("test_pull_from_queue_recovery")
+        r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
+        for i in range(1, 6):
+            dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+            r_session.message_transfer(message=Message(dp, "Message %d" % i))
+
+        #setup queue to receive messages from local broker
+        session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
+        session.exchange_bind(queue="fed1", exchange="amq.fanout")
+        self.subscribe(queue="fed1", destination="f1")
+        queue = session.incoming("f1")
+
+        self.startQmf()
+        qmf = self.qmf
+        broker = qmf.getObjects(_class="broker")[0]
+        result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0, result)
+
+        link = qmf.getObjects(_class="link")[0]
+        result = link.bridge(False, "my-bridge-queue", "amq.fanout", "my-key", "", "", True, False, False, 1)
+        self.assertEqual(result.status, 0, result)
+
+        bridge = qmf.getObjects(_class="bridge")[0]
+        sleep(5)
+        
+        #recreate the remote bridge queue to invalidate the bridge session
+        r_session.queue_delete (queue="my-bridge-queue", if_empty=False, if_unused=False)
+        r_session.queue_declare(queue="my-bridge-queue", auto_delete=True)
+
+        #add some more messages (i.e. after bridge was created)
+        for i in range(6, 11):
+            dp = r_session.delivery_properties(routing_key="my-bridge-queue")
+            r_session.message_transfer(message=Message(dp, "Message %d" % i))
+
+        for i in range(1, 11):
+            try:
+                msg = queue.get(timeout=5)
+                self.assertEqual("Message %d" % i, msg.body)
+            except Empty:
+                self.fail("Failed to find expected message containing 'Message %d'" % i)
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        result = bridge.close()
         self.assertEqual(result.status, 0)
         result = link.close()
         self.assertEqual(result.status, 0)
@@ -649,10 +706,17 @@ class FederationTests(TestBase010):
 
         self.verify_cleanup()
 
-    def test_dynamic_headers(self):
+    def test_dynamic_headers_any(self):
+        self.do_test_dynamic_headers('any')
+
+    def test_dynamic_headers_all(self):
+        self.do_test_dynamic_headers('all')
+
+
+    def do_test_dynamic_headers(self, match_mode):
         session = self.session
         r_conn = self.connect(host=self.remote_host(), port=self.remote_port())
-        r_session = r_conn.session("test_dynamic_headers")
+        r_session = r_conn.session("test_dynamic_headers_%s" % match_mode)
 
         session.exchange_declare(exchange="fed.headers", type="headers")
         r_session.exchange_declare(exchange="fed.headers", type="headers")
@@ -671,7 +735,7 @@ class FederationTests(TestBase010):
         sleep(5)
 
         session.queue_declare(queue="fed1", exclusive=True, auto_delete=True)
-        session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':'any', 'class':'first'})
+        session.exchange_bind(queue="fed1", exchange="fed.headers", binding_key="key1", arguments={'x-match':match_mode, 'class':'first'})
         self.subscribe(queue="fed1", destination="f1")
         queue = session.incoming("f1")
 
@@ -1791,3 +1855,301 @@ class FederationTests(TestBase010):
         if headers:
             return headers[name]
         return None
+
+    def test_dynamic_topic_bounce(self):
+        """ Bounce the connection between federated Topic Exchanges.
+        """
+        class Params:
+            def exchange_type(self): return "topic"
+            def bind_queue(self, ssn, qname, ename):
+                ssn.exchange_bind(queue=qname, exchange=ename,
+                                  binding_key="spud.*")
+            def unbind_queue(self, ssn, qname, ename):
+                ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud.*")
+            def delivery_properties(self, ssn):
+                return  ssn.delivery_properties(routing_key="spud.boy")
+
+        self.generic_dynamic_bounce_test(Params())
+
+    def test_dynamic_direct_bounce(self):
+        """ Bounce the connection between federated Direct Exchanges.
+        """
+        class Params:
+            def exchange_type(self): return "direct"
+            def bind_queue(self, ssn, qname, ename):
+                ssn.exchange_bind(queue=qname, exchange=ename, binding_key="spud")
+            def unbind_queue(self, ssn, qname, ename):
+                ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud")
+            def delivery_properties(self, ssn):
+                return  ssn.delivery_properties(routing_key="spud")
+        self.generic_dynamic_bounce_test(Params())
+
+    def test_dynamic_fanout_bounce(self):
+        """ Bounce the connection between federated Fanout Exchanges.
+        """
+        class Params:
+            def exchange_type(self): return "fanout"
+            def bind_queue(self, ssn, qname, ename):
+                ssn.exchange_bind(queue=qname, exchange=ename)
+            def unbind_queue(self, ssn, qname, ename):
+                ssn.exchange_unbind(queue=qname, exchange=ename)
+            def delivery_properties(self, ssn):
+                return  ssn.delivery_properties(routing_key="spud")
+        self.generic_dynamic_bounce_test(Params())
+
+    def test_dynamic_headers_bounce(self):
+        """ Bounce the connection between federated Headers Exchanges.
+        """
+        class Params:
+            def exchange_type(self): return "headers"
+            def bind_queue(self, ssn, qname, ename):
+                ssn.exchange_bind(queue=qname, exchange=ename,
+                                  binding_key="spud", arguments={'x-match':'any', 'class':'first'})
+            def unbind_queue(self, ssn, qname, ename):
+                ssn.exchange_unbind(queue=qname, exchange=ename, binding_key="spud")
+            def delivery_properties(self, ssn):
+                return  ssn.message_properties(application_headers={'class':'first'})
+        ## @todo KAG - re-enable once federation bugs with headers exchanges
+        ## are fixed.
+        #self.generic_dynamic_bounce_test(Params())
+        return
+
+
+    def generic_dynamic_bounce_test(self, params):
+        """ Verify that a federated broker can maintain a binding to a local
+        queue using the same key as a remote binding.  Destroy and reconnect
+        the federation link, and verify routes are restored correctly.
+        See QPID-3170.
+        Topology:
+
+        Queue1 <---"Key"---B0<==[Federated Exchange]==>B1---"Key"--->Queue2
+        """
+        session = self.session
+
+        # create the federation
+
+        self.startQmf()
+        qmf = self.qmf
+
+        self._setup_brokers()
+
+        # create exchange on each broker, and retrieve the corresponding
+        # management object for that exchange
+
+        exchanges=[]
+        for _b in self._brokers[0:2]:
+            _b.client_session.exchange_declare(exchange="fedX", type=params.exchange_type())
+            self.assertEqual(_b.client_session.exchange_query(name="fedX").type,
+                             params.exchange_type(), "exchange_declare failed!")
+            # pull the exchange out of qmf...
+            retries = 0
+            my_exchange = None
+            timeout = time() + 10
+            while my_exchange is None and time() <= timeout:
+                objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+                for ooo in objs:
+                    if ooo.name == "fedX":
+                        my_exchange = ooo
+                        break
+            if my_exchange is None:
+                self.fail("QMF failed to find new exchange!")
+            exchanges.append(my_exchange)
+
+        #
+        # on each broker, create a local queue bound to the exchange with the
+        # same key value.
+        #
+
+        self._brokers[0].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True)
+        params.bind_queue(self._brokers[0].client_session, "fedX1", "fedX")
+        self.subscribe(self._brokers[0].client_session, queue="fedX1", destination="f1")
+        queue_0 = self._brokers[0].client_session.incoming("f1")
+
+        self._brokers[1].client_session.queue_declare(queue="fedX1", exclusive=True, auto_delete=True)
+        params.bind_queue(self._brokers[1].client_session, "fedX1", "fedX")
+        self.subscribe(self._brokers[1].client_session, queue="fedX1", destination="f1")
+        queue_1 = self._brokers[1].client_session.incoming("f1")
+
+        # now federate the two brokers
+
+        # connect B0 --> B1
+        result = self._brokers[1].qmf_object.connect(self._brokers[0].host,
+                                                     self._brokers[0].port,
+                                                     False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        # connect B1 --> B0
+        result = self._brokers[0].qmf_object.connect(self._brokers[1].host,
+                                                     self._brokers[1].port,
+                                                     False, "PLAIN", "guest", "guest", "tcp")
+        self.assertEqual(result.status, 0)
+
+        # for each link, bridge the "fedX" exchanges:
+
+        for _l in qmf.getObjects(_class="link"):
+            # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker())))
+            result = _l.bridge(False,  # durable
+                                 "fedX",  # src
+                                 "fedX",  # dst
+                                 "",  # key
+                                 "",  # tag
+                                 "",  # excludes
+                                 False, # srcIsQueue
+                                 False, # srcIsLocal
+                                 True,  # dynamic
+                                 0)     # sync
+            self.assertEqual(result.status, 0)
+
+        # wait for all the inter-broker links to become operational
+        operational = False
+        timeout = time() + 10
+        while not operational and time() <= timeout:
+            operational = True
+            for _l in qmf.getObjects(_class="link"):
+                #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state)))
+                if _l.state != "Operational":
+                    operational = False
+        self.failUnless(operational, "inter-broker links failed to become operational.")
+
+        # @todo - There is no way to determine when the bridge objects become
+        # active.
+
+        # wait until the binding key has propagated to each broker - each
+        # broker should see 2 bindings (1 local, 1 remote)
+
+        binding_counts = [2, 2]
+        self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+        for i in range(2):
+            exchanges[i].update()
+            timeout = time() + 10
+            while exchanges[i].bindingCount < binding_counts[i] and time() <= timeout:
+                exchanges[i].update()
+            self.failUnless(exchanges[i].bindingCount == binding_counts[i])
+
+        # send 10 msgs to B0
+        for i in range(1, 11):
+            # dp = self._brokers[0].client_session.delivery_properties(routing_key=params.routing_key())
+            dp = params.delivery_properties(self._brokers[0].client_session)
+            self._brokers[0].client_session.message_transfer(destination="fedX", message=Message(dp, "Message_trp %d" % i))
+
+        # get exactly 10 msgs on B0's local queue and B1's queue
+        for i in range(1, 11):
+            try:
+                msg = queue_0.get(timeout=5)
+                self.assertEqual("Message_trp %d" % i, msg.body)
+                msg = queue_1.get(timeout=5)
+                self.assertEqual("Message_trp %d" % i, msg.body)
+            except Empty:
+                self.fail("Only got %d msgs - expected 10" % i)
+        try:
+            extra = queue_0.get(timeout=1)
+            self.fail("Got unexpected message in queue_0: " + extra.body)
+        except Empty: None
+
+        try:
+            extra = queue_1.get(timeout=1)
+            self.fail("Got unexpected message in queue_1: " + extra.body)
+        except Empty: None
+
+        #
+        # Tear down the bridges between the two exchanges, then wait
+        # for the bindings to be cleaned up
+        #
+
+        for _b in qmf.getObjects(_class="bridge"):
+            result = _b.close()
+            self.assertEqual(result.status, 0)
+
+        binding_counts = [1, 1]
+        self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+        for i in range(2):
+            exchanges[i].update()
+            timeout = time() + 10
+            while exchanges[i].bindingCount != binding_counts[i] and time() <= timeout:
+                exchanges[i].update()
+            self.failUnless(exchanges[i].bindingCount == binding_counts[i])
+
+        #
+        # restore the bridges between the two exchanges, and wait for the
+        # bindings to propagate.
+        #
+
+        for _l in qmf.getObjects(_class="link"):
+            # print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.getBroker())))
+            result = _l.bridge(False,  # durable
+                                 "fedX",  # src
+                                 "fedX",  # dst
+                                 "",  # key
+                                 "",  # tag
+                                 "",  # excludes
+                                 False, # srcIsQueue
+                                 False, # srcIsLocal
+                                 True,  # dynamic
+                                 0)     # sync
+            self.assertEqual(result.status, 0)
+
+        binding_counts = [2, 2]
+        self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+        for i in range(2):
+            exchanges[i].update()
+            timeout = time() + 10
+            while exchanges[i].bindingCount != binding_counts[i] and time() <= timeout:
+                exchanges[i].update()
+            self.failUnless(exchanges[i].bindingCount == binding_counts[i])
+
+        #
+        # verify traffic flows correctly
+        #
+
+        for i in range(1, 11):
+            #dp = self._brokers[1].client_session.delivery_properties(routing_key=params.routing_key())
+            dp = params.delivery_properties(self._brokers[1].client_session)
+            self._brokers[1].client_session.message_transfer(destination="fedX", message=Message(dp, "Message_trp %d" % i))
+
+        # get exactly 10 msgs on B0's queue and B1's queue
+        for i in range(1, 11):
+            try:
+                msg = queue_0.get(timeout=5)
+                self.assertEqual("Message_trp %d" % i, msg.body)
+                msg = queue_1.get(timeout=5)
+                self.assertEqual("Message_trp %d" % i, msg.body)
+            except Empty:
+                self.fail("Only got %d msgs - expected 10" % i)
+        try:
+            extra = queue_0.get(timeout=1)
+            self.fail("Got unexpected message in queue_0: " + extra.body)
+        except Empty: None
+
+        try:
+            extra = queue_1.get(timeout=1)
+            self.fail("Got unexpected message in queue_1: " + extra.body)
+        except Empty: None
+
+
+        #
+        # cleanup
+        #
+        params.unbind_queue(self._brokers[0].client_session, "fedX1", "fedX")
+        self._brokers[0].client_session.message_cancel(destination="f1")
+        self._brokers[0].client_session.queue_delete(queue="fedX1")
+
+        params.unbind_queue(self._brokers[1].client_session, "fedX1", "fedX")
+        self._brokers[1].client_session.message_cancel(destination="f1")
+        self._brokers[1].client_session.queue_delete(queue="fedX1")
+
+        for _b in qmf.getObjects(_class="bridge"):
+            result = _b.close()
+            self.assertEqual(result.status, 0)
+
+        for _l in qmf.getObjects(_class="link"):
+            result = _l.close()
+            self.assertEqual(result.status, 0)
+
+        for _b in self._brokers[0:2]:
+            _b.client_session.exchange_delete(exchange="fedX")
+
+        self._teardown_brokers()
+
+        self.verify_cleanup()
+
+

Modified: qpid/branches/QPID-2519/cpp/src/tests/python_tests
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/python_tests?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/python_tests (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/python_tests Fri Oct 21 14:42:12 2011
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/bin/bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one

Modified: qpid/branches/QPID-2519/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/qpid-cluster-benchmark?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/qpid-cluster-benchmark Fri Oct 21 14:42:12 2011
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -19,21 +19,40 @@
 #
 
 # Benchmark script for comparing cluster performance.
-#PORT=":5555"
-BROKER=`echo $HOSTS | awk '{print $1}'`	# Single broker
-BROKERS=`echo $HOSTS | sed "s/\>/$PORT/g;s/ /,/g"` # Broker URL list
-COUNT=100000
-RATE=20000			# Rate to throttle senders for latency results
-run_test() { echo $*; "$@"; echo; echo; echo; }
 
-# Thruput,  unshared queue
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp -m $COUNT
+# Default values
+PORT="5672"
+COUNT=10000
+FLOW=100	      # Flow control limit on queue depth for latency.
+REPEAT=10
+QUEUES=4
+CLIENTS=3
 
-# Latency
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --connection-options '{tcp-nodelay:true}' -m `expr $COUNT / 2` --send-rate $RATE
+while getopts "p:c:f:r:t:b:q:c" opt; do
+    case $opt in
+	p) PORT=$OPTARG;;
+	c) COUNT=$OPTARG;;
+	f) FLOW=$OPTARG;;
+	r) REPEAT=$OPTARG;;
+	s) SCALE=$OPTARG;;
+	b) BROKERS=$OPTARG;;
+	q) QUEUES=$OPTARG;;
+	c) CLIENTS=$OPTARG;;
+	*) echo "Unknown option"; exit 1;;
+    esac
+done
+
+BROKERS=${BROKERS:-$(echo $HOSTS | sed "s/\>/:$PORT/g;s/ /,/g")} # Broker URL list
+BROKER=`echo $BROKERS | awk -F, '{print $1}'` # First broker
+
+run_test() { echo $*; shift; "$@"; echo; echo; echo; }
 
 # Multiple pubs/subs connect via multiple brokers (active-active)
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKERS --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10`
+run_test "multi-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKERS --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT
 
 # Multiple pubs/subs connect via single broker (active-passive)
-run_test qpid-cpp-benchmark --repeat 10 -b $BROKER --no-timestamp --summarize -s10 -r10 -m `expr $COUNT / 10`
+run_test "single-host-thruput" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --no-timestamp --summarize -q$QUEUES -s$CLIENTS -r$CLIENTS -m $COUNT
+
+# Latency
+run_test "latency" qpid-cpp-benchmark --repeat $REPEAT -b $BROKER --connection-options '{tcp-nodelay:true}' -m $COUNT --flow-control $FLOW
+

Modified: qpid/branches/QPID-2519/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/qpid-cpp-benchmark?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/qpid-cpp-benchmark Fri Oct 21 14:42:12 2011
@@ -77,6 +77,20 @@ def ssh_command(host, command):
     """Convert command into an ssh command on host with quoting"""
     return ["ssh", host] + [posix_quote(arg) for arg in command]
 
+class Clients:
+    def __init__(self): self.clients=[]
+
+    def add(self, client):
+        self.clients.append(client)
+        return client
+
+    def kill(self):
+        for c in self.clients:
+            try: c.kill()
+            except: pass
+
+clients = Clients()
+
 def start_receive(queue, index, opts, ready_queue, broker, host):
     address_opts=["create:receiver"] + opts.receive_option
     if opts.durable: address_opts += ["node:{durable:true}"]
@@ -101,7 +115,7 @@ def start_receive(queue, index, opts, re
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return Popen(command, stdout=PIPE)
+    return clients.add(Popen(command, stdout=PIPE))
 
 def start_send(queue, opts, broker, host):
     address="%s;{%s}"%(queue,",".join(opts.send_option))
@@ -122,7 +136,7 @@ def start_send(queue, opts, broker, host
     if opts.connection_options:
         command += ["--connection-options",opts.connection_options]
     if host: command = ssh_command(host, command)
-    return Popen(command, stdout=PIPE)
+    return clients.add(Popen(command, stdout=PIPE))
 
 def first_line(p):
     out,err=p.communicate()
@@ -133,7 +147,11 @@ def delete_queues(queues, broker):
     c = qpid.messaging.Connection(broker)
     c.open()
     for q in queues:
-        try: s = c.session().sender("%s;{delete:always}"%(q))
+        try:
+            s = c.session()
+            snd = s.sender("%s;{delete:always}"%(q))
+            snd.close()
+            s.sync()
         except qpid.messaging.exceptions.NotFound: pass # Ignore "no such queue"
     c.close()
 
@@ -145,7 +163,6 @@ def print_header(timestamp):
 def parse(parser, lines):               # Parse sender/receiver output
     for l in lines:
         fn_val = zip(parser, l)
-
     return [map(lambda p: p[0](p[1]), zip(parser,line.split())) for line in lines]
 
 def parse_senders(senders):
@@ -156,11 +173,12 @@ def parse_receivers(receivers):
 
 def print_data(send_stats, recv_stats):
     for send,recv in map(None, send_stats, recv_stats):
-        if send: print send[0],
+        line=""
+        if send: line += "%d"%send[0]
         if recv:
-            print "\t\t%d"%recv[0],
-            if len(recv) == 4: print "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:]),
-        print
+            line += "\t\t%d"%recv[0]
+            if len(recv) == 4: line += "\t%.2f\t%.2f\t%.2f"%tuple(recv[1:])
+        print line
 
 def print_summary(send_stats, recv_stats):
     def avg(s): sum(s) / len(s)
@@ -184,11 +202,11 @@ class ReadyReceiver:
         self.receiver = self.connection.session().receiver(
             "%s;{create:receiver,delete:receiver,node:{durable:false}}"%(queue))
         self.receiver.session.sync()
-        self.timeout=2
+        self.timeout=10
 
     def wait(self, receivers):
         try:
-            for i in xrange(len(receivers)): self.receiver.fetch(self.timeout)
+            for i in receivers: self.receiver.fetch(self.timeout)
             self.connection.close()
         except qpid.messaging.Empty:
             for r in receivers:
@@ -197,7 +215,8 @@ class ReadyReceiver:
                     raise Exception("Receiver error: %s"%(out))
             raise Exception("Timed out waiting for receivers to be ready")
 
-def flatten(l): return sum(map(lambda s: s.split(","), l),[])
+def flatten(l):
+    return sum(map(lambda s: re.split(re.compile("\s*,\s*|\s+"), s), l), [])
 
 class RoundRobin:
     def __init__(self,items):
@@ -221,20 +240,22 @@ def main():
     receive_out = ""
     ready_queue="%s-ready"%(opts.queue_name)
     queues = ["%s-%s"%(opts.queue_name, i) for i in xrange(opts.queues)]
-    for i in xrange(opts.repeat):
-        delete_queues(queues, opts.broker[0])
-        ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
-        receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
-                     for q in queues for j in xrange(opts.receivers)]
-        ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
-        senders = [start_send(q, opts,brokers.next(), client_hosts.next())
-                   for q in queues for j in xrange(opts.senders)]
-        if opts.report_header and i == 0: print_header(opts.timestamp)
-        send_stats=parse_senders(senders)
-        recv_stats=parse_receivers(receivers)
-        if opts.summarize: print_summary(send_stats, recv_stats)
-        else: print_data(send_stats, recv_stats)
-        delete_queues(queues, opts.broker[0])
+    try:
+        for i in xrange(opts.repeat):
+            delete_queues(queues, opts.broker[0])
+            ready_receiver = ReadyReceiver(ready_queue, opts.broker[0])
+            receivers = [start_receive(q, j, opts, ready_queue, brokers.next(), client_hosts.next())
+                         for q in queues for j in xrange(opts.receivers)]
+            ready_receiver.wait(filter(None, receivers)) # Wait for receivers to be ready.
+            senders = [start_send(q, opts,brokers.next(), client_hosts.next())
+                       for q in queues for j in xrange(opts.senders)]
+            if opts.report_header and i == 0: print_header(opts.timestamp)
+            send_stats=parse_senders(senders)
+            recv_stats=parse_receivers(receivers)
+            if opts.summarize: print_summary(send_stats, recv_stats)
+            else: print_data(send_stats, recv_stats)
+            delete_queues(queues, opts.broker[0])
+    finally: clients.kill()             # No strays
 
 if __name__ == "__main__": main()
 

Modified: qpid/branches/QPID-2519/cpp/src/tests/qpid-ctrl
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/cpp/src/tests/qpid-ctrl?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/cpp/src/tests/qpid-ctrl (original)
+++ qpid/branches/QPID-2519/cpp/src/tests/qpid-ctrl Fri Oct 21 14:42:12 2011
@@ -92,7 +92,10 @@ try:
   arguments = {}
   for a in args:
     name, val = nameval(a)
-    arguments[name] = val
+    if val[0] == '{' or val[0] == '[':
+      arguments[name] = eval(val)
+    else:
+      arguments[name] = val
   content = {
              "_object_id": {"_object_name": object_name},
              "_method_name": method_name,



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org