You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC

svn commit: r1368910 [10/27] - in /qpid/branches/asyncstore: ./ bin/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/ cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/ cpp/bindings/qpid/ruby/features/step_definitions/ cpp...

Modified: qpid/branches/asyncstore/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/cluster_tests.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/asyncstore/cpp/src/tests/cluster_tests.py Fri Aug  3 12:13:32 2012
@@ -227,6 +227,18 @@ acl deny all all
         self.assertEqual("x", cluster[0].get_message("q").content)
         self.assertEqual("y", cluster[1].get_message("q").content)
 
+    def test_other_mech(self):
+        """Test using a mechanism other than PLAIN/ANONYMOUS for cluster update authentication.
+        Regression test for https://issues.apache.org/jira/browse/QPID-3849"""
+        sasl_config=os.path.join(self.rootdir, "sasl_config")
+        cluster = self.cluster(2, args=["--auth", "yes", "--sasl-config", sasl_config,
+                                        "--cluster-username=zig",
+                                        "--cluster-password=zig",
+                                        "--cluster-mechanism=DIGEST-MD5"])
+        cluster[0].connect()
+        cluster.start()         # Before the fix this broker failed to join the cluster.
+        cluster[2].connect()
+
     def test_link_events(self):
         """Regression test for https://bugzilla.redhat.com/show_bug.cgi?id=611543"""
         args = ["--mgmt-pub-interval", 1] # Publish management information every second.
@@ -768,6 +780,35 @@ acl deny all all
         fetch(cluster[2])
 
 
+    def _verify_federation(self, src_broker, src, dst_broker, dst, timeout=30):
+        """ Prove that traffic can pass between two federated brokers.
+        """
+        tot_time = 0
+        active = False
+        send_session = src_broker.connect().session()
+        sender = send_session.sender(src)
+        receive_session = dst_broker.connect().session()
+        receiver = receive_session.receiver(dst)
+        while not active and tot_time < timeout:
+            sender.send(Message("Hello from Source!"))
+            try:
+                receiver.fetch(timeout = 1)
+                receive_session.acknowledge()
+                # Get this far without Empty exception, and the link is good!
+                active = True
+                while True:
+                    # Keep receiving msgs, as several may have accumulated
+                    receiver.fetch(timeout = 1)
+                    receive_session.acknowledge()
+            except Empty:
+                if not active:
+                    tot_time += 1
+        receiver.close()
+        receive_session.close()
+        sender.close()
+        send_session.close()
+        return active
+
     def test_federation_failover(self):
         """
         Verify that federation operates across failures occurring in a cluster.
@@ -778,38 +819,6 @@ acl deny all all
         cluster to newly-added members
         """
 
-        TIMEOUT = 30
-        def verify(src_broker, src, dst_broker, dst, timeout=TIMEOUT):
-            """ Prove that traffic can pass from source fed broker to
-            destination fed broker
-            """
-            tot_time = 0
-            active = False
-            send_session = src_broker.connect().session()
-            sender = send_session.sender(src)
-            receive_session = dst_broker.connect().session()
-            receiver = receive_session.receiver(dst)
-            while not active and tot_time < timeout:
-                sender.send(Message("Hello from Source!"))
-                try:
-                    receiver.fetch(timeout = 1)
-                    receive_session.acknowledge()
-                    # Get this far without Empty exception, and the link is good!
-                    active = True
-                    while True:
-                        # Keep receiving msgs, as several may have accumulated
-                        receiver.fetch(timeout = 1)
-                        receive_session.acknowledge()
-                except Empty:
-                    if not active:
-                        tot_time += 1
-            receiver.close()
-            receive_session.close()
-            sender.close()
-            send_session.close()
-            self.assertTrue(active, "Bridge failed to become active")
-
-
         # 2 node cluster source, 2 node cluster destination
         src_cluster = self.cluster(2, expect=EXPECT_EXIT_FAIL)
         src_cluster.ready();
@@ -848,43 +857,145 @@ acl deny all all
         self.assertEqual(result.status, 0, result)
 
         # check that traffic passes
-        verify(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
+        assert self._verify_federation(src_cluster[0], "srcQ", dst_cluster[0], "destQ")
 
         # add src[2] broker to source cluster
         src_cluster.start(expect=EXPECT_EXIT_FAIL);
         src_cluster.ready();
-        verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+        assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
 
         # Kill src[0]. dst[0] should fail over to src[1]
         src_cluster[0].kill()
         for b in src_cluster[1:]: b.ready()
-        verify(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
+        assert self._verify_federation(src_cluster[1], "srcQ", dst_cluster[0], "destQ")
 
         # Kill src[1], dst[0] should fail over to src[2]
         src_cluster[1].kill()
         for b in src_cluster[2:]: b.ready()
-        verify(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
+        assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[0], "destQ")
 
         # Kill dest[0], force failover to dest[1]
         dst_cluster[0].kill()
         for b in dst_cluster[1:]: b.ready()
-        verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+        assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
 
         # Add dest[2]
         # dest[1] syncs dest[2] to current remote state
         dst_cluster.start(expect=EXPECT_EXIT_FAIL);
         for b in dst_cluster[1:]: b.ready()
-        verify(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
+        assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[1], "destQ")
 
         # Kill dest[1], force failover to dest[2]
         dst_cluster[1].kill()
         for b in dst_cluster[2:]: b.ready()
-        verify(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
+        assert self._verify_federation(src_cluster[2], "srcQ", dst_cluster[2], "destQ")
 
         for i in range(2, len(src_cluster)): src_cluster[i].kill()
         for i in range(2, len(dst_cluster)): dst_cluster[i].kill()
 
 
+    def test_federation_multilink_failover(self):
+        """
+        Verify that multi-link federation operates across failures occurring in
+        a cluster.
+        """
+
+        # 1 node cluster source, 1 node cluster destination
+        src_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+        src_cluster.ready();
+        dst_cluster = self.cluster(1, expect=EXPECT_EXIT_FAIL)
+        dst_cluster.ready();
+
+        # federate a direct binding across two separate links
+
+        # first, create a direct exchange bound to two queues using different
+        # bindings
+        cmd = self.popen(["qpid-config",
+                          "--broker", src_cluster[0].host_port(),
+                          "add", "exchange", "direct", "FedX"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "add", "exchange", "direct", "FedX"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "add", "queue", "destQ1"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "bind", "FedX", "destQ1", "one"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "add", "queue", "destQ2"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        cmd = self.popen(["qpid-config",
+                          "--broker", dst_cluster[0].host_port(),
+                          "bind", "FedX", "destQ2", "two"],
+                         EXPECT_EXIT_OK)
+        cmd.wait()
+
+        # Create two separate links between the dst and source brokers, bind
+        # each to different keys
+        dst_cluster[0].startQmf()
+        dst_broker = dst_cluster[0].qmf_session.getObjects(_class="broker")[0]
+
+        for _l in [("link1", "bridge1", "one"),
+                   ("link2", "bridge2", "two")]:
+            result = dst_broker.create("link", _l[0],
+                                       {"host":src_cluster[0].host(),
+                                        "port":src_cluster[0].port()},
+                                       False)
+            self.assertEqual(result.status, 0, result);
+            result = dst_broker.create("bridge", _l[1],
+                                       {"link":_l[0],
+                                        "src":"FedX",
+                                        "dest":"FedX",
+                                        "key":_l[2]}, False)
+            self.assertEqual(result.status, 0);
+
+        # check that traffic passes
+        assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+        assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+        # add new member, verify traffic
+        src_cluster.start(expect=EXPECT_EXIT_FAIL);
+        src_cluster.ready();
+
+        dst_cluster.start(expect=EXPECT_EXIT_FAIL);
+        dst_cluster.ready();
+
+        assert self._verify_federation(src_cluster[0], "FedX/one", dst_cluster[0], "destQ1")
+        assert self._verify_federation(src_cluster[0], "FedX/two", dst_cluster[0], "destQ2")
+
+        src_cluster[0].kill()
+        for b in src_cluster[1:]: b.ready()
+
+        assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[0], "destQ1")
+        assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[0], "destQ2")
+
+        dst_cluster[0].kill()
+        for b in dst_cluster[1:]: b.ready()
+
+        assert self._verify_federation(src_cluster[1], "FedX/one", dst_cluster[1], "destQ1")
+        assert self._verify_federation(src_cluster[1], "FedX/two", dst_cluster[1], "destQ2")
+
+        for i in range(1, len(src_cluster)): src_cluster[i].kill()
+        for i in range(1, len(dst_cluster)): dst_cluster[i].kill()
+
+
+
 # Some utility code for transaction tests
 XA_RBROLLBACK = 1
 XA_RBTIMEOUT = 2

Propchange: qpid/branches/asyncstore/cpp/src/tests/cluster_tests.py
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/cpp/src/tests/cluster_tests.py:r1333988-1368650

Modified: qpid/branches/asyncstore/cpp/src/tests/federation.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/federation.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/federation.py (original)
+++ qpid/branches/asyncstore/cpp/src/tests/federation.py Fri Aug  3 12:13:32 2012
@@ -23,6 +23,7 @@ from qpid.testlib import TestBase010
 from qpid.datatypes import Message
 from qpid.queue import Empty
 from qpid.util import URL
+import qpid.messaging
 from time import sleep, time
 
 
@@ -94,6 +95,11 @@ class FederationTests(TestBase010):
                     break
             self._brokers.append(_b)
 
+        # add a new-style messaging connection to each broker
+        for _b in self._brokers:
+            _b.connection = qpid.messaging.Connection(_b.url)
+            _b.connection.open()
+
     def _teardown_brokers(self):
         """ Un-does _setup_brokers()
         """
@@ -103,7 +109,7 @@ class FederationTests(TestBase010):
             if not _b.client_session.error():
                 _b.client_session.close(timeout=10)
             _b.client_conn.close(timeout=10)
-
+            _b.connection.close()
 
     def test_bridge_create_and_close(self):
         self.startQmf();
@@ -127,18 +133,28 @@ class FederationTests(TestBase010):
         self.verify_cleanup()
 
     def test_pull_from_exchange(self):
+        """ This test uses an alternative method to manage links and bridges
+        via the broker object.
+        """
         session = self.session
-        
+
         self.startQmf()
         qmf = self.qmf
         broker = qmf.getObjects(_class="broker")[0]
-        result = broker.connect(self.remote_host(), self.remote_port(), False, "PLAIN", "guest", "guest", "tcp")
-        self.assertEqual(result.status, 0, result)
 
-        link = qmf.getObjects(_class="link")[0]
-        result = link.bridge(False, "amq.direct", "amq.fanout", "my-key", "", "", False, False, False, 0)
+        # create link
+        link_args = {"host":self.remote_host(), "port":self.remote_port(), "durable":False,
+                     "authMechanism":"PLAIN", "username":"guest", "password":"guest",
+                     "transport":"tcp"}
+        result = broker.create("link", "test-link-1", link_args, False)
         self.assertEqual(result.status, 0, result)
+        link = qmf.getObjects(_class="link")[0]
 
+        # create bridge
+        bridge_args = {"link":"test-link-1", "src":"amq.direct", "dest":"amq.fanout",
+                       "key":"my-key"}
+        result = broker.create("bridge", "test-bridge-1", bridge_args, False);
+        self.assertEqual(result.status, 0, result)
         bridge = qmf.getObjects(_class="bridge")[0]
 
         #setup queue to receive messages from local broker
@@ -164,9 +180,11 @@ class FederationTests(TestBase010):
             self.fail("Got unexpected message in queue: " + extra.body)
         except Empty: None
 
-        result = bridge.close()
+
+        result = broker.delete("bridge", "test-bridge-1", {})
         self.assertEqual(result.status, 0, result)
-        result = link.close()
+
+        result = broker.delete("link", "test-link-1", {})
         self.assertEqual(result.status, 0, result)
 
         self.verify_cleanup()
@@ -376,6 +394,9 @@ class FederationTests(TestBase010):
         for i in range(1, 11):
             try:
                 msg = queue.get(timeout=5)
+                mp = msg.get("message_properties").application_headers
+                self.assertEqual(mp.__class__, dict)
+                self.assertEqual(mp['x-qpid.trace'], 'REMOTE') # check that the federation-tag override works
                 self.assertEqual("Message %d" % i, msg.body)
             except Empty:
                 self.fail("Failed to find expected message containing 'Message %d'" % i)
@@ -2153,3 +2174,433 @@ class FederationTests(TestBase010):
         self.verify_cleanup()
 
 
+    def test_multilink_direct(self):
+        """ Verify that two distinct links can be created between federated
+        brokers.
+        """
+        self.startQmf()
+        qmf = self.qmf
+        self._setup_brokers()
+        src_broker = self._brokers[0]
+        dst_broker = self._brokers[1]
+
+        # create a direct exchange on each broker
+        for _b in [src_broker, dst_broker]:
+            _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+            self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type,
+                             "direct", "exchange_declare failed!")
+
+        # create destination queues
+        for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]:
+            dst_broker.client_session.queue_declare(queue=_q[0], auto_delete=True)
+            dst_broker.client_session.exchange_bind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1])
+
+        # create two connections, one for high priority traffic
+        for _q in ["HiPri", "Traffic"]:
+            result = dst_broker.qmf_object.create("link", _q,
+                                                  {"host":src_broker.host,
+                                                   "port":src_broker.port},
+                                                  False)
+            self.assertEqual(result.status, 0);
+
+        links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link")
+        for _l in links:
+            if _l.name == "HiPri":
+                hi_link = _l
+            elif _l.name == "Traffic":
+                data_link = _l
+            else:
+                self.fail("Unexpected Link found: " + _l.name)
+
+        # now create a route for messages sent with key "high" to use the
+        # hi_link
+        result = dst_broker.qmf_object.create("bridge", "HiPriBridge",
+                                              {"link":hi_link.name,
+                                               "src":"fedX.direct",
+                                               "dest":"fedX.direct",
+                                               "key":"high"}, False)
+        self.assertEqual(result.status, 0);
+
+
+        # create routes for the "medium" and "low" links to use the normal
+        # data_link
+        for _b in [("MediumBridge", "medium"), ("LowBridge", "low")]:
+            result = dst_broker.qmf_object.create("bridge", _b[0],
+                                                  {"link":data_link.name,
+                                                   "src":"fedX.direct",
+                                                   "dest":"fedX.direct",
+                                                   "key":_b[1]}, False)
+            self.assertEqual(result.status, 0);
+
+        # now wait for the links to become operational
+        for _l in [hi_link, data_link]:
+            expire_time = time() + 30
+            while _l.state != "Operational" and time() < expire_time:
+                _l.update()
+            self.assertEqual(_l.state, "Operational", "Link failed to become operational")
+
+        # verify each link uses a different connection
+        self.assertNotEqual(hi_link.connectionRef, data_link.connectionRef,
+                            "Different links using the same connection")
+
+        hi_conn = qmf.getObjects(_broker=dst_broker.qmf_broker,
+                                 _objectId=hi_link.connectionRef)[0]
+        data_conn = qmf.getObjects(_broker=dst_broker.qmf_broker,
+                                 _objectId=data_link.connectionRef)[0]
+
+
+        # send hi data, verify only goes over hi link
+
+        r_ssn = dst_broker.connection.session()
+        hi_receiver = r_ssn.receiver("HiQ");
+        med_receiver = r_ssn.receiver("MedQ");
+        low_receiver = r_ssn.receiver("LoQ");
+
+        for _c in [hi_conn, data_conn]:
+            _c.update()
+            self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received")
+
+        s_ssn = src_broker.connection.session()
+        hi_sender = s_ssn.sender("fedX.direct/high")
+        med_sender = s_ssn.sender("fedX.direct/medium")
+        low_sender = s_ssn.sender("fedX.direct/low")
+
+        try:
+            hi_sender.send(qpid.messaging.Message(content="hi priority"))
+            msg = hi_receiver.fetch(timeout=10)
+            r_ssn.acknowledge()
+            self.assertEqual(msg.content, "hi priority");
+        except:
+            self.fail("Hi Pri message failure")
+
+        hi_conn.update()
+        data_conn.update()
+        self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+        self.assertEqual(data_conn.msgsToClient, 0, "Expected 0 data messages")
+
+        # send low and medium, verify it does not go over hi link
+
+        try:
+            med_sender.send(qpid.messaging.Message(content="medium priority"))
+            msg = med_receiver.fetch(timeout=10)
+            r_ssn.acknowledge()
+            self.assertEqual(msg.content, "medium priority");
+        except:
+            self.fail("Medium Pri message failure")
+
+        hi_conn.update()
+        data_conn.update()
+        self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+        self.assertEqual(data_conn.msgsToClient, 1, "Expected 1 data message")
+
+        try:
+            low_sender.send(qpid.messaging.Message(content="low priority"))
+            msg = low_receiver.fetch(timeout=10)
+            r_ssn.acknowledge()
+            self.assertEqual(msg.content, "low priority");
+        except:
+            self.fail("Low Pri message failure")
+
+        hi_conn.update()
+        data_conn.update()
+        self.assertEqual(hi_conn.msgsToClient, 1, "Expected 1 hi pri message")
+        self.assertEqual(data_conn.msgsToClient, 2, "Expected 2 data message")
+
+        # cleanup
+
+        for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"):
+            result = _b.close()
+            self.assertEqual(result.status, 0)
+
+        for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"):
+            result = _l.close()
+            self.assertEqual(result.status, 0)
+
+        for _q in [("HiQ", "high"), ("MedQ", "medium"), ("LoQ", "low")]:
+            dst_broker.client_session.exchange_unbind(queue=_q[0], exchange="fedX.direct", binding_key=_q[1])
+            dst_broker.client_session.queue_delete(queue=_q[0])
+
+        for _b in [src_broker, dst_broker]:
+            _b.client_session.exchange_delete(exchange="fedX.direct")
+
+        self._teardown_brokers()
+
+        self.verify_cleanup()
+
+
+    def test_multilink_shared_queue(self):
+        """ Verify that two distinct links can be created between federated
+        brokers.
+        """
+        self.startQmf()
+        qmf = self.qmf
+        self._setup_brokers()
+        src_broker = self._brokers[0]
+        dst_broker = self._brokers[1]
+
+        # create a topic exchange on the destination broker
+        dst_broker.client_session.exchange_declare(exchange="fedX.topic", type="topic")
+        self.assertEqual(dst_broker.client_session.exchange_query(name="fedX.topic").type,
+                         "topic", "exchange_declare failed!")
+
+        # create a destination queue
+        dst_broker.client_session.queue_declare(queue="destQ", auto_delete=True)
+        dst_broker.client_session.exchange_bind(queue="destQ", exchange="fedX.topic", binding_key="srcQ")
+
+        # create a single source queue
+        src_broker.client_session.queue_declare(queue="srcQ", auto_delete=True)
+
+        # create two connections
+        for _q in ["Link1", "Link2"]:
+            result = dst_broker.qmf_object.create("link", _q,
+                                                  {"host":src_broker.host,
+                                                   "port":src_broker.port},
+                                                  False)
+            self.assertEqual(result.status, 0);
+
+        links = qmf.getObjects(_broker=dst_broker.qmf_broker, _class="link")
+        self.assertEqual(len(links), 2)
+
+        # now create two "parallel" queue routes from the source queue to the
+        # destination exchange.
+        result = dst_broker.qmf_object.create("bridge", "Bridge1",
+                                              {"link":"Link1",
+                                               "src":"srcQ",
+                                               "dest":"fedX.topic",
+                                               "srcIsQueue": True},
+                                              False)
+        self.assertEqual(result.status, 0);
+        result = dst_broker.qmf_object.create("bridge", "Bridge2",
+                                              {"link":"Link2",
+                                               "src":"srcQ",
+                                               "dest":"fedX.topic",
+                                               "srcIsQueue": True},
+                                              False)
+        self.assertEqual(result.status, 0);
+
+
+        # now wait for the links to become operational
+        for _l in links:
+            expire_time = time() + 30
+            while _l.state != "Operational" and time() < expire_time:
+                _l.update()
+            self.assertEqual(_l.state, "Operational", "Link failed to become operational")
+
+        # verify each link uses a different connection
+        self.assertNotEqual(links[0].connectionRef, links[1].connectionRef,
+                            "Different links using the same connection")
+
+        conn1 = qmf.getObjects(_broker=dst_broker.qmf_broker,
+                               _objectId=links[0].connectionRef)[0]
+        conn2 = qmf.getObjects(_broker=dst_broker.qmf_broker,
+                               _objectId=links[1].connectionRef)[0]
+
+        # verify messages sent to the queue are pulled by each connection
+
+        r_ssn = dst_broker.connection.session()
+        receiver = r_ssn.receiver("destQ");
+
+        for _c in [conn1, conn2]:
+            _c.update()
+            self.assertEqual(_c.msgsToClient, 0, "Unexpected messages received")
+
+        s_ssn = src_broker.connection.session()
+        sender = s_ssn.sender("srcQ")
+
+        try:
+            for x in range(5):
+                sender.send(qpid.messaging.Message(content="hello"))
+            for x in range(5):
+                msg = receiver.fetch(timeout=10)
+                self.assertEqual(msg.content, "hello");
+                r_ssn.acknowledge()
+        except:
+            self.fail("Message failure")
+
+        # expect messages to be split over each connection.
+        conn1.update()
+        conn2.update()
+        self.assertNotEqual(conn1.msgsToClient, 0, "No messages sent")
+        self.assertNotEqual(conn2.msgsToClient, 0, "No messages sent")
+        self.assertEqual(conn2.msgsToClient + conn1.msgsToClient, 5,
+                         "Expected 5 messages total")
+
+        for _b in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="bridge"):
+            result = _b.close()
+            self.assertEqual(result.status, 0)
+
+        for _l in qmf.getObjects(_broker=dst_broker.qmf_broker,_class="link"):
+            result = _l.close()
+            self.assertEqual(result.status, 0)
+
+        dst_broker.client_session.exchange_unbind(queue="destQ", exchange="fedX.topic", binding_key="srcQ")
+        dst_broker.client_session.exchange_delete(exchange="fedX.topic")
+
+        self._teardown_brokers()
+
+        self.verify_cleanup()
+
+
+    def test_dynamic_direct_shared_queue(self):
+        """
+        Route Topology:
+
+               +<--- B1
+        B0 <---+<--- B2
+               +<--- B3
+        """
+        session = self.session
+
+        # create the federation
+
+        self.startQmf()
+        qmf = self.qmf
+
+        self._setup_brokers()
+
+        # create direct exchange on each broker, and retrieve the corresponding
+        # management object for that exchange
+
+        exchanges=[]
+        for _b in self._brokers:
+            _b.client_session.exchange_declare(exchange="fedX.direct", type="direct")
+            self.assertEqual(_b.client_session.exchange_query(name="fedX.direct").type,
+                             "direct", "exchange_declare failed!")
+            # pull the exchange out of qmf...
+            retries = 0
+            my_exchange = None
+            while my_exchange is None:
+                objs = qmf.getObjects(_broker=_b.qmf_broker, _class="exchange")
+                for ooo in objs:
+                    if ooo.name == "fedX.direct":
+                        my_exchange = ooo
+                        break
+                if my_exchange is None:
+                    retries += 1
+                    self.failIfEqual(retries, 10,
+                                     "QMF failed to find new exchange!")
+                    sleep(1)
+            exchanges.append(my_exchange)
+
+        self.assertEqual(len(exchanges), len(self._brokers), "Exchange creation failed!")
+
+        # Create 2 links per each source broker (1,2,3) to the downstream
+        # broker 0:
+        for _b in range(1,4):
+            for _l in ["dynamic", "queue"]:
+                result = self._brokers[0].qmf_object.create( "link",
+                                                             "Link-%d-%s" % (_b, _l),
+                                                             {"host":self._brokers[_b].host,
+                                                              "port":self._brokers[_b].port}, False)
+                self.assertEqual(result.status, 0)
+
+            # create queue on source brokers for use by the dynamic route
+            self._brokers[_b].client_session.queue_declare(queue="fedSrcQ", exclusive=False, auto_delete=True)
+
+        for _l in range(1,4):
+            # for each dynamic link, create a dynamic bridge for the "fedX.direct"
+            # exchanges, using the fedSrcQ on each upstream source broker
+            result = self._brokers[0].qmf_object.create("bridge",
+                                                        "Bridge-%d-dynamic" % _l,
+                                                        {"link":"Link-%d-dynamic" % _l,
+                                                         "src":"fedX.direct",
+                                                         "dest":"fedX.direct",
+                                                         "dynamic":True,
+                                                         "queue":"fedSrcQ"}, False)
+            self.assertEqual(result.status, 0)
+
+            # create a queue route that shares the queue used by the dynamic route
+            result = self._brokers[0].qmf_object.create("bridge",
+                                                        "Bridge-%d-queue" % _l,
+                                                        {"link":"Link-%d-queue" % _l,
+                                                         "src":"fedSrcQ",
+                                                         "dest":"fedX.direct",
+                                                         "srcIsQueue":True}, False)
+            self.assertEqual(result.status, 0)
+
+
+        # wait for the inter-broker links to become operational
+        retries = 0
+        operational = False
+        while not operational:
+            operational = True
+            for _l in qmf.getObjects(_class="link"):
+                #print("Link=%s:%s %s" % (_l.host, _l.port, str(_l.state)))
+                if _l.state != "Operational":
+                    operational = False
+            if not operational:
+                retries += 1
+                self.failIfEqual(retries, 10,
+                                 "inter-broker links failed to become operational.")
+                sleep(1)
+
+        # @todo - There is no way to determine when the bridge objects become
+        # active.  Hopefully, this is long enough!
+        sleep(6)
+
+        # create a queue on B0, bound to "spudboy"
+        self._brokers[0].client_session.queue_declare(queue="DestQ", exclusive=True, auto_delete=True)
+        self._brokers[0].client_session.exchange_bind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy")
+
+        # subscribe to messages arriving on B2's queue
+        self.subscribe(self._brokers[0].client_session, queue="DestQ", destination="f1")
+        queue = self._brokers[0].client_session.incoming("f1")
+
+        # wait until the binding key has propagated to each broker
+
+        binding_counts = [1, 1, 1, 1]
+        self.assertEqual(len(binding_counts), len(exchanges), "Update Test!")
+        for i in range(3,-1,-1):
+            retries = 0
+            exchanges[i].update()
+            while exchanges[i].bindingCount < binding_counts[i]:
+                retries += 1
+                self.failIfEqual(retries, 10,
+                                 "binding failed to propagate to broker %d"
+                                 % i)
+                sleep(3)
+                exchanges[i].update()
+
+        for _b in range(1,4):
+            # send 3 msgs from each source broker
+            for i in range(3):
+                dp = self._brokers[_b].client_session.delivery_properties(routing_key="spudboy")
+                self._brokers[_b].client_session.message_transfer(destination="fedX.direct", message=Message(dp, "Message_drp %d" % i))
+
+        # get exactly 9 (3 per broker) on B0
+        for i in range(9):
+            msg = queue.get(timeout=5)
+
+        try:
+            extra = queue.get(timeout=1)
+            self.fail("Got unexpected message in queue: " + extra.body)
+        except Empty: None
+
+        # verify that messages went across every link
+        for _l in qmf.getObjects(_broker=self._brokers[0].qmf_broker,
+                                 _class="link"):
+            for _c in qmf.getObjects(_broker=self._brokers[0].qmf_broker,
+                                     _objectId=_l.connectionRef):
+                self.assertNotEqual(_c.msgsToClient, 0, "Messages did not pass over link as expected.")
+
+        # cleanup
+
+        self._brokers[0].client_session.exchange_unbind(queue="DestQ", exchange="fedX.direct", binding_key="spudboy")
+        self._brokers[0].client_session.message_cancel(destination="f1")
+        self._brokers[0].client_session.queue_delete(queue="DestQ")
+
+        for _b in qmf.getObjects(_class="bridge"):
+            result = _b.close()
+            self.assertEqual(result.status, 0)
+
+        for _l in qmf.getObjects(_class="link"):
+            result = _l.close()
+            self.assertEqual(result.status, 0)
+
+        for _b in self._brokers:
+            _b.client_session.exchange_delete(exchange="fedX.direct")
+
+        self._teardown_brokers()
+
+        self.verify_cleanup()
+

Modified: qpid/branches/asyncstore/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/ha_tests.py?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/asyncstore/cpp/src/tests/ha_tests.py Fri Aug  3 12:13:32 2012
@@ -18,59 +18,123 @@
 # under the License.
 #
 
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
+import traceback
+from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED
 from qpid.datatypes import uuid4
 from brokertest import *
 from threading import Thread, Lock, Condition
-from logging import getLogger, WARN, ERROR, DEBUG
+from logging import getLogger, WARN, ERROR, DEBUG, INFO
 from qpidtoollibs import BrokerAgent
+from uuid import UUID
 
 log = getLogger(__name__)
 
+class QmfAgent(object):
+    """Access to a QMF broker agent."""
+    def __init__(self, address, **kwargs):
+        self._connection = Connection.establish(
+            address, client_properties={"qpid.ha-admin":1}, **kwargs)
+        self._agent = BrokerAgent(self._connection)
+        assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
+
+    def __getattr__(self, name):
+        a = getattr(self._agent, name)
+        return a
+
+class Credentials(object):
+    """SASL credentials: username, password, and mechanism"""
+    def __init__(self, username, password, mechanism):
+        (self.username, self.password, self.mechanism) = (username, password, mechanism)
+
+    def __str__(self): return "Credentials%s"%(self.tuple(),)
+
+    def tuple(self): return (self.username, self.password, self.mechanism)
+
+    def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
+
 class HaBroker(Broker):
-    def __init__(self, test, args=[], broker_url=None, ha_cluster=True,
-                 ha_replicate="all", **kwargs):
+    """Start a broker with HA enabled
+    @param client_credentials: (user, password, mechanism) for admin clients started by the HaBroker.
+    """
+    def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
+                 client_credentials=None, **kwargs):
         assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=info+", "--log-enable=debug+:ha::",
+                 "--log-enable=debug+:ha::",
                  # FIXME aconway 2012-02-13: workaround slow link failover.
                  "--link-maintenace-interval=0.1",
                  "--ha-cluster=%s"%ha_cluster]
         if ha_replicate is not None:
             args += [ "--ha-replicate=%s"%ha_replicate ]
-        if broker_url: args.extend([ "--ha-brokers", broker_url ])
+        if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
         Broker.__init__(self, test, args, **kwargs)
-        self.commands=os.getenv("PYTHON_COMMANDS")
-        assert os.path.isdir(self.commands)
+        self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
+        assert os.path.exists(self.qpid_ha_path)
+        self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
+        assert os.path.exists(self.qpid_config_path)
         getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
-
-    def promote(self):
-        assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
-
-    def set_client_url(self, url):
-        assert os.system(
-            "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
-
-    def set_broker_url(self, url):
-        assert os.system(
-            "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
-
-    def replicate(self, from_broker, queue):
-        assert os.system(
-            "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+        self.qpid_ha_script=import_script(self.qpid_ha_path)
+        self._agent = None
+        self.client_credentials = client_credentials
+
+    def __str__(self): return Broker.__str__(self)
+
+    def qpid_ha(self, args):
+        cred = self.client_credentials
+        url = self.host_port()
+        if cred:
+            url =cred.add_user(url)
+            args = args + ["--sasl-mechanism", cred.mechanism]
+        self.qpid_ha_script.main_except(["", "-b", url]+args)
+
+    def promote(self): self.qpid_ha(["promote"])
+    def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
+    def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
+    def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
+
+    def agent(self):
+        if not self._agent:
+            cred = self.client_credentials
+            if cred:
+                self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
+            else:
+                self._agent = QmfAgent(self.host_port())
+        return self._agent
+
+    def ha_status(self):
+        hb = self.agent().getHaBroker()
+        hb.update()
+        return hb.status
+
+    def wait_status(self, status):
+        def try_get_status():
+            # Ignore ConnectionError, the broker may not be up yet.
+            try: return self.ha_status() == status;
+            except ConnectionError: return False
+        assert retry(try_get_status, timeout=20), "%s status != %r"%(self, status)
+
+    # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
+    def qpid_config(self, args):
+        assert subprocess.call(
+            [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
 
     def config_replicate(self, from_broker, queue):
-        assert os.system(
-            "%s/qpid-config --broker=%s add queue --start-replica %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+        self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
 
     def config_declare(self, queue, replication):
-        assert os.system(
-            "%s/qpid-config --broker=%s add queue %s --replicate %s"%(self.commands, self.host_port(), queue, replication)) == 0
+        self.qpid_config(["add", "queue", queue, "--replicate", replication])
 
     def connect_admin(self, **kwargs):
-        return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
+        cred = self.client_credentials
+        if cred:
+            return Broker.connect(
+                self, client_properties={"qpid.ha-admin":1},
+                username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
+                **kwargs)
+        else:
+            return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
 
     def wait_backup(self, address):
         """Wait for address to become valid on a backup broker."""
@@ -78,6 +142,14 @@ class HaBroker(Broker):
         try: wait_address(bs, address)
         finally: bs.connection.close()
 
+    def assert_browse(self, queue, expected, **kwargs):
+        """Verify queue contents by browsing."""
+        bs = self.connect().session()
+        try:
+            wait_address(bs, queue)
+            assert_browse_retry(bs, queue, expected, **kwargs)
+        finally: bs.connection.close()
+
     def assert_browse_backup(self, queue, expected, **kwargs):
         """Combines wait_backup and assert_browse_retry."""
         bs = self.connect_admin().session()
@@ -86,33 +158,70 @@ class HaBroker(Broker):
             assert_browse_retry(bs, queue, expected, **kwargs)
         finally: bs.connection.close()
 
+    def assert_connect_fail(self):
+        try:
+            self.connect()
+            self.test.fail("Expected ConnectionError")
+        except ConnectionError: pass
+
+    def try_connect(self):
+        try: return self.connect()
+        except ConnectionError: return None
+
 class HaCluster(object):
     _cluster_count = 0
 
-    def __init__(self, test, n, **kwargs):
+    def __init__(self, test, n, promote=True, **kwargs):
         """Start a cluster of n brokers"""
         self.test = test
-        self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+        self.kwargs = kwargs
+        self._brokers = []
+        self.id = HaCluster._cluster_count
+        self.broker_id = 0
         HaCluster._cluster_count += 1
-        self.url = ",".join([b.host_port() for b in self])
-        for b in self: b.set_broker_url(self.url)
+        for i in xrange(n): self.start(False)
+        self.update_urls()
         self[0].promote()
 
+    def next_name(self):
+        name="cluster%s-%s"%(self.id, self.broker_id)
+        self.broker_id += 1
+        return name
+
+    def start(self, update_urls=True, args=[]):
+        """Start a new broker in the cluster"""
+        b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
+        self._brokers.append(b)
+        if update_urls: self.update_urls()
+        return b
+
+    def update_urls(self):
+        self.url = ",".join([b.host_port() for b in self])
+        if len(self) > 1:          # No failover addresses on a 1 cluster.
+            for b in self: b.set_brokers_url(self.url)
+
     def connect(self, i):
         """Connect with reconnect_urls"""
         return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
 
-    def kill(self, i):
+    def kill(self, i, promote_next=True):
         """Kill broker i, promote broker i+1"""
-        self[i].kill()
         self[i].expect = EXPECT_EXIT_FAIL
-        self[(i+1) % len(self)].promote()
+        self[i].kill()
+        if promote_next: self[(i+1) % len(self)].promote()
 
-    def bounce(self, i):
+    def restart(self, i):
+        """Start a broker with the same port, name and data directory. It will get
+        a separate log file: foo.n.log"""
+        b = self._brokers[i]
+        self._brokers[i] = HaBroker(
+            self.test, name=b.name, port=b.port(), brokers_url=self.url,
+            **self.kwargs)
+
+    def bounce(self, i, promote_next=True):
         """Stop and restart a broker in a cluster."""
-        self.kill(i)
-        b = self[i]
-        self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+        self.kill(i, promote_next)
+        self.restart(i)
 
     # Behave like a list of brokers.
     def __len__(self): return len(self._brokers)
@@ -128,12 +237,12 @@ def wait_address(session, address):
         except NotFound: return False
     assert retry(check), "Timed out waiting for address %s"%(address)
 
-def assert_missing(session, address):
-    """Assert that the address is _not_ valid"""
+def valid_address(session, address):
+    """Test if an address is valid"""
     try:
         session.receiver(address)
-        self.fail("Expected NotFound: %s"%(address))
-    except NotFound: pass
+        return True
+    except NotFound: return False
 
 class ReplicationTests(BrokerTest):
     """Correctness tests for  HA replication."""
@@ -180,7 +289,7 @@ class ReplicationTests(BrokerTest):
             self.assert_browse_retry(b, prefix+"q1", ["1", "4"])
 
             self.assert_browse_retry(b, prefix+"q2", []) # configuration only
-            assert_missing(b, prefix+"q3")
+            assert not valid_address(b, prefix+"q3")
             b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
             self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
             b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
@@ -195,7 +304,7 @@ class ReplicationTests(BrokerTest):
 
         # Create config, send messages before starting the backup, to test catch-up replication.
         setup(p, "1", primary)
-        backup  = HaBroker(self, name="backup", broker_url=primary.host_port())
+        backup  = HaBroker(self, name="backup", brokers_url=primary.host_port())
         # Create config, send messages after starting the backup, to test steady-state replication.
         setup(p, "2", primary)
 
@@ -233,10 +342,10 @@ class ReplicationTests(BrokerTest):
         s = p.sender("q;{create:always}")
         for m in [str(i) for i in range(0,10)]: s.send(m)
         s.sync()
-        backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+        backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port())
         for m in [str(i) for i in range(10,20)]: s.send(m)
         s.sync()
-        backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+        backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port())
         for m in [str(i) for i in range(20,30)]: s.send(m)
         s.sync()
 
@@ -276,7 +385,7 @@ class ReplicationTests(BrokerTest):
         """Verify that backups rejects connections and that fail-over works in python client"""
         primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
         primary.promote()
-        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
         # Check that backup rejects normal connections
         try:
             backup.connect().session()
@@ -294,14 +403,15 @@ class ReplicationTests(BrokerTest):
         primary.kill()
         assert retry(lambda: not is_running(primary.pid))
         backup.promote()
-        self.assert_browse_retry(s, "q", ["foo"])
+        sender.send("bar")
+        self.assert_browse_retry(s, "q", ["foo", "bar"])
         c.close()
 
     def test_failover_cpp(self):
         """Verify that failover works in the C++ client."""
         primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
         primary.promote()
-        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
         url="%s,%s"%(primary.host_port(), backup.host_port())
         primary.connect().session().sender("q;{create:always}")
         backup.wait_backup("q")
@@ -344,6 +454,7 @@ class ReplicationTests(BrokerTest):
 
     def test_standalone_queue_replica(self):
         """Test replication of individual queues outside of cluster mode"""
+        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
         primary = HaBroker(self, name="primary", ha_cluster=False)
         pc = primary.connect()
         ps = pc.session().sender("q;{create:always}")
@@ -393,7 +504,7 @@ class ReplicationTests(BrokerTest):
         """Verify that we replicate to an LVQ correctly"""
         primary  = HaBroker(self, name="primary")
         primary.promote()
-        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
         s = primary.connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
         def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
         for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
@@ -410,7 +521,7 @@ class ReplicationTests(BrokerTest):
         """Test replication with the ring queue policy"""
         primary  = HaBroker(self, name="primary")
         primary.promote()
-        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
         s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5}}}}")
         for i in range(10): s.send(Message(str(i)))
         backup.assert_browse_backup("q", [str(i) for i in range(5,10)])
@@ -419,18 +530,20 @@ class ReplicationTests(BrokerTest):
         """Test replication with the reject queue policy"""
         primary  = HaBroker(self, name="primary")
         primary.promote()
-        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
         s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':reject, 'qpid.max_count':5}}}}")
         try:
             for i in range(10): s.send(Message(str(i)), sync=False)
         except qpid.messaging.exceptions.TargetCapacityExceeded: pass
         backup.assert_browse_backup("q", [str(i) for i in range(0,5)])
+        # Detach, don't close as there is a broken session
+        s.session.connection.detach()
 
     def test_priority(self):
         """Verify priority queues replicate correctly"""
         primary  = HaBroker(self, name="primary")
         primary.promote()
-        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
         session = primary.connect().session()
         s = session.sender("priority-queue; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}}}}")
         priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
@@ -445,7 +558,7 @@ class ReplicationTests(BrokerTest):
         """Verify priority queues replicate correctly"""
         primary  = HaBroker(self, name="primary")
         primary.promote()
-        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
         session = primary.connect().session()
         levels = 8
         priorities = [4,5,3,7,8,8,2,8,2,8,8,16,6,6,6,6,6,6,8,3,5,8,3,5,5,3,3,8,8,3,7,3,7,7,7,8,8,8,2,3]
@@ -464,7 +577,7 @@ class ReplicationTests(BrokerTest):
     def test_priority_ring(self):
         primary  = HaBroker(self, name="primary")
         primary.promote()
-        backup = HaBroker(self, name="backup", broker_url=primary.host_port())
+        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
         s = primary.connect().session().sender("q; {create:always, node:{x-declare:{arguments:{'qpid.policy_type':ring, 'qpid.max_count':5, 'qpid.priorities':10}}}}")
         priorities = [8,9,5,1,2,2,3,4,9,7,8,9,9,2]
         for p in priorities: s.send(Message(priority=p))
@@ -475,8 +588,10 @@ class ReplicationTests(BrokerTest):
         # correct result, the uncommented one is for the actualy buggy
         # result.  See https://issues.apache.org/jira/browse/QPID-3866
         #
-        # backup.assert_browse_backup("q", sorted(priorities,reverse=True)[0:5], transform=lambda m: m.priority)
-        backup.assert_browse_backup("q", [9,9,9,9,2], transform=lambda m: m.priority)
+        # expect = sorted(priorities,reverse=True)[0:5]
+        expect = [9,9,9,9,2]
+        primary.assert_browse("q", expect, transform=lambda m: m.priority)
+        backup.assert_browse_backup("q", expect, transform=lambda m: m.priority)
 
     def test_backup_acquired(self):
         """Verify that acquired messages are backed up, for all queue types."""
@@ -509,11 +624,11 @@ class ReplicationTests(BrokerTest):
 
         primary  = HaBroker(self, name="primary")
         primary.promote()
-        backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
+        backup1 = HaBroker(self, name="backup1", brokers_url=primary.host_port())
         c = primary.connect()
         for t in tests: t.send(c) # Send messages, leave one unacknowledged.
 
-        backup2 = HaBroker(self, name="backup2", broker_url=primary.host_port())
+        backup2 = HaBroker(self, name="backup2", brokers_url=primary.host_port())
         # Wait for backups to catch up.
         for t in tests:
             t.wait(self, backup1)
@@ -538,11 +653,13 @@ class ReplicationTests(BrokerTest):
             self.fail("Excpected no-such-queue exception")
         except NotFound: pass
 
-    def test_invalid_default(self):
-        """Verify that a queue with an invalid qpid.replicate gets default treatment"""
-        cluster = HaCluster(self, 2, ha_replicate="all")
-        c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
-        cluster[1].wait_backup("q")
+    def test_invalid_replication(self):
+        """Verify that we reject an attempt to declare a queue with invalid replication value."""
+        cluster = HaCluster(self, 1, ha_replicate="all")
+        try:
+            c = cluster[0].connect().session().sender("q;{create:always, node:{x-declare:{arguments:{'qpid.replicate':XXinvalidXX}}}}")
+            self.fail("Expected ConnectionError")
+        except ConnectionError: pass
 
     def test_exclusive_queue(self):
         """Ensure that we can back-up exclusive queues, i.e. the replicating
@@ -559,6 +676,136 @@ class ReplicationTests(BrokerTest):
         test("excl_sub;{create:always, link:{x-subscribe:{exclusive:True}}}");
         test("excl_queue;{create:always, node:{x-declare:{exclusive:True}}}")
 
+    def test_auto_delete_exclusive(self):
+        """Verify that we ignore exclusive auto-delete queues that have no auto-delete timeout"""
+        cluster = HaCluster(self,2)
+        s = cluster[0].connect().session()
+        s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
+        s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
+        s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
+        s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+        s.receiver("q;{create:always}")
+
+        s = cluster[1].connect_admin().session()
+        cluster[1].wait_backup("q")
+        assert not valid_address(s, "exad")
+        assert valid_address(s, "ex")
+        assert valid_address(s, "ad")
+        assert valid_address(s, "time")
+
+    def test_broker_info(self):
+        """Check that broker information is correctly published via management"""
+        cluster = HaCluster(self, 3)
+
+        for broker in cluster:  # Make sure HA system-id matches broker's
+            qmf = broker.agent().getHaBroker()
+            self.assertEqual(qmf.systemId, UUID(broker.agent().getBroker().systemRef))
+
+        cluster_ports = map(lambda b: b.port(), cluster)
+        cluster_ports.sort()
+        def ports(qmf):
+            qmf.update()
+            return sorted(map(lambda b: b["port"], qmf.members))
+        # Check that all brokers have the same membership as the cluster
+        for broker in cluster:
+            qmf = broker.agent().getHaBroker()
+            assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s on %s"%(cluster_ports, ports(qmf), broker)
+        # Add a new broker, check it is updated everywhere
+        b = cluster.start()
+        cluster_ports.append(b.port())
+        cluster_ports.sort()
+        for broker in cluster:
+            qmf = broker.agent().getHaBroker()
+            assert retry(lambda: cluster_ports == ports(qmf), 1), "%s != %s"%(cluster_ports, ports(qmf))
+
+    def test_auth(self):
+        """Verify that authentication does not interfere with replication."""
+        # FIXME aconway 2012-07-09: generate test sasl config portably for cmake
+        sasl_config=os.path.join(self.rootdir, "sasl_config")
+        if not os.path.exists(sasl_config):
+            print "WARNING: Skipping test, SASL test configuration %s not found."%sasl_config
+            return
+        acl=os.path.join(os.getcwd(), "policy.acl")
+        aclf=file(acl,"w")
+        # Verify that replication works with auth=yes and HA user has at least the following
+        # privileges:
+        aclf.write("""
+acl allow zag@QPID access queue
+acl allow zag@QPID create queue
+acl allow zag@QPID consume queue
+acl allow zag@QPID delete queue
+acl allow zag@QPID access exchange
+acl allow zag@QPID create exchange
+acl allow zag@QPID bind exchange
+acl allow zag@QPID publish exchange
+acl allow zag@QPID delete exchange
+acl allow zag@QPID access method
+acl allow zag@QPID create link
+acl deny all all
+ """)
+        aclf.close()
+        cluster = HaCluster(
+            self, 2,
+            args=["--auth", "yes", "--sasl-config", sasl_config,
+                  "--acl-file", acl, "--load-module", os.getenv("ACL_LIB"),
+                  "--ha-username=zag", "--ha-password=zag", "--ha-mechanism=PLAIN"
+                  ],
+            client_credentials=Credentials("zag", "zag", "PLAIN"))
+        s0 = cluster[0].connect(username="zag", password="zag").session();
+        s0.receiver("q;{create:always}")
+        s0.receiver("ex;{create:always,node:{type:topic,x-declare:{type:'fanout'},x-bindings:[{exchange:'ex',queue:'q'}]}}")
+        cluster[1].wait_backup("q")
+        cluster[1].wait_backup("ex")
+        s1 = cluster[1].connect_admin().session(); # Uses Credentials above.
+        s1.sender("ex").send("foo");
+        self.assertEqual(s1.receiver("q").fetch().content, "foo")
+
+    def test_alternate_exchange(self):
+        """Verify that alternate-exchange on exchanges and queues is propagated
+        to new members of a cluster. """
+        cluster = HaCluster(self, 2)
+        s = cluster[0].connect().session()
+        # altex exchange: acts as alternate exchange
+        s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+        # altq queue bound to altex, collect re-routed messages.
+        s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+        # 0ex exchange with alternate-exchange altex and no queues bound
+        s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+        # create queue q with alternate-exchange altex
+        s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
+        # create a bunch of exchanges to ensure we don't clean up prematurely if the
+        # response comes in multiple fragments.
+        for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+
+        def verify(broker):
+            s = broker.connect().session()
+            # Verify unmatched message goes to ex's alternate.
+            s.sender("0ex").send("foo")
+            altq = s.receiver("altq")
+            self.assertEqual("foo", altq.fetch(timeout=0).content)
+            s.acknowledge()
+            # Verify rejected message goes to q's alternate.
+            s.sender("q").send("bar")
+            msg = s.receiver("q").fetch(timeout=0)
+            self.assertEqual("bar", msg.content)
+            s.acknowledge(msg, Disposition(REJECTED)) # Reject the message
+            self.assertEqual("bar", altq.fetch(timeout=0).content)
+            s.acknowledge()
+
+        # Sanity check: alternate exchanges on original broker
+        verify(cluster[0])
+        # Check backup that was connected during setup.
+        cluster[1].wait_backup("0ex")
+        cluster[1].wait_backup("q")
+        cluster.bounce(0)
+        verify(cluster[1])
+        # Check a newly started backup.
+        cluster.start()
+        cluster[2].wait_backup("0ex")
+        cluster[2].wait_backup("q")
+        cluster.bounce(1)
+        verify(cluster[2])
+
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given fairshare limit
@@ -601,49 +848,135 @@ class LongTests(BrokerTest):
         if d: return float(d)*60
         else: return 3                  # Default is to be quick
 
-
-    def disable_test_failover(self):
+    def test_failover_send_receive(self):
         """Test failover with continuous send-receive"""
-        # FIXME aconway 2012-02-03: fails due to dropped messages,
-        # known issue: sending messages to new primary before
-        # backups are ready. Enable when fixed.
-
-        # Start a cluster, all members will be killed during the test.
-        brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
-                    for name in ["ha0","ha1","ha2"] ]
-        url = ",".join([b.host_port() for b in brokers])
-        for b in brokers: b.set_broker_url(url)
-        brokers[0].promote()
+        brokers = HaCluster(self, 3)
 
         # Start sender and receiver threads
-        sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
-        receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
-        receiver.start()
-        sender.start()
-        # Wait for sender & receiver to get up and running
-        assert retry(lambda: receiver.received > 100)
+        n = 10
+        senders = [NumberedSender(brokers[0], max_depth=1024, failover_updates=False,
+                                 queue="test%s"%(i)) for i in xrange(n)]
+        receivers = [NumberedReceiver(brokers[0], sender=senders[i],
+                                      failover_updates=False,
+                                      queue="test%s"%(i)) for i in xrange(n)]
+        for r in receivers: r.start()
+        for s in senders: s.start()
+
+        def wait_passed(r, n):
+            """Wait for receiver r to pass n"""
+            def check():
+                r.check()       # Verify no exceptions
+                return r.received > n
+            assert retry(check), "Stalled %s at %s"%(r.queue, n)
+
+        for r in receivers: wait_passed(r, 0)
+
         # Kill and restart brokers in a cycle:
         endtime = time.time() + self.duration()
         i = 0
-        while time.time() < endtime or i < 3: # At least 3 iterations
-            sender.sender.assert_running()
-            receiver.receiver.assert_running()
-            port = brokers[i].port()
-            brokers[i].kill()
-            brokers.append(
-                HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
-                         expect=EXPECT_EXIT_FAIL))
-            i += 1
-            brokers[i].promote()
-            n = receiver.received       # Verify we're still running
-            def enough():
-                receiver.check()        # Verify no exceptions
-                return receiver.received > n + 100
-            assert retry(enough, timeout=5)
-
-        sender.stop()
-        receiver.stop()
-        for b in brokers[i:]: b.kill()
+        try:
+            while time.time() < endtime or i < 3: # At least 3 iterations
+                for s in senders: s.sender.assert_running()
+                for r in receivers: r.receiver.assert_running()
+                checkpoint = [ r.received for r in receivers ]
+                # Don't kill primary till it is active and the next
+                # backup is ready, otherwise we can lose messages.
+                brokers[i%3].wait_status("active")
+                brokers[(i+1)%3].wait_status("ready")
+                brokers.bounce(i%3)
+                i += 1
+                map(wait_passed, receivers, checkpoint) # Wait for all receivers
+        except:
+            traceback.print_exc()
+            raise
+        finally:
+            for s in senders: s.stop()
+            for r in receivers: r.stop()
+            dead = []
+            for i in xrange(3):
+                if not brokers[i].is_running(): dead.append(i)
+                brokers.kill(i, False)
+            if dead: raise Exception("Brokers not running: %s"%dead)
+
+class RecoveryTests(BrokerTest):
+    """Tests for recovery after a failure."""
+
+    def test_queue_hold(self):
+        """Verify that the broker holds queues without sufficient backup,
+        i.e. does not complete messages sent to those queues."""
+
+        # We don't want backups to time out for this test, set long timeout.
+        cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]);
+        # Wait for the primary to be ready
+        cluster[0].wait_status("active")
+        # Create a queue before the failure.
+        s1 = cluster.connect(0).session().sender("q1;{create:always}")
+        for b in cluster: b.wait_backup("q1")
+        for i in xrange(100): s1.send(str(i))
+        # Kill primary and 2 backups
+        for i in [0,1,2]: cluster.kill(i, False)
+        cluster[3].promote()    # New primary, backups will be 1 and 2
+        cluster[3].wait_status("recovering")
+
+        def assertSyncTimeout(s):
+            try:
+                s.sync(timeout=.01)
+                self.fail("Expected Timeout exception")
+            except Timeout: pass
+
+        # Create a queue after the failure
+        s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+        # Verify that messages sent are not completed
+        for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
+        assertSyncTimeout(s1)
+        self.assertEqual(s1.unsettled(), 100)
+        assertSyncTimeout(s2)
+        self.assertEqual(s2.unsettled(), 100)
+
+        # Verify we can receive even if sending is on hold:
+        cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
+
+        # Restart backups, verify queues are released only when both backups are up
+        cluster.restart(1)
+        assertSyncTimeout(s1)
+        self.assertEqual(s1.unsettled(), 100)
+        assertSyncTimeout(s2)
+        self.assertEqual(s2.unsettled(), 100)
+        self.assertEqual(cluster[3].ha_status(), "recovering")
+        cluster.restart(2)
+
+        # Verify everything is up to date and active
+        def settled(sender): sender.sync(); return sender.unsettled() == 0;
+        assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
+        assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
+        cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
+        cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+        cluster[3].wait_status("active"),
+        s1.session.connection.close()
+        s2.session.connection.close()
+
+    def test_expected_backup_timeout(self):
+        """Verify that we time-out expected backups and release held queues
+        after a configured interval. Verify backup is demoted to catch-up,
+        but can still rejoin.
+        """
+        cluster = HaCluster(self, 3, args=["--ha-backup-timeout=0.5"]);
+        cluster[0].wait_status("active") # Primary ready
+        for b in cluster[1:4]: b.wait_status("ready") # Backups ready
+        for i in [0,1]: cluster.kill(i, False)
+        cluster[2].promote()    # New primary, backups will be 1 and 2
+        cluster[2].wait_status("recovering")
+        # Should not go active till the expected backup connects or times out.
+        self.assertEqual(cluster[2].ha_status(), "recovering")
+        # Messages should be held until the expected backup times out
+        s = cluster[2].connect().session().sender("q;{create:always}")
+        for i in xrange(100): s.send(str(i), sync=False)
+        # Verify message held initially.
+        try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
+        except Timeout: pass
+        s.sync(timeout=1)      # And released after the timeout.
+        self.assertEqual(cluster[2].ha_status(), "active")
 
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)

Modified: qpid/branches/asyncstore/cpp/src/tests/ipv6_test
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/ipv6_test?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/ipv6_test (original)
+++ qpid/branches/asyncstore/cpp/src/tests/ipv6_test Fri Aug  3 12:13:32 2012
@@ -19,6 +19,19 @@
 # under the License.
 #
 
+# Check whether we have any globally configured IPv6 addresses
+# - if not then we can't run the tests because ipv6 lookups won't
+#   work within the qpid code. This is a deliberate feature to avoid
+#   getting addresses that can't be routed by the machine.
+
+if ip -f inet6 -o addr | cut -f 9 -s -d' ' | grep global > /dev/null ; then
+    echo "IPv6 addresses configured continuing"
+else
+    echo "No global IPv6 addresses configured - skipping test"
+    exit 0
+fi
+
+
 # Run a simple test over IPv6
 source ./test_env.sh
 

Modified: qpid/branches/asyncstore/cpp/src/tests/logging.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/logging.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/logging.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/logging.cpp Fri Aug  3 12:13:32 2012
@@ -258,7 +258,7 @@ QPID_AUTO_TEST_CASE(testOverhead) {
 Statement statement(
     Level level, const char* file="", int line=0, const char* fn=0)
 {
-    Statement s={0, file, line, fn, level};
+    Statement s={0, file, line, fn, level, ::qpid::log::unspecified};
     return s;
 }
 
@@ -347,11 +347,11 @@ QPID_AUTO_TEST_CASE(testLoggerStateure) 
     };
     opts.parse(ARGC(argv), const_cast<char**>(argv));
     l.configure(opts);
-    QPID_LOG(critical, "foo"); int srcline=__LINE__;
+    QPID_LOG_CAT(critical, test, "foo"); int srcline=__LINE__;
     ifstream log("logging.tmp");
     string line;
     getline(log, line);
-    string expect=(format("critical %s:%d: foo")%__FILE__%srcline).str();
+    string expect=(format("[Test] critical %s:%d: foo")%__FILE__%srcline).str();
     BOOST_CHECK_EQUAL(expect, line);
     log.close();
     unlink("logging.tmp");
@@ -375,11 +375,11 @@ QPID_AUTO_TEST_CASE(testQuoteNonPrintabl
 
     char s[] = "null\0tab\tspace newline\nret\r\x80\x99\xff";
     string str(s, sizeof(s));
-    QPID_LOG(critical, str);
+    QPID_LOG_CAT(critical, test, str);
     ifstream log("logging.tmp");
     string line;
     getline(log, line, '\0');
-    string expect="critical null\\x00tab\tspace newline\nret\r\\x80\\x99\\xFF\\x00\n";
+    string expect="[Test] critical null\\x00tab\tspace newline\nret\r\\x80\\x99\\xFF\\x00\n";
     BOOST_CHECK_EQUAL(expect, line);
     log.close();
     unlink("logging.tmp");

Modified: qpid/branches/asyncstore/cpp/src/tests/qpid-latency-test.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/qpid-latency-test.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/qpid-latency-test.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/qpid-latency-test.cpp Fri Aug  3 12:13:32 2012
@@ -359,19 +359,29 @@ void Sender::sendByRate()
     }
     uint64_t interval = TIME_SEC/opts.rate;
     int64_t timeLimit = opts.timeLimit * TIME_SEC;
-    uint64_t sent = 0, missedRate = 0;
+    uint64_t sent = 0;
     AbsTime start = now();
+    AbsTime last = start;
     while (true) {
         AbsTime sentAt=now();
         msg.getDeliveryProperties().setTimestamp(Duration(EPOCH, sentAt));
         async(session).messageTransfer(arg::content=msg, arg::acceptMode=1);
         if (opts.sync) session.sync();
         ++sent;
+        if (Duration(last, sentAt) > TIME_SEC*2) {
+            Duration t(start, now());
+            //check rate actually achieved thus far
+            uint actualRate = sent / (t/TIME_SEC);
+            //report inability to stay within 1% of desired rate
+            if (actualRate < opts.rate && opts.rate - actualRate > opts.rate/100) {
+                std::cerr << "WARNING: Desired send rate: " << opts.rate << ", actual send rate: " << actualRate << std::endl;
+            }
+            last = sentAt;
+        }
+
         AbsTime waitTill(start, sent*interval);
         Duration delay(sentAt, waitTill);
-        if (delay < 0)
-            ++missedRate;
-        else
+        if (delay > 0)
             sys::usleep(delay / TIME_USEC);
         if (timeLimit != 0 && Duration(start, now()) > timeLimit) {
             session.sync();

Modified: qpid/branches/asyncstore/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/qpid-receive.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/qpid-receive.cpp Fri Aug  3 12:13:32 2012
@@ -68,6 +68,7 @@ struct Options : public qpid::Options
     bool reportHeader;
     string readyAddress;
     uint receiveRate;
+    std::string replyto;
 
     Options(const std::string& argv0=std::string())
         : qpid::Options("Options"),
@@ -114,6 +115,7 @@ struct Options : public qpid::Options
             ("report-header", qpid::optValue(reportHeader, "yes|no"), "Headers on report.")
             ("ready-address", qpid::optValue(readyAddress, "ADDRESS"), "send a message to this address when ready to receive")
             ("receive-rate", qpid::optValue(receiveRate,"N"), "Receive at rate of N messages/second. 0 means receive as fast as possible.")
+            ("reply-to", qpid::optValue(replyto, "REPLY-TO"), "specify reply-to address on response messages")
             ("help", qpid::optValue(help), "print this usage statement");
         add(log);
     }
@@ -246,6 +248,9 @@ int main(int argc, char ** argv)
                         s = session.createSender(msg.getReplyTo());
                         s.setCapacity(opts.capacity);
                     }
+                    if (!opts.replyto.empty()) {
+                        msg.setReplyTo(Address(opts.replyto));
+                    }
                     s.send(msg);
                 }
                 if (opts.receiveRate) {

Modified: qpid/branches/asyncstore/cpp/src/tests/run_acl_tests
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/run_acl_tests?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/run_acl_tests (original)
+++ qpid/branches/asyncstore/cpp/src/tests/run_acl_tests Fri Aug  3 12:13:32 2012
@@ -30,9 +30,9 @@ trap stop_brokers INT TERM QUIT
 start_brokers() {
     ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIR --load-module $ACL_LIB --acl-file policy.acl --auth no --log-to-file local.log > qpidd.port
     LOCAL_PORT=`cat qpidd.port`
-    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-ip 2 --log-to-file locali.log > qpiddi.port
+    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-ip 2 --log-to-file locali.log > qpiddi.port
     LOCAL_PORTI=`cat qpiddi.port`
-    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --acl-max-connect-per-user 2 --log-to-file localu.log > qpiddu.port
+    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-user 2 --log-to-file localu.log > qpiddu.port
     LOCAL_PORTU=`cat qpiddu.port`
 }
 

Modified: qpid/branches/asyncstore/cpp/src/tests/run_federation_tests
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/run_federation_tests?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/run_federation_tests (original)
+++ qpid/branches/asyncstore/cpp/src/tests/run_federation_tests Fri Aug  3 12:13:32 2012
@@ -36,10 +36,10 @@ fi
 QPIDD_CMD="../qpidd --daemon --port 0 --no-data-dir $MODULES --auth no --log-enable=info+ --log-enable=debug+:Bridge --log-to-file"
 start_brokers() {
     rm -f fed_local.log fed_remote.log fed_b1.log fed_b2.log
-    LOCAL_PORT=$($QPIDD_CMD fed_local.log)
-    REMOTE_PORT=$($QPIDD_CMD fed_remote.log)
-    REMOTE_B1=$($QPIDD_CMD fed_b1.log)
-    REMOTE_B2=$($QPIDD_CMD fed_b2.log)
+    LOCAL_PORT=$($QPIDD_CMD fed_local.log --federation-tag LOCAL)
+    REMOTE_PORT=$($QPIDD_CMD fed_remote.log --federation-tag REMOTE)
+    REMOTE_B1=$($QPIDD_CMD fed_b1.log --federation-tag B1)
+    REMOTE_B2=$($QPIDD_CMD fed_b2.log --federation-tag B2)
 }
 
 stop_brokers() {

Modified: qpid/branches/asyncstore/cpp/src/tests/sasl_test_setup.sh
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/sasl_test_setup.sh?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/sasl_test_setup.sh (original)
+++ qpid/branches/asyncstore/cpp/src/tests/sasl_test_setup.sh Fri Aug  3 12:13:32 2012
@@ -30,7 +30,7 @@ pwcheck_method: auxprop
 auxprop_plugin: sasldb
 sasldb_path: $PWD/sasl_config/qpidd.sasldb
 sql_select: dummy select
-mech_list: ANONYMOUS PLAIN DIGEST-MD5 EXTERNAL
+mech_list: ANONYMOUS PLAIN DIGEST-MD5 EXTERNAL CRAM-MD5
 EOF
 
 # Populate temporary sasl db.

Modified: qpid/branches/asyncstore/cpp/src/tests/ssl_test
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/ssl_test?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/ssl_test (original)
+++ qpid/branches/asyncstore/cpp/src/tests/ssl_test Fri Aug  3 12:13:32 2012
@@ -148,6 +148,11 @@ URL=$TEST_HOSTNAME:$PORT
 MSG=`./qpid-receive -b $URL --connection-options '{transport:ssl,heartbeat:2}' -a "foo;{create:always}" --messages 1`
 test "$MSG" = "hello again" || { echo "receive failed '$MSG' != 'hello again'"; exit 1; }
 
+## Test using the Python client
+echo "Testing Non-Authenticating with Python Client..."
+URL=amqps://$TEST_HOSTNAME:$PORT
+if `$top_srcdir/src/tests/ping_broker -b $URL`; then echo "    Passed"; else { echo "    Failed"; exit 1; }; fi
+
 #### Client Authentication tests
 
 start_authenticating_broker

Copied: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncStorePerf_smoke_test.sh (from r1368541, qpid/branches/asyncstore/cpp/src/tests/storePerftools/storePerftoolsSmokeTest.sh)
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncStorePerf_smoke_test.sh?p2=qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncStorePerf_smoke_test.sh&p1=qpid/branches/asyncstore/cpp/src/tests/storePerftools/storePerftoolsSmokeTest.sh&r1=1368541&r2=1368910&rev=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/storePerftoolsSmokeTest.sh (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncStorePerf_smoke_test.sh Fri Aug  3 12:13:32 2012
@@ -9,11 +9,16 @@ run_test() {
     fi    
 }
 
-NUM_MSGS=1000
+NUM_MSGS=10000
 TEST_PROG="./asyncStorePerf"
 
+# Default (no args)
 run_test "${TEST_PROG}"
+
+# Help
 run_test "${TEST_PROG} --help"
+
+# Limited combinations of major params
 for q in 1 2; do
     for p in 1 2; do
         for c in 1 2; do
@@ -27,25 +32,3 @@ for q in 1 2; do
         done
     done
 done
-
-
-NUM_MSGS=1000
-TEST_PROG="./jrnl2Perf"
-
-
-run_test "${TEST_PROG}"
-
-# This test returns 1, don't use run_test until this is fixed.
-cmd="${TEST_PROG} --help"
-echo $cmd
-$cmd
-
-for q in 1 2; do
-    for p in 1 2; do
-        for c in 1; do # BUG - this will fail for c > 1
-            run_test "./jrnl2Perf --num_queues $q --num_msgs ${NUM_MSGS} --num_enq_threads_per_queue $p --num_deq_threads_per_queue $c"
-        done
-    done
-done
-
-#exit 0

Added: qpid/branches/asyncstore/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh?rev=1368910&view=auto
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh (added)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh Fri Aug  3 12:13:32 2012
@@ -0,0 +1,33 @@
+#!/bin/bash
+
+run_test() {
+    local cmd=$1
+    echo $cmd
+    $cmd
+    if (( $? != 0 )); then
+        exit 1
+    fi    
+}
+
+NUM_MSGS=10000
+TEST_PROG="./jrnl2Perf"
+
+# Default (no args)
+run_test "${TEST_PROG}"
+
+# Help
+# This test returns 1, don't use run_test until this is fixed.
+cmd="${TEST_PROG} --help"
+echo $cmd
+$cmd
+
+# Limited combinations of major params
+for q in 1 2; do
+    for p in 1 2; do
+        for c in 1; do # BUG - this will fail for c > 1
+            run_test "./jrnl2Perf --num_queues $q --num_msgs ${NUM_MSGS} --num_enq_threads_per_queue $p --num_deq_threads_per_queue $c"
+        done
+    done
+done
+
+#exit 0

Propchange: qpid/branches/asyncstore/cpp/src/tests/storePerftools/jrnl2Perf_smoke_test.sh
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/branches/asyncstore/cpp/src/tests/txjob.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/txjob.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/txjob.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/txjob.cpp Fri Aug  3 12:13:32 2012
@@ -38,9 +38,9 @@ namespace tests {
 
 struct Args : public qpid::TestOptions
 {
-    string workQueue;
-    string source;
-    string dest;
+    std::string workQueue;
+    std::string source;
+    std::string dest;
     uint messages;
     uint jobs;
     bool quit;

Modified: qpid/branches/asyncstore/cpp/src/tests/txshift.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/txshift.cpp?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/txshift.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/txshift.cpp Fri Aug  3 12:13:32 2012
@@ -39,7 +39,7 @@ namespace tests {
 
 struct Args : public qpid::TestOptions
 {
-    string workQueue;
+    std::string workQueue;
     size_t workers;
 
     Args() : workQueue("txshift-control"), workers(1)

Modified: qpid/branches/asyncstore/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/xml/cluster.xml?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/xml/cluster.xml (original)
+++ qpid/branches/asyncstore/cpp/xml/cluster.xml Fri Aug  3 12:13:32 2012
@@ -179,6 +179,7 @@
       <field name="position" type="sequence-no"/>
       <field name="used-msg-credit" type="uint32"/>
       <field name="used-byte-credit" type="uint32"/>
+      <field name="deliveryCount" type="uint32"/>
     </control>
 
     <!-- Delivery-record for outgoing messages sent but not yet accepted. -->



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org