Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC

svn commit: r1451244 [18/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...

Modified: qpid/branches/asyncstore/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/ha_tests.py?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/asyncstore/cpp/src/tests/ha_tests.py Thu Feb 28 16:14:30 2013
@@ -18,235 +18,32 @@
 # under the License.
 #
 
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
 import traceback
-from qpid.messaging import Message, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED
+from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
 from qpid.datatypes import uuid4
 from brokertest import *
+from ha_test import *
 from threading import Thread, Lock, Condition
 from logging import getLogger, WARN, ERROR, DEBUG, INFO
-from qpidtoollibs import BrokerAgent
+from qpidtoollibs import BrokerAgent, EventHelper
 from uuid import UUID
 
 log = getLogger(__name__)
 
-class QmfAgent(object):
-    """Access to a QMF broker agent."""
-    def __init__(self, address, **kwargs):
-        self._connection = Connection.establish(
-            address, client_properties={"qpid.ha-admin":1}, **kwargs)
-        self._agent = BrokerAgent(self._connection)
-        assert self._agent.getHaBroker(), "HA module not loaded in broker at: %s"%(address)
-
-    def __getattr__(self, name):
-        a = getattr(self._agent, name)
-        return a
-
-class Credentials(object):
-    """SASL credentials: username, password, and mechanism"""
-    def __init__(self, username, password, mechanism):
-        (self.username, self.password, self.mechanism) = (username, password, mechanism)
-
-    def __str__(self): return "Credentials%s"%(self.tuple(),)
-
-    def tuple(self): return (self.username, self.password, self.mechanism)
-
-    def add_user(self, url): return "%s/%s@%s"%(self.username, self.password, url)
-
-class HaBroker(Broker):
-    """Start a broker with HA enabled
-    @param client_cred: (user, password, mechanism) for admin clients started by the HaBroker.
-    """
-    def __init__(self, test, args=[], brokers_url=None, ha_cluster=True, ha_replicate="all",
-                 client_credentials=None, **kwargs):
-        assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
-        args = copy(args)
-        args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=debug+:ha::",
-                 # FIXME aconway 2012-02-13: workaround slow link failover.
-                 "--link-maintenace-interval=0.1",
-                 "--ha-cluster=%s"%ha_cluster]
-        if ha_replicate is not None:
-            args += [ "--ha-replicate=%s"%ha_replicate ]
-        if brokers_url: args += [ "--ha-brokers-url", brokers_url ]
-        Broker.__init__(self, test, args, **kwargs)
-        self.qpid_ha_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-ha")
-        assert os.path.exists(self.qpid_ha_path)
-        self.qpid_config_path=os.path.join(os.getenv("PYTHON_COMMANDS"), "qpid-config")
-        assert os.path.exists(self.qpid_config_path)
-        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
-        self.qpid_ha_script=import_script(self.qpid_ha_path)
-        self._agent = None
-        self.client_credentials = client_credentials
-
-    def __str__(self): return Broker.__str__(self)
-
-    def qpid_ha(self, args):
-        cred = self.client_credentials
-        url = self.host_port()
-        if cred:
-            url =cred.add_user(url)
-            args = args + ["--sasl-mechanism", cred.mechanism]
-        self.qpid_ha_script.main_except(["", "-b", url]+args)
-
-    def promote(self): self.qpid_ha(["promote"])
-    def set_client_url(self, url): self.qpid_ha(["set", "--public-url", url])
-    def set_brokers_url(self, url): self.qpid_ha(["set", "--brokers-url", url])
-    def replicate(self, from_broker, queue): self.qpid_ha(["replicate", from_broker, queue])
-
-    def agent(self):
-        if not self._agent:
-            cred = self.client_credentials
-            if cred:
-                self._agent = QmfAgent(cred.add_user(self.host_port()), sasl_mechanisms=cred.mechanism)
-            else:
-                self._agent = QmfAgent(self.host_port())
-        return self._agent
-
-    def ha_status(self):
-        hb = self.agent().getHaBroker()
-        hb.update()
-        return hb.status
-
-    def wait_status(self, status):
-        def try_get_status():
-            # Ignore ConnectionError, the broker may not be up yet.
-            try:
-                self._status = self.ha_status()
-                return self._status == status;
-            except ConnectionError: return False
-        assert retry(try_get_status, timeout=20), "%s %r != %r"%(self, self._status, status)
-
-    # FIXME aconway 2012-05-01: do direct python call to qpid-config code.
-    def qpid_config(self, args):
-        assert subprocess.call(
-            [self.qpid_config_path, "--broker", self.host_port()]+args) == 0
-
-    def config_replicate(self, from_broker, queue):
-        self.qpid_config(["add", "queue", "--start-replica", from_broker, queue])
-
-    def config_declare(self, queue, replication):
-        self.qpid_config(["add", "queue", queue, "--replicate", replication])
-
-    def connect_admin(self, **kwargs):
-        cred = self.client_credentials
-        if cred:
-            return Broker.connect(
-                self, client_properties={"qpid.ha-admin":1},
-                username=cred.username, password=cred.password, sasl_mechanisms=cred.mechanism,
-                **kwargs)
-        else:
-            return Broker.connect(self, client_properties={"qpid.ha-admin":1}, **kwargs)
-
-    def wait_backup(self, address):
-        """Wait for address to become valid on a backup broker."""
-        bs = self.connect_admin().session()
-        try: wait_address(bs, address)
-        finally: bs.connection.close()
-
-    def assert_browse(self, queue, expected, **kwargs):
-        """Verify queue contents by browsing."""
-        bs = self.connect().session()
-        try:
-            wait_address(bs, queue)
-            assert_browse_retry(bs, queue, expected, **kwargs)
-        finally: bs.connection.close()
-
-    def assert_browse_backup(self, queue, expected, **kwargs):
-        """Combines wait_backup and assert_browse_retry."""
-        bs = self.connect_admin().session()
-        try:
-            wait_address(bs, queue)
-            assert_browse_retry(bs, queue, expected, **kwargs)
-        finally: bs.connection.close()
-
-    def assert_connect_fail(self):
-        try:
-            self.connect()
-            self.test.fail("Expected ConnectionError")
-        except ConnectionError: pass
+def grep(filename, regexp):
+    """Return True if regexp matches any line of filename."""
+    with open(filename) as f:
+        for line in f:
+            if regexp.search(line): return True
+    return False
+
+class HaBrokerTest(BrokerTest):
+    """Base class for HA broker tests"""
+    def assert_log_no_errors(self, broker):
+        log = broker.get_log()
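+        # The pattern matches the severity field of broker log lines
+        # (e.g. "... [Broker] error ..."); this is an assumption about the
+        # qpid log format inferred from this suite, not a documented contract.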
+        if grep(log, re.compile("] error|] critical")):
+            self.fail("Errors in log file %s"%(log))
 
-    def try_connect(self):
-        try: return self.connect()
-        except ConnectionError: return None
-
-class HaCluster(object):
-    _cluster_count = 0
-
-    def __init__(self, test, n, promote=True, **kwargs):
-        """Start a cluster of n brokers"""
-        self.test = test
-        self.kwargs = kwargs
-        self._brokers = []
-        self.id = HaCluster._cluster_count
-        self.broker_id = 0
-        HaCluster._cluster_count += 1
-        for i in xrange(n): self.start(False)
-        self.update_urls()
-        self[0].promote()
-
-    def next_name(self):
-        name="cluster%s-%s"%(self.id, self.broker_id)
-        self.broker_id += 1
-        return name
-
-    def start(self, update_urls=True, args=[]):
-        """Start a new broker in the cluster"""
-        b = HaBroker(self.test, name=self.next_name(), **self.kwargs)
-        self._brokers.append(b)
-        if update_urls: self.update_urls()
-        return b
-
-    def update_urls(self):
-        self.url = ",".join([b.host_port() for b in self])
-        if len(self) > 1:          # No failover addresses on a 1 cluster.
-            for b in self: b.set_brokers_url(self.url)
-
-    def connect(self, i):
-        """Connect with reconnect_urls"""
-        return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
-
-    def kill(self, i, promote_next=True):
-        """Kill broker i, promote broker i+1"""
-        self[i].expect = EXPECT_EXIT_FAIL
-        self[i].kill()
-        if promote_next: self[(i+1) % len(self)].promote()
-
-    def restart(self, i):
-        """Start a broker with the same port, name and data directory. It will get
-        a separate log file: foo.n.log"""
-        b = self._brokers[i]
-        self._brokers[i] = HaBroker(
-            self.test, name=b.name, port=b.port(), brokers_url=self.url,
-            **self.kwargs)
-
-    def bounce(self, i, promote_next=True):
-        """Stop and restart a broker in a cluster."""
-        self.kill(i, promote_next)
-        self.restart(i)
-
-    # Behave like a list of brokers.
-    def __len__(self): return len(self._brokers)
-    def __getitem__(self,index): return self._brokers[index]
-    def __iter__(self): return self._brokers.__iter__()
-
-def wait_address(session, address):
-    """Wait for an address to become valid."""
-    def check():
-        try:
-            session.sender(address)
-            return True
-        except NotFound: return False
-    assert retry(check), "Timed out waiting for address %s"%(address)
-
-def valid_address(session, address):
-    """Test if an address is valid"""
-    try:
-        session.receiver(address)
-        return True
-    except NotFound: return False
-
-class ReplicationTests(BrokerTest):
+class ReplicationTests(HaBrokerTest):
     """Correctness tests for HA replication."""
 
     def test_replication(self):
@@ -256,8 +53,9 @@ class ReplicationTests(BrokerTest):
         def queue(name, replicate):
             return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
 
-        def exchange(name, replicate, bindq):
-            return"%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'fanout'},x-bindings:[{exchange:'%s',queue:'%s'}]}}"%(name, replicate, name, bindq)
+        def exchange(name, replicate, bindq, key):
+            return "%s/%s;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':%s}, type:'topic'},x-bindings:[{exchange:'%s',queue:'%s',key:'%s'}]}}"%(name, key, replicate, name, bindq, key)
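+        # For illustration (derived from the format string above),
+        # exchange("e1", "all", "q1", "key1") expands to:
+        #   e1/key1;{create:always,node:{type:topic,
+        #     x-declare:{arguments:{'qpid.replicate':all}, type:'topic'},
+        #     x-bindings:[{exchange:'e1',queue:'q1',key:'key1'}]}}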
+
         def setup(p, prefix, primary):
             """Create config, send messages on the primary p"""
             s = p.sender(queue(prefix+"q1", "all"))
@@ -267,16 +65,21 @@ class ReplicationTests(BrokerTest):
             p.acknowledge()
             p.sender(queue(prefix+"q2", "configuration")).send(Message("2"))
             p.sender(queue(prefix+"q3", "none")).send(Message("3"))
-            p.sender(exchange(prefix+"e1", "all", prefix+"q1")).send(Message("4"))
-            p.sender(exchange(prefix+"e2", "all", prefix+"q2")).send(Message("5"))
+            p.sender(exchange(prefix+"e1", "all", prefix+"q1", "key1")).send(Message("4"))
+            p.sender(exchange(prefix+"e2", "configuration", prefix+"q2", "key2")).send(Message("5"))
             # Test unbind
             p.sender(queue(prefix+"q4", "all")).send(Message("6"))
-            s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4"))
+            s3 = p.sender(exchange(prefix+"e4", "all", prefix+"q4", "key4"))
             s3.send(Message("7"))
             # Use old connection to unbind
             us = primary.connect_old().session(str(uuid4()))
-            us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
+            us.exchange_unbind(exchange=prefix+"e4", binding_key="key4", queue=prefix+"q4")
             p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
+            # Test replication of deletes
+            p.sender(queue(prefix+"dq", "all"))
+            p.sender(exchange(prefix+"de", "all", prefix+"dq", ""))
+            p.sender(prefix+"dq;{delete:always}").close()
+            p.sender(prefix+"de;{delete:always}").close()
             # Need a marker so we can wait till sync is done.
             p.sender(queue(prefix+"x", "configuration"))
 
@@ -292,50 +95,61 @@ class ReplicationTests(BrokerTest):
 
             self.assert_browse_retry(b, prefix+"q2", []) # configuration only
             assert not valid_address(b, prefix+"q3")
-            b.sender(prefix+"e1").send(Message(prefix+"e1")) # Verify binds with replicate=all
+
+            # Verify exchange with replicate=all
+            b.sender(prefix+"e1/key1").send(Message(prefix+"e1"))
             self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
-            b.sender(prefix+"e2").send(Message(prefix+"e2")) # Verify binds with replicate=configuration
+
+            # Verify exchange with replicate=configuration
+            b.sender(prefix+"e2/key2").send(Message(prefix+"e2"))
             self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
 
-            b.sender(prefix+"e4").send(Message("drop2")) # Verify unbind.
+            b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind.
             self.assert_browse_retry(b, prefix+"q4", ["6","7"])
 
-        primary = HaBroker(self, name="primary")
-        primary.promote()
-        p = primary.connect().session()
+            # Verify deletes
+            assert not valid_address(b, prefix+"dq")
+            assert not valid_address(b, prefix+"de")
 
-        # Create config, send messages before starting the backup, to test catch-up replication.
-        setup(p, "1", primary)
-        backup  = HaBroker(self, name="backup", brokers_url=primary.host_port())
-        # Create config, send messages after starting the backup, to test steady-state replication.
-        setup(p, "2", primary)
-
-        # Verify the data on the backup
-        b = backup.connect_admin().session()
-        verify(b, "1", p)
-        verify(b, "2", p)
-        # Test a series of messages, enqueue all then dequeue all.
-        s = p.sender(queue("foo","all"))
-        wait_address(b, "foo")
-        msgs = [str(i) for i in range(10)]
-        for m in msgs: s.send(Message(m))
-        self.assert_browse_retry(p, "foo", msgs)
-        self.assert_browse_retry(b, "foo", msgs)
-        r = p.receiver("foo")
-        for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
-        p.acknowledge()
-        self.assert_browse_retry(p, "foo", [])
-        self.assert_browse_retry(b, "foo", [])
-
-        # Another series, this time verify each dequeue individually.
-        for m in msgs: s.send(Message(m))
-        self.assert_browse_retry(p, "foo", msgs)
-        self.assert_browse_retry(b, "foo", msgs)
-        for i in range(len(msgs)):
-            self.assertEqual(msgs[i], r.fetch(timeout=0).content)
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            primary = HaBroker(self, name="primary")
+            primary.promote()
+            p = primary.connect().session()
+
+            # Create config, send messages before starting the backup, to test catch-up replication.
+            setup(p, "1", primary)
+            backup  = HaBroker(self, name="backup", brokers_url=primary.host_port())
+            # Create config, send messages after starting the backup, to test steady-state replication.
+            setup(p, "2", primary)
+
+            # Verify the data on the backup
+            b = backup.connect_admin().session()
+            verify(b, "1", p)
+            verify(b, "2", p)
+            # Test a series of messages, enqueue all then dequeue all.
+            s = p.sender(queue("foo","all"))
+            wait_address(b, "foo")
+            msgs = [str(i) for i in range(10)]
+            for m in msgs: s.send(Message(m))
+            self.assert_browse_retry(p, "foo", msgs)
+            self.assert_browse_retry(b, "foo", msgs)
+            r = p.receiver("foo")
+            for m in msgs: self.assertEqual(m, r.fetch(timeout=0).content)
             p.acknowledge()
-            self.assert_browse_retry(p, "foo", msgs[i+1:])
-            self.assert_browse_retry(b, "foo", msgs[i+1:])
+            self.assert_browse_retry(p, "foo", [])
+            self.assert_browse_retry(b, "foo", [])
+
+            # Another series, this time verify each dequeue individually.
+            for m in msgs: s.send(Message(m))
+            self.assert_browse_retry(p, "foo", msgs)
+            self.assert_browse_retry(b, "foo", msgs)
+            for i in range(len(msgs)):
+                self.assertEqual(msgs[i], r.fetch(timeout=0).content)
+                p.acknowledge()
+                self.assert_browse_retry(p, "foo", msgs[i+1:])
+                self.assert_browse_retry(b, "foo", msgs[i+1:])
+        finally: l.restore()
 
     def test_sync(self):
         primary = HaBroker(self, name="primary")
@@ -361,53 +175,59 @@ class ReplicationTests(BrokerTest):
 
     def test_send_receive(self):
         """Verify sequence numbers of messages sent by qpid-send"""
-        brokers = HaCluster(self, 3)
-        sender = self.popen(
-            ["qpid-send",
-             "--broker", brokers[0].host_port(),
-             "--address", "q;{create:always}",
-             "--messages=1000",
-             "--content-string=x"
-             ])
-        receiver = self.popen(
-            ["qpid-receive",
-             "--broker", brokers[0].host_port(),
-             "--address", "q;{create:always}",
-             "--messages=990",
-             "--timeout=10"
-             ])
-        self.assertEqual(sender.wait(), 0)
-        self.assertEqual(receiver.wait(), 0)
-        expect = [long(i) for i in range(991, 1001)]
-        sn = lambda m: m.properties["sn"]
-        brokers[1].assert_browse_backup("q", expect, transform=sn)
-        brokers[2].assert_browse_backup("q", expect, transform=sn)
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            brokers = HaCluster(self, 3)
+            sender = self.popen(
+                ["qpid-send",
+                 "--broker", brokers[0].host_port(),
+                 "--address", "q;{create:always}",
+                 "--messages=1000",
+                 "--content-string=x"
+                 ])
+            receiver = self.popen(
+                ["qpid-receive",
+                 "--broker", brokers[0].host_port(),
+                 "--address", "q;{create:always}",
+                 "--messages=990",
+                 "--timeout=10"
+                 ])
+            self.assertEqual(sender.wait(), 0)
+            self.assertEqual(receiver.wait(), 0)
+            expect = [long(i) for i in range(991, 1001)]
+            sn = lambda m: m.properties["sn"]
+            brokers[1].assert_browse_backup("q", expect, transform=sn)
+            brokers[2].assert_browse_backup("q", expect, transform=sn)
+        finally: l.restore()
 
     def test_failover_python(self):
         """Verify that backups reject connections and that fail-over works in the Python client"""
-        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
-        primary.promote()
-        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
-        # Check that backup rejects normal connections
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
         try:
-            backup.connect().session()
-            self.fail("Expected connection to backup to fail")
-        except ConnectionError: pass
-        # Check that admin connections are allowed to backup.
-        backup.connect_admin().close()
-
-        # Test discovery: should connect to primary after reject by backup
-        c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
-        s = c.session()
-        sender = s.sender("q;{create:always}")
-        backup.wait_backup("q")
-        sender.send("foo")
-        primary.kill()
-        assert retry(lambda: not is_running(primary.pid))
-        backup.promote()
-        sender.send("bar")
-        self.assert_browse_retry(s, "q", ["foo", "bar"])
-        c.close()
+            primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+            primary.promote()
+            backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+            # Check that backup rejects normal connections
+            try:
+                backup.connect().session()
+                self.fail("Expected connection to backup to fail")
+            except ConnectionError: pass
+            # Check that admin connections are allowed to backup.
+            backup.connect_admin().close()
+
+            # Test discovery: should connect to primary after reject by backup
+            c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
+            s = c.session()
+            sender = s.sender("q;{create:always}")
+            backup.wait_backup("q")
+            sender.send("foo")
+            primary.kill()
+            assert retry(lambda: not is_running(primary.pid))
+            backup.promote()
+            sender.send("bar")
+            self.assert_browse_retry(s, "q", ["foo", "bar"])
+            c.close()
+        finally: l.restore()
 
     def test_failover_cpp(self):
         """Verify that failover works in the C++ client."""
@@ -456,51 +276,61 @@ class ReplicationTests(BrokerTest):
 
     def test_standalone_queue_replica(self):
         """Test replication of individual queues outside of cluster mode"""
-        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
-        primary = HaBroker(self, name="primary", ha_cluster=False)
-        pc = primary.connect()
-        ps = pc.session().sender("q;{create:always}")
-        pr = pc.session().receiver("q;{create:always}")
-        backup = HaBroker(self, name="backup", ha_cluster=False)
-        br = backup.connect().session().receiver("q;{create:always}")
-
-        # Set up replication with qpid-ha
-        backup.replicate(primary.host_port(), "q")
-        ps.send("a")
-        backup.assert_browse_backup("q", ["a"])
-        ps.send("b")
-        backup.assert_browse_backup("q", ["a", "b"])
-        self.assertEqual("a", pr.fetch().content)
-        pr.session.acknowledge()
-        backup.assert_browse_backup("q", ["b"])
-
-        # Set up replication with qpid-config
-        ps2 = pc.session().sender("q2;{create:always}")
-        backup.config_replicate(primary.host_port(), "q2");
-        ps2.send("x")
-        backup.assert_browse_backup("q2", ["x"])
-
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
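+            # --ha-queue-replication=yes (used below) permits replication of
+            # individual queues on brokers that are not in an HA cluster.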
+            primary = HaBroker(self, name="primary", ha_cluster=False,
+                               args=["--ha-queue-replication=yes"]);
+            pc = primary.connect()
+            ps = pc.session().sender("q;{create:always}")
+            pr = pc.session().receiver("q;{create:always}")
+            backup = HaBroker(self, name="backup", ha_cluster=False,
+                              args=["--ha-queue-replication=yes"])
+            br = backup.connect().session().receiver("q;{create:always}")
+
+            # Set up replication with qpid-ha
+            backup.replicate(primary.host_port(), "q")
+            ps.send("a")
+            backup.assert_browse_backup("q", ["a"])
+            ps.send("b")
+            backup.assert_browse_backup("q", ["a", "b"])
+            self.assertEqual("a", pr.fetch().content)
+            pr.session.acknowledge()
+            backup.assert_browse_backup("q", ["b"])
+
+            # Set up replication with qpid-config
+            ps2 = pc.session().sender("q2;{create:always}")
+            backup.config_replicate(primary.host_port(), "q2");
+            ps2.send("x")
+            backup.assert_browse_backup("q2", ["x"])
+        finally: l.restore()
 
     def test_queue_replica_failover(self):
-        """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
-        cluster = HaCluster(self, 2)
-        primary = cluster[0]
-        pc = cluster.connect(0)
-        ps = pc.session().sender("q;{create:always}")
-        pr = pc.session().receiver("q;{create:always}")
-        backup = HaBroker(self, name="backup", ha_cluster=False)
-        br = backup.connect().session().receiver("q;{create:always}")
-        backup.replicate(cluster.url, "q")
-        ps.send("a")
-        backup.assert_browse_backup("q", ["a"])
-        cluster.bounce(0)
-        backup.assert_browse_backup("q", ["a"])
-        ps.send("b")
-        backup.assert_browse_backup("q", ["a", "b"])
-        cluster.bounce(1)
-        self.assertEqual("a", pr.fetch().content)
-        pr.session.acknowledge()
-        backup.assert_browse_backup("q", ["b"])
+        """Test individual queue replication from a cluster to a standalone
+        backup broker, verify it fails over."""
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            cluster = HaCluster(self, 2)
+            primary = cluster[0]
+            pc = cluster.connect(0)
+            ps = pc.session().sender("q;{create:always}")
+            pr = pc.session().receiver("q;{create:always}")
+            backup = HaBroker(self, name="backup", ha_cluster=False,
+                              args=["--ha-queue-replication=yes"])
+            br = backup.connect().session().receiver("q;{create:always}")
+            backup.replicate(cluster.url, "q")
+            ps.send("a")
+            backup.assert_browse_backup("q", ["a"])
+            cluster.bounce(0)
+            backup.assert_browse_backup("q", ["a"])
+            ps.send("b")
+            backup.assert_browse_backup("q", ["a", "b"])
+            cluster.bounce(1)
+            self.assertEqual("a", pr.fetch().content)
+            pr.session.acknowledge()
+            backup.assert_browse_backup("q", ["b"])
+            pc.close()
+            br.close()
+        finally: l.restore()
 
     def test_lvq(self):
         """Verify that we replicate to an LVQ correctly"""
@@ -634,8 +464,10 @@ class ReplicationTests(BrokerTest):
     def test_replicate_default(self):
         """Make sure we don't replicate if ha-replicate is unspecified or none"""
         cluster1 = HaCluster(self, 2, ha_replicate=None)
+        cluster1[1].wait_status("ready")
         c1 = cluster1[0].connect().session().sender("q;{create:always}")
         cluster2 = HaCluster(self, 2, ha_replicate="none")
+        cluster2[1].wait_status("ready")
         cluster2[0].connect().session().sender("q;{create:always}")
         time.sleep(.1)               # Give replication a chance.
         try:
@@ -647,6 +479,23 @@ class ReplicationTests(BrokerTest):
             self.fail("Expected no-such-queue exception")
         except NotFound: pass
 
+    def test_replicate_binding(self):
+        """Verify that binding replication can be disabled"""
+        primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
+        primary.promote()
+        backup = HaBroker(self, name="backup", brokers_url=primary.host_port())
+        ps = primary.connect().session()
+        ps.sender("ex;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
+        ps.sender("q;{create:always,node:{type:queue,x-declare:{arguments:{'qpid.replicate':all}},x-bindings:[{exchange:'ex',queue:'q',key:'',arguments:{'qpid.replicate':none}}]}}")
+        backup.wait_backup("q")
+
+        primary.kill()
+        assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
+        backup.promote()
+        bs = backup.connect_admin().session()
+        bs.sender("ex").send(Message("msg"))
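+        # The binding was declared with 'qpid.replicate':none, so the backup
+        # never replicated it; after failover the message is unroutable and
+        # q stays empty.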
+        self.assert_browse_retry(bs, "q", [])
+
     def test_invalid_replication(self):
         """Verify that we reject an attempt to declare a queue with invalid replication value."""
         cluster = HaCluster(self, 1, ha_replicate="all")
@@ -672,20 +521,26 @@ class ReplicationTests(BrokerTest):
 
     def test_auto_delete_exclusive(self):
         """Verify that we ignore auto-delete, exclusive, non-auto-delete-timeout queues"""
-        cluster = HaCluster(self,2)
-        s = cluster[0].connect().session()
-        s.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
-        s.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
-        s.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
-        s.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
-        s.receiver("q;{create:always}")
+        cluster = HaCluster(self, 2)
+        s0 = cluster[0].connect().session()
+        s0.receiver("exad;{create:always,node:{x-declare:{exclusive:True,auto-delete:True}}}")
+        s0.receiver("ex;{create:always,node:{x-declare:{exclusive:True}}}")
+        ad = s0.receiver("ad;{create:always,node:{x-declare:{auto-delete:True}}}")
+        s0.receiver("time;{create:always,node:{x-declare:{exclusive:True,auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+        s0.receiver("q;{create:always}")
 
-        s = cluster[1].connect_admin().session()
+        s1 = cluster[1].connect_admin().session()
         cluster[1].wait_backup("q")
-        assert not valid_address(s, "exad")
-        assert valid_address(s, "ex")
-        assert valid_address(s, "ad")
-        assert valid_address(s, "time")
+        assert not valid_address(s1, "exad")
+        assert valid_address(s1, "ex")
+        assert valid_address(s1, "ad")
+        assert valid_address(s1, "time")
+
+        # Verify that auto-delete queues are not kept alive by
+        # replicating subscriptions
+        ad.close()
+        s0.sync()
+        assert not valid_address(s0, "ad")
 
     def test_broker_info(self):
         """Check that broker information is correctly published via management"""
@@ -763,18 +618,18 @@ acl deny all all
         s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
         # altq queue bound to altex, collect re-routed messages.
         s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
-        # 0ex exchange with alternate-exchange altex and no queues bound
-        s.sender("0ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
+        # ex exchange with alternate-exchange altex and no queues bound
+        s.sender("ex;{create:always,node:{type:topic, x-declare:{type:'direct', alternate-exchange:'altex'}}}")
         # create queue q with alternate-exchange altex
         s.sender("q;{create:always,node:{type:queue, x-declare:{alternate-exchange:'altex'}}}")
         # create a bunch of exchanges to ensure we don't clean up prematurely if the
         # response comes in multiple fragments.
-        for i in xrange(200): s.sender("00ex%s;{create:always,node:{type:topic}}"%i)
+        for i in xrange(200): s.sender("ex.%s;{create:always,node:{type:topic}}"%i)
 
         def verify(broker):
             s = broker.connect().session()
             # Verify unmatched message goes to ex's alternate.
-            s.sender("0ex").send("foo")
+            s.sender("ex").send("foo")
             altq = s.receiver("altq")
             self.assertEqual("foo", altq.fetch(timeout=0).content)
             s.acknowledge()
@@ -786,20 +641,265 @@ acl deny all all
             self.assertEqual("bar", altq.fetch(timeout=0).content)
             s.acknowledge()
 
+        def ss(n): return cluster[n].connect().session()
+
         # Sanity check: alternate exchanges on original broker
         verify(cluster[0])
+        # Altex is in use as an alternate exchange.
+        self.assertRaises(SessionError,
+                          lambda:ss(0).sender("altex;{delete:always}").close())
         # Check backup that was connected during setup.
-        cluster[1].wait_backup("0ex")
+        cluster[1].wait_status("ready")
+        cluster[1].wait_backup("ex")
         cluster[1].wait_backup("q")
         cluster.bounce(0)
         verify(cluster[1])
+
         # Check a newly started backup.
         cluster.start()
-        cluster[2].wait_backup("0ex")
+        cluster[2].wait_status("ready")
+        cluster[2].wait_backup("ex")
         cluster[2].wait_backup("q")
         cluster.bounce(1)
         verify(cluster[2])
 
+        # Check that alt-exchange in-use count is replicated
+        s = cluster[2].connect().session();
+
+        self.assertRaises(SessionError,
+                          lambda:ss(2).sender("altex;{delete:always}").close())
+        s.sender("q;{delete:always}").close()
+        self.assertRaises(SessionError,
+                          lambda:ss(2).sender("altex;{delete:always}").close())
+        s.sender("ex;{delete:always}").close()
+        s.sender("altex;{delete:always}").close()
+
+    def test_priority_reroute(self):
+        """Regression test for QPID-4262, rerouting messages from a priority queue
+        to itself causes a crash"""
+        cluster = HaCluster(self, 2)
+        primary = cluster[0]
+        session = primary.connect().session()
+        s = session.sender("pq; {create:always, node:{x-declare:{arguments:{'qpid.priorities':10}},x-bindings:[{exchange:'amq.fanout',queue:pq}]}}")
+        for m in xrange(100): s.send(Message(str(m), priority=m%10))
+        pq =  QmfAgent(primary.host_port()).getQueue("pq")
+        pq.reroute(request=0, useAltExchange=False, exchange="amq.fanout")
+        # Verify that consuming is in priority order
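+        # (message m was sent with priority m%10, so priority 9 drains
+        # 9,19,...,99 first, then priority 8, and so on down to priority 0)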
+        expect = [str(10*i+p) for p in xrange(9,-1,-1) for i in xrange(0,10) ]
+        actual = [m.content for m in primary.get_messages("pq", 100)]
+        self.assertEqual(expect, actual)
+
+    def test_delete_missing_response(self):
+        """Check that a backup correctly deletes leftover queues and exchanges that are
+        missing from the initial response set."""
+        # This test is a bit contrived, we set up the situation on backup brokers
+        # and then promote one.
+        cluster = HaCluster(self, 2, promote=False)
+
+        # cluster[0] Will be the primary
+        s = cluster[0].connect_admin().session()
+        s.sender("q1;{create:always}")
+        s.sender("e1;{create:always, node:{type:topic}}")
+
+        # cluster[1] will be the backup, has extra queues/exchanges
+        xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
+        node = "node:{%s}"%(xdecl)
+        s = cluster[1].connect_admin().session()
+        s.sender("q1;{create:always, %s}"%(node))
+        s.sender("q2;{create:always, %s}"%(node))
+        s.sender("e1;{create:always, node:{type:topic, %s}}"%(xdecl))
+        s.sender("e2;{create:always, node:{type:topic, %s}}"%(xdecl))
+        for a in ["q1", "q2", "e1", "e2"]: cluster[1].wait_backup(a)
+
+        cluster[0].promote()
+        # Verify the backup deletes the surplus queue and exchange
+        cluster[1].wait_status("ready")
+        s = cluster[1].connect_admin().session()
+        self.assertRaises(NotFound, s.receiver, ("q2"));
+        self.assertRaises(NotFound, s.receiver, ("e2"));
+
+
+    def test_delete_qpid_4285(self):
+        """Regression test for QPID-4285: a deleted queue gets stuck in a
+        partially deleted state and causes replication errors."""
+        cluster = HaCluster(self,2)
+        s = cluster[0].connect().session()
+        s.receiver("q;{create:always}")
+        cluster[1].wait_backup("q")
+        cluster.kill(0)       # Make the backup take over.
+        s = cluster[1].connect().session()
+        s.receiver("q;{delete:always}").close() # Delete q on new primary
+        try:
+            s.receiver("q")
+            self.fail("Expected NotFound exception") # Should not be available
+        except NotFound: pass
+        assert not cluster[1].agent().getQueue("q") # Should not be in QMF
+
+    def alt_setup(self, session, suffix):
+        # Create exchange to use as alternate and a queue bound to it.
+        # altex exchange: acts as alternate exchange
+        session.sender("altex%s;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}"%(suffix))
+        # altq queue bound to altex, collect re-routed messages.
+        session.sender("altq%s;{create:always,node:{x-bindings:[{exchange:'altex%s',queue:altq%s}]}}"%(suffix,suffix,suffix))
+
+    def test_auto_delete_close(self):
+        """Verify auto-delete queues are deleted on backup if auto-deleted
+        on primary"""
+        cluster=HaCluster(self, 2)
+        p = cluster[0].connect().session()
+        self.alt_setup(p, "1")
+        r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+        s = p.sender("adq1")
+        for m in ["aa","bb","cc"]: s.send(m)
+        p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+        cluster[1].wait_queue("adq1")
+        cluster[1].wait_queue("adq2")
+        r.close()               # trigger auto-delete of adq1
+        cluster[1].wait_no_queue("adq1")
+        cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+        cluster[1].wait_queue("adq2")
+
+    def test_auto_delete_crash(self):
+        """Verify auto-delete queues are deleted on backup if the primary crashes"""
+        cluster=HaCluster(self, 2)
+        p = cluster[0].connect().session()
+        self.alt_setup(p,"1")
+
+        # adq1 is subscribed so will be auto-deleted.
+        r = p.receiver("adq1;{create:always,node:{x-declare:{auto-delete:True,alternate-exchange:'altex1'}}}", capacity=1)
+        s = p.sender("adq1")
+        for m in ["aa","bb","cc"]: s.send(m)
+        # adq2 is subscribed after cluster[2] starts.
+        p.sender("adq2;{create:always,node:{x-declare:{auto-delete:True}}}")
+        # adq3 is never subscribed.
+        p.sender("adq3;{create:always,node:{x-declare:{auto-delete:True}}}")
+
+        cluster.start()
+        cluster[2].wait_status("ready")
+
+        p.receiver("adq2")      # Subscribed after cluster[2] joined
+
+        for q in ["adq1","adq2","adq3","altq1"]: cluster[1].wait_queue(q)
+        for q in ["adq1","adq2","adq3","altq1"]: cluster[2].wait_queue(q)
+        cluster[0].kill()
+
+        cluster[1].wait_no_queue("adq1")
+        cluster[1].wait_no_queue("adq2")
+        cluster[1].wait_queue("adq3")
+
+        cluster[2].wait_no_queue("adq1")
+        cluster[2].wait_no_queue("adq2")
+        cluster[2].wait_queue("adq3")
+
+        cluster[1].assert_browse_backup("altq1", ["aa","bb","cc"])
+        cluster[2].assert_browse_backup("altq1", ["aa","bb","cc"])
+
+    def test_auto_delete_timeout(self):
+        cluster = HaCluster(self, 2)
+        # Test timeout
+        r1 = cluster[0].connect().session().receiver("q1;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':1}}}}")
+        # Test special case of timeout = 0
+        r0 = cluster[0].connect().session().receiver("q0;{create:always,node:{x-declare:{auto-delete:True,arguments:{'qpid.auto_delete_timeout':0}}}}")
+        cluster[1].wait_queue("q0")
+        cluster[1].wait_queue("q1")
+        cluster[0].kill()
+        cluster[1].wait_queue("q1")    # Not timed out yet
+        cluster[1].wait_no_queue("q1", timeout=5) # Wait for timeout
+        cluster[1].wait_no_queue("q0", timeout=5) # Wait for timeout
+
+    def test_alt_exchange_dup(self):
+        """QPID-4349: if a queue with an alternate exchange is deleted, its
+        messages appear twice on the alternate: they are rerouted once by
+        the primary and again by the backup."""
+        cluster = HaCluster(self,2)
+
+        # Set up q with alternate exchange altex bound to altq.
+        s = cluster[0].connect().session()
+        s.sender("altex;{create:always,node:{type:topic,x-declare:{type:'fanout'}}}")
+        s.sender("altq;{create:always,node:{x-bindings:[{exchange:'altex',queue:altq}]}}")
+        snd = s.sender("q;{create:always,node:{x-declare:{alternate-exchange:'altex'}}}")
+        messages = [ str(n) for n in xrange(10) ]
+        for m in messages: snd.send(m)
+        cluster[1].assert_browse_backup("q", messages)
+        s.sender("q;{delete:always}").close()
+        cluster[1].assert_browse_backup("altq", messages)
+
+    def test_expired(self):
+        """Regression test for QPID-4379: HA does not properly handle expired messages"""
+        # Race between messages expiring and HA replicating consumer.
+        cluster = HaCluster(self, 2)
+        s = cluster[0].connect().session().sender("q;{create:always}", capacity=2)
+        def send_ttl_messages():
+            for i in xrange(100): s.send(Message(str(i), ttl=0.001), timeout=1)
+        send_ttl_messages()
+        cluster.start()
+        send_ttl_messages()
+
+    def test_stale_response(self):
+        """Check for a race condition where a stale response is processed after an
+        event for the same queue/exchange."""
+        cluster = HaCluster(self, 2)
+        s = cluster[0].connect().session()
+        s.sender("keep;{create:always}") # Leave this queue in place.
+        for i in xrange(1000):
+            s.sender("deleteme%s;{create:always,delete:always}"%(i)).close()
+        # It is possible for the backup to attempt to subscribe after the queue
+        # is deleted. This is not an error, but is logged as an error on the primary.
+        # The backup does not log this as an error so we only check the backup log for errors.
+        self.assert_log_no_errors(cluster[1])
+
+    def test_missed_recreate(self):
+        """If a queue or exchange is destroyed and one with the same name re-created
+        while a backup is disconnected, the backup should also delete/recreate
+        the object when it re-connects"""
+        cluster = HaCluster(self, 3)
+        sn = cluster[0].connect().session()
+        # Create a queue with messages
+        s = sn.sender("qq;{create:always}")
+        msgs = [str(i) for i in xrange(3)]
+        for m in msgs: s.send(m)
+        cluster[1].assert_browse_backup("qq", msgs)
+        cluster[2].assert_browse_backup("qq", msgs)
+        # Set up an exchange with a binding.
+        sn.sender("xx;{create:always,node:{type:topic}}")
+        sn.sender("xxq;{create:always,node:{x-bindings:[{exchange:'xx',queue:'xxq',key:xxq}]}}")
+        cluster[1].wait_address("xx")
+        self.assertEqual(cluster[1].agent().getExchange("xx").values["bindingCount"], 1)
+        cluster[2].wait_address("xx")
+        self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 1)
+
+        # Simulate the race by re-creating the objects before promoting the new primary
+        cluster.kill(0, False)
+        xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
+        node = "node:{%s}"%(xdecl)
+        sn = cluster[1].connect_admin().session()
+        sn.sender("qq;{delete:always}").close()
+        s = sn.sender("qq;{create:always, %s}"%(node))
+        s.send("foo")
+        sn.sender("xx;{delete:always}").close()
+        sn.sender("xx;{create:always,node:{type:topic,%s}}"%(xdecl))
+        cluster[1].promote()
+        cluster[1].wait_status("active")
+        # Verify we are not still using the old objects on cluster[2]
+        cluster[2].assert_browse_backup("qq", ["foo"])
+        cluster[2].wait_address("xx")
+        self.assertEqual(cluster[2].agent().getExchange("xx").values["bindingCount"], 0)
+
+    def test_redeclare_exchange(self):
+        """Ensure that re-declaring an exchange is an HA no-op"""
+        cluster = HaCluster(self, 2)
+        ps = cluster[0].connect().session()
+        ps.sender("ex1;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout'}}}")
+        ps.sender("ex2;{create:always,node:{type:topic,x-declare:{arguments:{'qpid.replicate':all}, type:'fanout', alternate-exchange:'ex1'}}}")
+        cluster[1].wait_backup("ex1")
+        cluster[1].wait_backup("ex2")
+
+        # Use old API to re-declare the exchange
+        old_conn = cluster[0].connect_old()
+        old_sess = old_conn.session(str(qpid.datatypes.uuid4()))
+        old_sess.exchange_declare(exchange='ex1', type='fanout')
+        cluster[1].wait_backup("ex1")
+
 def fairshare(msgs, limit, levels):
     """
     Generator to return prioritised messages in expected order for a given fairshare limit
@@ -812,7 +912,7 @@ def fairshare(msgs, limit, levels):
             msgs = postponed
             count = 0
             last_priority = None
-            postponed = []
+            postponed = [ ]
         msg = msgs.pop(0)
         if last_priority and priority_level(msg.priority, levels) == last_priority:
             count += 1
@@ -834,7 +934,7 @@ def priority_level(value, levels):
     offset = 5-math.ceil(levels/2.0)
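     # e.g. levels=4: offset = 5 - ceil(4/2.0) = 3, so value 5 maps to
     # min(max(5-3, 0), 3) = 2, and values 3 and below map to level 0.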
     return min(max(value - offset, 0), levels-1)
 
-class LongTests(BrokerTest):
+class LongTests(HaBrokerTest):
     """Tests that can run for a long time if -DDURATION=<minutes> is set"""
 
     def duration(self):
@@ -860,7 +960,7 @@ class LongTests(BrokerTest):
             """Wait for receiver r to pass n"""
             def check():
                 r.check()       # Verify no exceptions
-                return r.received > n
+                return r.received > n + 100
             assert retry(check), "Stalled %s at %s"%(r.queue, n)
 
         for r in receivers: wait_passed(r, 0)
@@ -868,87 +968,176 @@ class LongTests(BrokerTest):
         # Kill and restart brokers in a cycle:
         endtime = time.time() + self.duration()
         i = 0
+        primary = 0
         try:
             while time.time() < endtime or i < 3: # At least 3 iterations
+                # Precondition: All 3 brokers running,
+                # primary = index of promoted primary
+                # one or two backups are running,
                 for s in senders: s.sender.assert_running()
                 for r in receivers: r.receiver.assert_running()
-                checkpoint = [ r.received for r in receivers ]
-                # Don't kill primary till it is active and the next
-                # backup is ready, otherwise we can lose messages.
-                brokers[i%3].wait_status("active")
-                brokers[(i+1)%3].wait_status("ready")
-                brokers.bounce(i%3)
+                checkpoint = [ r.received+100 for r in receivers ]
+                dead = None
+                victim = random.randint(0,2)
+                if victim == primary:
+                    # Don't kill primary till it is active and the next
+                    # backup is ready, otherwise we can lose messages.
+                    brokers[victim].wait_status("active")
+                    next = (victim+1)%3
+                    brokers[next].wait_status("ready")
+                    brokers.bounce(victim) # Next one is promoted
+                    primary = next
+                else:
+                    brokers.kill(victim, False)
+                    dead = victim
+
+                # At this point the primary is running with 1 or 2 backups
+                # Make sure we are not stalled
+                map(wait_passed, receivers, checkpoint)
+                # Run another checkpoint to ensure things work in this configuration
+                checkpoint = [ r.received+100 for r in receivers ]
+                map(wait_passed, receivers, checkpoint)
+
+                if dead is not None:
+                    brokers.restart(dead) # Restart backup
+                    brokers[dead].ready()
+                    dead = None
                 i += 1
-                map(wait_passed, receivers, checkpoint) # Wait for all receivers
         except:
             traceback.print_exc()
             raise
         finally:
             for s in senders: s.stop()
             for r in receivers: r.stop()
-            dead = []
+            unexpected_dead = []
             for i in xrange(3):
-                if not brokers[i].is_running(): dead.append(i)
-                brokers.kill(i, False)
-            if dead: raise Exception("Brokers not running: %s"%dead)
+                if not brokers[i].is_running() and i != dead:
+                    unexpected_dead.append(i)
+                if brokers[i].is_running(): brokers.kill(i, False)
+            if unexpected_dead:
+                raise Exception("Brokers not running: %s"%unexpected_dead)
+
+    def test_qmf_order(self):
+        """QPID-4402: HA QMF events can be out of order.
+        This test mimics the test described in the JIRA. Two threads repeatedly
+        declare the same auto-delete queue and close their connection.
+        """
+        broker = Broker(self)
+        class Receiver(Thread):
+            def __init__(self, qname):
+                Thread.__init__(self)
+                self.qname = qname
+                self.stopped = False
+
+            def run(self):
+                while not self.stopped:
+                    self.connection = broker.connect()
+                    try:
+                        self.connection.session().receiver(
+                            self.qname+";{create:always,node:{x-declare:{auto-delete:True}}}")
+                    except NotFound: pass # Can occur occasionally, not an error.
+                    try: self.connection.close()
+                    except: pass
+
+        class QmfObject(object):
+            """Track existence of an object and validate QMF events"""
+            def __init__(self, type_name, name_field, name):
+                self.type_name, self.name_field, self.name = type_name, name_field, name
+                self.exists = False
+
+            def qmf_event(self, event):
+                content = event.content[0]
+                event_type = content['_schema_id']['_class_name']
+                values = content['_values']
+                if event_type == self.type_name+"Declare" and values[self.name_field] == self.name:
+                    disp = values['disp']
+                    log.debug("Event %s: disp=%s exists=%s"%(
+                            event_type, values['disp'], self.exists))
+                    if self.exists: assert values['disp'] == 'existing'
+                    else: assert values['disp'] == 'created'
+                    self.exists = True
+                elif event_type == self.type_name+"Delete" and values[self.name_field] == self.name:
+                    log.debug("Event %s: exists=%s"%(event_type, self.exists))
+                    assert self.exists
+                    self.exists = False
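+
+        # Sketch of the event layout qmf_event assumes, inferred from the
+        # accesses above rather than from QMF documentation:
+        #   event.content[0] == {'_schema_id': {'_class_name': 'queueDeclare'},
+        #                        '_values': {'qName': 'qq', 'disp': 'created', ...}}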
+
+        # Verify order of QMF events.
+        helper = EventHelper()
+        r = broker.connect().session().receiver(helper.eventAddress())
+        threads = [Receiver("qq"), Receiver("qq")]
+        for t in threads: t.start()
+        queue = QmfObject("queue", "qName", "qq")
+        finish = time.time() + self.duration()
+        try:
+            while time.time() < finish:
+                queue.qmf_event(r.fetch())
+        finally:
+            for t in threads: t.stopped = True; t.join()
 
-class RecoveryTests(BrokerTest):
+class RecoveryTests(HaBrokerTest):
     """Tests for recovery after a failure."""
 
     def test_queue_hold(self):
         """Verify that the broker holds queues without sufficient backup,
         i.e. does not complete messages sent to those queues."""
 
-        # We don't want backups to time out for this test, set long timeout.
-        cluster = HaCluster(self, 4, args=["--ha-backup-timeout=100000"]);
-        # Wait for the primary to be ready
-        cluster[0].wait_status("active")
-        # Create a queue before the failure.
-        s1 = cluster.connect(0).session().sender("q1;{create:always}")
-        for b in cluster: b.wait_backup("q1")
-        for i in xrange(100): s1.send(str(i))
-        # Kill primary and 2 backups
-        for i in [0,1,2]: cluster.kill(i, False)
-        cluster[3].promote()    # New primary, backups will be 1 and 2
-        cluster[3].wait_status("recovering")
-
-        def assertSyncTimeout(s):
-            try:
-                s.sync(timeout=.01)
-                self.fail("Expected Timeout exception")
-            except Timeout: pass
-
-        # Create a queue after the failure
-        s2 = cluster.connect(3).session().sender("q2;{create:always}")
-
-        # Verify that messages sent are not completed
-        for i in xrange(100,200): s1.send(str(i), sync=False); s2.send(str(i), sync=False)
-        assertSyncTimeout(s1)
-        self.assertEqual(s1.unsettled(), 100)
-        assertSyncTimeout(s2)
-        self.assertEqual(s2.unsettled(), 100)
-
-        # Verify we can receive even if sending is on hold:
-        cluster[3].assert_browse("q1", [str(i) for i in range(100)+range(100,200)])
-
-        # Restart backups, verify queues are released only when both backups are up
-        cluster.restart(1)
-        assertSyncTimeout(s1)
-        self.assertEqual(s1.unsettled(), 100)
-        assertSyncTimeout(s2)
-        self.assertEqual(s2.unsettled(), 100)
-        self.assertEqual(cluster[3].ha_status(), "recovering")
-        cluster.restart(2)
-
-        # Verify everything is up to date and active
-        def settled(sender): sender.sync(); return sender.unsettled() == 0;
-        assert retry(lambda: settled(s1)), "Unsetttled=%s"%(s1.unsettled())
-        assert retry(lambda: settled(s2)), "Unsetttled=%s"%(s2.unsettled())
-        cluster[1].assert_browse_backup("q1", [str(i) for i in range(100)+range(100,200)])
-        cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
-        cluster[3].wait_status("active"),
-        s1.session.connection.close()
-        s2.session.connection.close()
+        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
+        try:
+            # We don't want backups to time out for this test, set long timeout.
+            cluster = HaCluster(self, 4, args=["--ha-backup-timeout=120"]);
+            # Wait for the primary to be ready
+            cluster[0].wait_status("active")
+            for b in cluster[1:4]: b.wait_status("ready")
+            # Create a queue before the failure.
+            s1 = cluster.connect(0).session().sender("q1;{create:always}")
+            for b in cluster: b.wait_backup("q1")
+            for i in xrange(100): s1.send(str(i))
+
+            # Kill primary and 2 backups
+            cluster[3].wait_status("ready")
+            for i in [0,1,2]: cluster.kill(i, False)
+            cluster[3].promote()    # New primary, backups will be 1 and 2
+            cluster[3].wait_status("recovering")
+
+            def assertSyncTimeout(s):
+                try:
+                    s.sync(timeout=.01)
+                    self.fail("Expected Timeout exception")
+                except Timeout: pass
+
+            # Create a queue after the failure
+            s2 = cluster.connect(3).session().sender("q2;{create:always}")
+
+            # Verify that messages sent are not completed
+            for i in xrange(100,200):
+                s1.send(str(i), sync=False);
+                s2.send(str(i), sync=False)
+            assertSyncTimeout(s1)
+            self.assertEqual(s1.unsettled(), 100)
+            assertSyncTimeout(s2)
+            self.assertEqual(s2.unsettled(), 100)
+
+            # Verify we can receive even if sending is on hold:
+            cluster[3].assert_browse("q1", [str(i) for i in range(200)])
+
+            # Restart backups, verify queues are released only when both backups are up
+            cluster.restart(1)
+            assertSyncTimeout(s1)
+            self.assertEqual(s1.unsettled(), 100)
+            assertSyncTimeout(s2)
+            self.assertEqual(s2.unsettled(), 100)
+            cluster.restart(2)
+
+            # Verify everything is up to date and active
+            def settled(sender): sender.sync(timeout=1); return sender.unsettled() == 0
+            assert retry(lambda: settled(s1)), "Unsettled=%s"%(s1.unsettled())
+            assert retry(lambda: settled(s2)), "Unsettled=%s"%(s2.unsettled())
+            cluster[1].assert_browse_backup("q1", [str(i) for i in range(200)])
+            cluster[1].assert_browse_backup("q2", [str(i) for i in range(100,200)])
+            cluster[3].wait_status("active")
+            s1.session.connection.close()
+            s2.session.connection.close()
+        finally: l.restore()
 
     def test_expected_backup_timeout(self):
         """Verify that we time-out expected backups and release held queues
@@ -972,6 +1161,52 @@ class RecoveryTests(BrokerTest):
         s.sync(timeout=1)      # And released after the timeout.
         self.assertEqual(cluster[2].ha_status(), "active")
 
+    def test_join_ready_cluster(self):
+        """If we join a cluster where the primary is dead, the new primary is
+        not yet promoted, and there are ready backups, then we should refuse
+        promotion so that one of the ready backups can be chosen."""
+        # FIXME aconway 2012-10-05: smaller timeout
+        cluster = HaCluster(self, 2, args=["--link-heartbeat-interval", 1])
+        cluster[0].wait_status("active")
+        cluster[1].wait_status("ready")
+        cluster.bounce(0, promote_next=False)
+        self.assertRaises(Exception, cluster[0].promote)
+        os.kill(cluster[1].pid, signal.SIGSTOP) # Test for timeout if unresponsive.
+        cluster.bounce(0, promote_next=False)
+        cluster[0].promote()
+
+
+class ConfigurationTests(HaBrokerTest):
+    """Tests for configuration settings."""
+
+    def test_client_broker_url(self):
+        """Check that the broker and public URL settings obey the correct
+        defaulting and precedence rules."""
+
+        def check(broker, brokers, public):
+            qmf = broker.qmf()
+            self.assertEqual(brokers, qmf.brokersUrl)
+            self.assertEqual(public, qmf.publicUrl)
+
+        def start(brokers, public, known=None):
+            args=[]
+            if brokers: args.append("--ha-brokers-url="+brokers)
+            if public: args.append("--ha-public-url="+public)
+            if known: args.append("--known-hosts-url="+known)
+            return HaBroker(self, args=args)
+
+        # Both set explicitly, no defaulting
+        b = start("foo:123", "bar:456")
+        check(b, "amqp:tcp:foo:123", "amqp:tcp:bar:456")
+        b.set_brokers_url("foo:999")
+        check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:456")
+        b.set_public_url("bar:999")
+        check(b, "amqp:tcp:foo:999", "amqp:tcp:bar:999")
+
+        # Allow "none" to mean "not set"
+        b = start("none", "none")
+        check(b, "", "")
+
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
     qpid_ha = os.getenv("QPID_HA_EXEC")
@@ -979,5 +1214,5 @@ if __name__ == "__main__":
         os.execvp("qpid-python-test",
                   ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])
     else:
-        print "Skipping ha_tests, qpid_ha not available"
+        print "Skipping ha_tests, %s not available"%(qpid_ha)
 

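The recovery test above leans on qpid.messaging's delivery-guarantee API: while the new primary is recovering, sends are accepted but not completed, so sender.sync() times out and sender.unsettled() stays at the number of in-doubt messages. A minimal standalone sketch of that pattern (the broker URL and queue name here are placeholders, not part of the test suite):

    from qpid.messaging import Connection, Timeout

    conn = Connection.establish("localhost:5672")   # placeholder URL
    snd = conn.session().sender("q1;{create:always}")
    for i in xrange(100):
        snd.send(str(i), sync=False)    # fire-and-forget; completion comes later
    try:
        snd.sync(timeout=.01)           # times out while the primary holds the queue
    except Timeout:
        print "%s messages still unsettled" % snd.unsettled()
    conn.close()
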
Modified: qpid/branches/asyncstore/cpp/src/tests/ipv6_test
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/ipv6_test?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/ipv6_test (original)
+++ qpid/branches/asyncstore/cpp/src/tests/ipv6_test Thu Feb 28 16:14:30 2013
@@ -122,43 +122,3 @@ else
     rm rdata-in rdata-out
 fi
 
-# Cluster smoke test follows
-test -z $CLUSTER_LIB && exit 0	# Exit if cluster not supported.
-
-## Test failover in a cluster using IPv6 only
-. cpg_check.sh
-cpg_enabled || exit 0
-
-pick_port() {
-    # We need a fixed port to set --cluster-url. Use qpidd to pick a free port.
-    # Note this method is racy
-    PICK=$($QPIDD_EXEC -dp0)
-    $QPIDD_EXEC -qp $PICK
-    echo $PICK
-}
-
-ssl_cluster_broker() {		# $1 = port
-    $QPIDD_EXEC $COMMON_OPTS --load-module  $CLUSTER_LIB --cluster-name ipv6_test.$HOSTNAME.$$ --cluster-url amqp:[$TEST_HOSTNAME]:$1 --port $1
-    # Wait for broker to be ready
-    ./qpid-ping -b $TEST_HOSTNAME -qp $1 || { echo "Cannot connect to broker on $1"; exit 1; }
-    echo "Running IPv6 cluster broker on port $1"
-}
-
-PORT1=`pick_port`; ssl_cluster_broker $PORT1
-PORT2=`pick_port`; ssl_cluster_broker $PORT2
-
-# Pipe receive output to uniq to remove duplicates
-./qpid-receive --connection-options "{reconnect:true, reconnect-timeout:5}" --failover-updates -b amqp:[$TEST_HOSTNAME]:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp &
-
-./qpid-send -b amqp:[$TEST_HOSTNAME]:$PORT2 --content-string=one -a "foo;{create:always}"
-
-$QPIDD_EXEC -qp $PORT1 # Kill broker 1 receiver should fail-over.
-./qpid-send -b amqp:[$TEST_HOSTNAME]:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1
-wait				# Wait for qpid-receive
-{ echo one; echo two; } > ssl_test_receive.cmp
-diff  ssl_test_receive.tmp ssl_test_receive.cmp || { echo "Failover failed"; exit 1; }
-
-$QPIDD_EXEC -qp $PORT2
-
-rm -f ssl_test_receive.*
-

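The cluster smoke test removed above exercised client-side failover by giving qpid-receive {reconnect:true, reconnect-timeout:5} and piping its output through uniq to drop duplicates. The same reconnect options remain usable against an HA pair from Python; a hedged sketch (broker address and queue name are placeholders):

    from qpid.messaging import Connection

    conn = Connection.establish("localhost:5672",    # placeholder broker
                                reconnect=True, reconnect_timeout=5)
    rcv = conn.session().receiver("foo;{create:always}")
    print rcv.fetch().content     # a broker restart between fetches is retried
    print rcv.fetch().content
    conn.close()
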
Modified: qpid/branches/asyncstore/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/qpid-cluster-benchmark?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/branches/asyncstore/cpp/src/tests/qpid-cluster-benchmark Thu Feb 28 16:14:30 2013
@@ -55,12 +55,10 @@ done
 shift $(($OPTIND-1))
 
 CONNECTION_OPTIONS="--connection-options {tcp-nodelay:$TCP_NODELAY,reconnect:$RECONNECT,heartbeat:$HEARTBEAT}"
-CREATE_OPTIONS="node:{x-declare:{arguments:{'qpid.replicate':all}}}"
 
 BROKER=$(echo $BROKERS | sed s/,.*//)
 run_test() { echo $*; shift; "$@"; echo; echo; echo; }
 
 OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $CONNECTION_OPTIONS $NO_DELETE"
-OPTS="$OPTS --create-option $CREATE_OPTIONS"
 
 run_test "Benchmark:" qpid-cpp-benchmark $OPTS "$@"

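Dropping CREATE_OPTIONS means the benchmark no longer forces 'qpid.replicate':all onto every queue it declares. Where per-queue replication is still wanted, the same x-declare argument can be given in any address string; for example, from Python (queue name is arbitrary, shown only to illustrate the address syntax):

    from qpid.messaging import Connection

    conn = Connection.establish("localhost:5672")   # placeholder broker
    ssn = conn.session()
    # Ask the broker to replicate both the queue and its messages to backups.
    snd = ssn.sender("bench0;{create:always,"
                     "node:{x-declare:{arguments:{'qpid.replicate':all}}}}")
    snd.send("hello")
    conn.close()
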
Modified: qpid/branches/asyncstore/cpp/src/tests/qpid-cpp-benchmark
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/qpid-cpp-benchmark?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/qpid-cpp-benchmark (original)
+++ qpid/branches/asyncstore/cpp/src/tests/qpid-cpp-benchmark Thu Feb 28 16:14:30 2013
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-import optparse, time, qpid.messaging, re
+import optparse, time, qpid.messaging, re, os
 from threading import Thread
 from subprocess import Popen, PIPE, STDOUT
 
@@ -77,6 +77,10 @@ op.add_option("--no-delete", default=Fal
               help="Don't delete the test queues.")
 op.add_option("--fill-drain", default=False, action="store_true",
               help="First fill the queues, then drain them")
+op.add_option("--qpid-send-path", default="", type="str", metavar="PATH",
+              help="path to qpid-send binary")
+op.add_option("--qpid-receive-path", default="", type="str", metavar="PATH",
+              help="path to qpid-receive binary")
 
 single_quote_re = re.compile("'")
 def posix_quote(string):
@@ -115,7 +119,7 @@ def start_receive(queue, index, opts, re
     messages = msg_total/opts.receivers;
     if (index < msg_total%opts.receivers): messages += 1
     if (messages == 0): return None
-    command = ["qpid-receive",
+    command = [os.path.join(opts.qpid_receive_path, "qpid-receive"),
                "-b", broker,
                "-a", address,
                "-m", str(messages),
@@ -138,7 +142,7 @@ def start_receive(queue, index, opts, re
 
 def start_send(queue, opts, broker, host):
     address="%s;{%s}"%(queue,",".join(opts.send_option + ["create:always"]))
-    command = ["qpid-send",
+    command = [os.path.join(opts.qpid_send_path, "qpid-send"),
                "-b", broker,
                "-a", address,
                "--messages", str(opts.messages),

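The new path options default to the empty string, which preserves the old behavior: os.path.join with an empty directory returns the bare command name, so qpid-send and qpid-receive are still resolved through $PATH unless a directory is supplied. For instance (the /opt/qpid/bin directory is just an example):

    import os

    print os.path.join("", "qpid-send")               # qpid-send  ($PATH lookup)
    print os.path.join("/opt/qpid/bin", "qpid-send")  # /opt/qpid/bin/qpid-send
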
Modified: qpid/branches/asyncstore/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/qpid-receive.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/qpid-receive.cpp Thu Feb 28 16:14:30 2013
@@ -222,7 +222,7 @@ int main(int argc, char ** argv)
                             if (msg.getCorrelationId().size()) std::cout << "CorrelationId: " << msg.getCorrelationId() << std::endl;
                             if (msg.getUserId().size()) std::cout << "UserId: " << msg.getUserId() << std::endl;
                             if (msg.getTtl().getMilliseconds()) std::cout << "TTL: " << msg.getTtl().getMilliseconds() << std::endl;
-                            if (msg.getPriority()) std::cout << "Priority: " << msg.getPriority() << std::endl;
+                            if (msg.getPriority()) std::cout << "Priority: " << ((uint) msg.getPriority()) << std::endl;
                             if (msg.getDurable()) std::cout << "Durable: true" << std::endl;
                             if (msg.getRedelivered()) std::cout << "Redelivered: true" << std::endl;
                             std::cout << "Properties: " << msg.getProperties() << std::endl;

Modified: qpid/branches/asyncstore/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/qpid-send.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/qpid-send.cpp Thu Feb 28 16:14:30 2013
@@ -200,7 +200,7 @@ struct Options : public qpid::Options
         std::string name;
         std::string value;
         if (nameval(property, name, value)) {
-            message.getProperties()[name] = value;
+            message.getProperties()[name].parse(value);
         } else {
             message.getProperties()[name] = Variant();
         }

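Switching from assignment to Variant::parse means a command-line property like x=42 now reaches the broker as a typed value rather than as the string "42", with parse() inferring a matching type from the text. A rough Python analogue of that inference (illustrative only, not the actual Variant implementation):

    def parse_variant(text):
        # Try bool, then integer, then float; otherwise keep the string.
        if text in ("true", "True"): return True
        if text in ("false", "False"): return False
        for cast in (int, float):
            try: return cast(text)
            except ValueError: pass
        return text

    print parse_variant("42")    # 42 (int), where assignment kept "42" (str)
    print parse_variant("abc")   # "abc" falls back to a plain string
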
Modified: qpid/branches/asyncstore/cpp/src/tests/run_acl_tests
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/run_acl_tests?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/run_acl_tests (original)
+++ qpid/branches/asyncstore/cpp/src/tests/run_acl_tests Thu Feb 28 16:14:30 2013
@@ -24,22 +24,26 @@ source ./test_env.sh
 DATA_DIR=`pwd`/data_dir
 DATA_DIRI=`pwd`/data_diri
 DATA_DIRU=`pwd`/data_diru
+DATA_DIRQ=`pwd`/data_dirq
 
 trap stop_brokers INT TERM QUIT
 
 start_brokers() {
-    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIR --load-module $ACL_LIB --acl-file policy.acl --auth no --log-to-file local.log > qpidd.port
+    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIR --load-module $ACL_LIB --acl-file policy.acl --auth no                               --log-to-file local.log > qpidd.port
     LOCAL_PORT=`cat qpidd.port`
-    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-ip 2 --log-to-file locali.log > qpiddi.port
+    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRI --load-module $ACL_LIB --acl-file policy.acl --auth no --connection-limit-per-ip 2   --log-to-file locali.log > qpiddi.port
     LOCAL_PORTI=`cat qpiddi.port`
-    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --max-connections-per-user 2 --log-to-file localu.log > qpiddu.port
+    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRU --load-module $ACL_LIB --acl-file policy.acl --auth no --connection-limit-per-user 2 --log-to-file localu.log > qpiddu.port
     LOCAL_PORTU=`cat qpiddu.port`
+    ../qpidd --daemon --port 0 --no-module-dir --data-dir $DATA_DIRQ --load-module $ACL_LIB --acl-file policy.acl --auth no --max-queues-per-user 2      --log-to-file localq.log > qpiddq.port
+    LOCAL_PORTQ=`cat qpiddq.port`
 }
 
 stop_brokers() {
         $QPIDD_EXEC --no-module-dir -q --port $LOCAL_PORT
         $QPIDD_EXEC --no-module-dir -q --port $LOCAL_PORTI
         $QPIDD_EXEC --no-module-dir -q --port $LOCAL_PORTU
+        $QPIDD_EXEC --no-module-dir -q --port $LOCAL_PORTQ
 }
 
 test_loading_acl_from_absolute_path(){
@@ -59,20 +63,24 @@ if test -d ${PYTHON_DIR} ;  then
     rm -rf $DATA_DIR
     rm -rf $DATA_DIRI
     rm -rf $DATA_DIRU
+    rm -rf $DATA_DIRQ
     mkdir -p $DATA_DIR
     mkdir -p $DATA_DIRI
     mkdir -p $DATA_DIRU
+    mkdir -p $DATA_DIRQ
     cp $srcdir/policy.acl $DATA_DIR
     cp $srcdir/policy.acl $DATA_DIRI
     cp $srcdir/policy.acl $DATA_DIRU
+    cp $srcdir/policy.acl $DATA_DIRQ
     start_brokers
-    echo "Running acl tests using brokers on ports $LOCAL_PORT, $LOCAL_PORTI, and $LOCAL_PORTU"
-    $QPID_PYTHON_TEST -b localhost:$LOCAL_PORT -m acl -Dport-i=$LOCAL_PORTI -Dport-u=$LOCAL_PORTU || EXITCODE=1
+    echo "Running acl tests using brokers on ports $LOCAL_PORT, $LOCAL_PORTI, $LOCAL_PORTU, and $LOCAL_PORTQ"
+    $QPID_PYTHON_TEST -b localhost:$LOCAL_PORT -m acl -Dport-i=$LOCAL_PORTI -Dport-u=$LOCAL_PORTU -Dport-q=$LOCAL_PORTQ || EXITCODE=1
     stop_brokers || EXITCODE=1
     test_loading_acl_from_absolute_path || EXITCODE=1
     rm -rf $DATA_DIR
     rm -rf $DATA_DIRI
     rm -rf $DATA_DIRU
+    rm -rf $DATA_DIRQ
     exit $EXITCODE
 fi
 

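The fourth broker runs with --max-queues-per-user 2, and its port reaches the Python suite via -Dport-q. A hedged sketch of the kind of check this enables (the port value and exception handling are illustrative; the real assertions live in the acl test module):

    from qpid.messaging import Connection

    port_q = 5672   # placeholder; the script passes the real port with -Dport-q
    ssn = Connection.establish("localhost:%s" % port_q).session()
    ssn.sender("qa;{create:always}")
    ssn.sender("qb;{create:always}")
    try:
        ssn.sender("qc;{create:always}")  # third queue exceeds the per-user limit
        print "FAIL: queue limit not enforced"
    except Exception, e:                  # broker should deny the declare
        print "queue limit enforced: %s" % e
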
Modified: qpid/branches/asyncstore/cpp/src/tests/run_federation_sys_tests
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/run_federation_sys_tests?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/run_federation_sys_tests (original)
+++ qpid/branches/asyncstore/cpp/src/tests/run_federation_sys_tests Thu Feb 28 16:14:30 2013
@@ -25,10 +25,6 @@ source ./test_env.sh
 
 MODULENAME=federation_sys
 
-# Test for clustering
-source cpg_check.sh
-if cpg_enabled; then CLUSTERING_ENABLED=1; fi
-
 # Test for long test
 if [[ "$1" == "LONG_TEST" ]]; then
     USE_LONG_TEST=1
@@ -42,11 +38,7 @@ if [ -z ${USE_LONG_TEST} ]; then
     SKIPTESTS="-i federation_sys.A_Long* -i federation_sys.B_Long* ${SKIPTESTS}"
 fi
 echo "WARNING: Tests using persistence will be ignored."
-if [ -z ${CLUSTERING_ENABLED} ]; then
-    SKIPTESTS="${SKIPTESTS} -i federation_sys.C_* -i federation_sys.D_*"
-elif [ -z ${USE_LONG_TEST} ]; then
-    SKIPTESTS="${SKIPTESTS} -i federation_sys.C_Long* -i federation_sys.D_Long*"
-fi
+SKIPTESTS="${SKIPTESTS} -i federation_sys.C_* -i federation_sys.D_*"
 
 start_brokers() {
     start_broker() {
@@ -56,35 +48,21 @@ start_brokers() {
     }
     start_broker "" LOCAL_PORT
     start_broker "" REMOTE_PORT
-    if [ -n "${CLUSTERING_ENABLED}" ]; then
-        start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-1" CLUSTER_C1_1
-        start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-1" CLUSTER_C1_2
-        start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-2" CLUSTER_C2_1
-        start_broker "--load-module ${CLUSTER_LIB} --cluster-name test-cluster-2" CLUSTER_C2_2
-    fi
     rm qpidd.port
 }
 
 stop_brokers() {
     ${QPIDD_EXEC} -q --port ${LOCAL_PORT}
     ${QPIDD_EXEC} -q --port ${REMOTE_PORT}
-    if [ -n "${CLUSTERING_ENABLED}" ]; then
-        ${QPID_CLUSTER_EXEC} --all-stop --force localhost:${CLUSTER_C1_1}
-        ${QPID_CLUSTER_EXEC} --all-stop --force localhost:${CLUSTER_C2_1}
-    fi
 }
 
 if test -d ${PYTHON_DIR} ;  then
     start_brokers
-    if [ -z ${CLUSTERING_ENABLED} ]; then
-        echo "Running federation tests using brokers on local port ${LOCAL_PORT}, remote port ${REMOTE_PORT} (NOTE: clustering is DISABLED)"
-    else
-        echo "Running federation tests using brokers on local port ${LOCAL_PORT}, remote port ${REMOTE_PORT}, local cluster nodes ${CLUSTER_C1_1} ${CLUSTER_C1_2}, remote cluster nodes ${CLUSTER_C2_1} ${CLUSTER_C2_2}"
-    fi
+    echo "Running federation tests using brokers on local port ${LOCAL_PORT}, remote port ${REMOTE_PORT} (NOTE: clustering is DISABLED)"
     if [ -z ${USE_LONG_TEST} ]; then
         echo "NOTE: To run a full set of federation system tests, use \"make check-long\". To test with persistence, run the store version of this script."
     fi
-    ${QPID_PYTHON_TEST} -m ${MODULENAME} ${SKIPTESTS} -b localhost:${REMOTE_PORT} -Dlocal-port=${LOCAL_PORT} -Dremote-port=${REMOTE_PORT} -Dlocal-cluster-ports="${CLUSTER_C1_1} ${CLUSTER_C1_2}" -Dremote-cluster-ports="${CLUSTER_C2_1} ${CLUSTER_C2_2}" $@
+    ${QPID_PYTHON_TEST} -m ${MODULENAME} ${SKIPTESTS} -b localhost:${REMOTE_PORT} -Dlocal-port=${LOCAL_PORT} -Dremote-port=${REMOTE_PORT} $@
     RETCODE=$?
     stop_brokers
     if test x${RETCODE} != x0; then

Modified: qpid/branches/asyncstore/cpp/src/tests/sasl_fed_ex
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/sasl_fed_ex?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/sasl_fed_ex (original)
+++ qpid/branches/asyncstore/cpp/src/tests/sasl_fed_ex Thu Feb 28 16:14:30 2013
@@ -34,17 +34,11 @@ then
   echo
   # These are the four different ways of creating links ( or routes+links ) 
   # that the qpid-route command provides.
-  echo "Usage: ${script_name} dynamic|link|queue|route [cluster]"
+  echo "Usage: ${script_name} dynamic|link|queue|route"
   echo
   exit 1
 fi
 
-# Has the user told us to do clustering ? -----------
-clustering_flag=
-if [ $# -eq "2" ] && [ "$2" == "cluster" ]; then
-  clustering_flag=true
-fi
-
 qpid_route_method=$1
 
 # Debugging print. --------------------------
@@ -128,15 +122,7 @@ DST_TCP_PORT=5807
 SRC_TCP_PORT_2=5802
 DST_TCP_PORT_2=5803
 
-CLUSTER_NAME_SUFFIX=`hostname | tr '.' ' ' | awk '{print $1}'`
-CLUSTER_1_NAME=sasl_fed_ex_cluster_1_${CLUSTER_NAME_SUFFIX}
-CLUSTER_2_NAME=sasl_fed_ex_cluster_2_${CLUSTER_NAME_SUFFIX}
-
-print "CLUSTER_1_NAME == ${CLUSTER_1_NAME}"
-print "CLUSTER_2_NAME == ${CLUSTER_2_NAME}"
-
 SSL_LIB=${moduledir}/ssl.so
-CLUSTER_LIB=${moduledir}/cluster.so
 
 export QPID_SSL_CERT_NAME=${TEST_HOSTNAME}
 
@@ -183,80 +169,26 @@ COMMON_BROKER_OPTIONS="                 
                       
 
 function start_brokers {
-  if [ $1 ]; then
-    # clustered ----------------------------------------
-    print "Starting SRC cluster"
-
-    print "  src broker 1"
-    $QPIDD_EXEC                                  \
-      --port=${SRC_TCP_PORT}                     \
-      --ssl-port ${SRC_SSL_PORT}                 \
-      ${COMMON_BROKER_OPTIONS}                   \
-      --load-module ${CLUSTER_LIB}               \
-      --cluster-name ${CLUSTER_1_NAME}           \
-      --log-to-file $tmp_root/qpidd_src.log 2> /dev/null
-
-    broker_ports[0]=${SRC_TCP_PORT}
-
-    print "  src broker 2"
-    $QPIDD_EXEC                                  \
-      --port=${SRC_TCP_PORT_2}                   \
-      --ssl-port ${SRC_SSL_PORT_2}               \
-      ${COMMON_BROKER_OPTIONS}                   \
-      --load-module ${CLUSTER_LIB}               \
-      --cluster-name ${CLUSTER_1_NAME}           \
-      --log-to-file $tmp_root/qpidd_src_2.log 2> /dev/null
-
-    broker_ports[1]=${SRC_TCP_PORT_2}
-
-
-    print "Starting DST cluster"
-
-    print "  dst broker 1"
-    $QPIDD_EXEC                                  \
-      --port=${DST_TCP_PORT}                     \
-      --ssl-port ${DST_SSL_PORT}                 \
-      ${COMMON_BROKER_OPTIONS}                   \
-      --load-module ${CLUSTER_LIB}               \
-      --cluster-name ${CLUSTER_2_NAME}           \
-      --log-to-file $tmp_root/qpidd_dst.log 2> /dev/null
-
-    broker_ports[2]=${DST_TCP_PORT}
-
-    print "  dst broker 2"
-    $QPIDD_EXEC                                  \
-      --port=${DST_TCP_PORT_2}                   \
-      --ssl-port ${DST_SSL_PORT_2}               \
-      ${COMMON_BROKER_OPTIONS}                   \
-      --load-module ${CLUSTER_LIB}               \
-      --cluster-name ${CLUSTER_2_NAME}           \
-      --log-to-file $tmp_root/qpidd_dst_2.log 2> /dev/null
-
-    broker_ports[3]=${DST_TCP_PORT_2}
-
-  else
     # vanilla brokers --------------------------------
     print "Starting SRC broker"
     $QPIDD_EXEC                                  \
-      --port=${SRC_TCP_PORT}                     \
-      --ssl-port ${SRC_SSL_PORT}                 \
-      ${COMMON_BROKER_OPTIONS}                   \
-      --log-to-file $tmp_root/qpidd_src.log 2> /dev/null
+	--port=${SRC_TCP_PORT}                     \
+	--ssl-port ${SRC_SSL_PORT}                 \
+	${COMMON_BROKER_OPTIONS}                   \
+	--log-to-file $tmp_root/qpidd_src.log 2> /dev/null
 
     broker_ports[0]=${SRC_TCP_PORT}
 
     print "Starting DST broker"
     $QPIDD_EXEC                                  \
-      --port=${DST_TCP_PORT}                     \
-      --ssl-port ${DST_SSL_PORT}                 \
-      ${COMMON_BROKER_OPTIONS}                   \
-      --log-to-file $tmp_root/qpidd_dst.log 2> /dev/null
+	--port=${DST_TCP_PORT}                     \
+	--ssl-port ${DST_SSL_PORT}                 \
+	${COMMON_BROKER_OPTIONS}                   \
+	--log-to-file $tmp_root/qpidd_dst.log 2> /dev/null
 
     broker_ports[1]=${DST_TCP_PORT}
-  fi
 }
 
-
 function halt_brokers {
   n_brokers=${#broker_ports[@]}
   print "Halting ${n_brokers} brokers."
@@ -270,7 +202,7 @@ function halt_brokers {
 }
 
 
-start_brokers $clustering_flag
+start_brokers
 
 
 # I am not randomizing these names, because this test creates its own brokers.
@@ -329,9 +261,7 @@ fi
 # to avoid false negatives.
 sleep 5
 
-# This should work the same whether or not we are running a clustered test.
-# In the case of clustered tests, the status is not printed by qpid_route.
-# So in either case, I will look only at the transport field, which should be "ssl".
+# Look only at the transport field, which should be "ssl".
 print "check the link"
 link_status=$($QPID_ROUTE_EXEC link list localhost:${DST_TCP_PORT} | tail -1 | awk '{print $3}')
 

Modified: qpid/branches/asyncstore/cpp/src/tests/ssl_test
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/ssl_test?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/ssl_test (original)
+++ qpid/branches/asyncstore/cpp/src/tests/ssl_test Thu Feb 28 16:14:30 2013
@@ -193,29 +193,3 @@ echo "Running SSL/TCP mux test on random
 ./qpid-perftest --count ${COUNT} --port ${PORT} -P tcp -b $TEST_HOSTNAME --summary || error "TCP connection failed!"
 
 stop_brokers
-
-test -z $CLUSTER_LIB && exit 0	# Exit if cluster not supported.
-
-## Test failover in a cluster using SSL only
-source cpg_check.sh
-cpg_enabled || exit 0
-
-PORT1=`pick_port`; ssl_cluster_broker $PORT1
-echo "Running SSL cluster broker on port $PORT1"
-
-PORT2=`pick_port`; ssl_cluster_broker $PORT2
-echo "Running SSL cluster broker on port $PORT2"
-
-# Pipe receive output to uniq to remove duplicates
-./qpid-receive --connection-options "{reconnect:true, reconnect-timeout:5}" --failover-updates -b amqp:ssl:$TEST_HOSTNAME:$PORT1 -a "foo;{create:always}" -f | uniq > ssl_test_receive.tmp &
-./qpid-send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=one -a "foo;{create:always}"
-
-stop_broker 0 # Kill broker 1 - receiver should fail-over.
-echo "Killed SSL cluster broker on port $PORT1"
-
-./qpid-send -b amqp:ssl:$TEST_HOSTNAME:$PORT2 --content-string=two -a "foo;{create:always}" --send-eos 1
-wait				# Wait for qpid-receive
-{ echo one; echo two; } > ssl_test_receive.cmp
-diff  ssl_test_receive.tmp ssl_test_receive.cmp || { echo "Failover failed"; exit 1; }
-rm -f ssl_test_receive.*
-

Modified: qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/storePerftools/asyncPerf/PerfTest.cpp Thu Feb 28 16:14:30 2013
@@ -157,7 +157,7 @@ PerfTest::destroyQueues() {
 int
 runPerfTest(int argc, char** argv) {
     // Load async store module
-    qpid::tryShlib ("asyncStore.so", false);
+    qpid::tryShlib ("asyncStore.so");
 
     qpid::CommonOptions co;
     qpid::asyncStore::AsyncStoreOptions aso;

Modified: qpid/branches/asyncstore/cpp/src/tests/test_env.ps1.in
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/test_env.ps1.in?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/test_env.ps1.in (original)
+++ qpid/branches/asyncstore/cpp/src/tests/test_env.ps1.in Thu Feb 28 16:14:30 2013
@@ -62,8 +62,6 @@ $env:TEST_STORE_LIB="$testmoduledir\test
 #exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
 #exportmodule ACL_LIB acl.so
 #exportmodule CLUSTER_LIB cluster.so
-#exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
-#exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
 #exportmodule SSLCONNECTOR_LIB sslconnector.so
 #exportmodule SSL_LIB ssl.so
 #exportmodule WATCHDOG_LIB watchdog.so

Modified: qpid/branches/asyncstore/cpp/src/tests/test_env.sh.in
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/test_env.sh.in?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/test_env.sh.in (original)
+++ qpid/branches/asyncstore/cpp/src/tests/test_env.sh.in Thu Feb 28 16:14:30 2013
@@ -43,7 +43,6 @@ export PYTHON_COMMANDS=$QPID_TOOLS/src/p
 export PYTHONPATH=$srcdir:$PYTHON_DIR:$PYTHON_COMMANDS:$QPID_TESTS_PY:$QMF_LIB:$PYTHONPATH
 export QPID_CONFIG_EXEC=$PYTHON_COMMANDS/qpid-config
 export QPID_ROUTE_EXEC=$PYTHON_COMMANDS/qpid-route
-export QPID_CLUSTER_EXEC=$PYTHON_COMMANDS/qpid-cluster
 export QPID_HA_EXEC=$PYTHON_COMMANDS/qpid-ha
 
 # Executables
@@ -63,10 +62,7 @@ export TEST_STORE_LIB=$testmoduledir/tes
 
 exportmodule() { test -f $moduledir/$2 && eval "export $1=$moduledir/$2"; }
 exportmodule ACL_LIB acl.so
-exportmodule CLUSTER_LIB cluster.so
 exportmodule HA_LIB ha.so
-exportmodule REPLICATING_LISTENER_LIB replicating_listener.so
-exportmodule REPLICATION_EXCHANGE_LIB replication_exchange.so
 exportmodule SSLCONNECTOR_LIB sslconnector.so
 exportmodule SSL_LIB ssl.so
 exportmodule WATCHDOG_LIB watchdog.so

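exportmodule only exports a variable when the plug-in file actually exists, so downstream scripts can test e.g. $HA_LIB to decide what to run; with the cluster entries deleted, $CLUSTER_LIB is simply never set and cluster-guarded tests drop out. A rough Python equivalent of the helper (the module directory is a placeholder):

    import os

    def exportmodule(var, module, moduledir="/usr/lib/qpid/daemon"):
        # Export var only if the plug-in file exists, mirroring the shell helper.
        path = os.path.join(moduledir, module)
        if os.path.isfile(path):
            os.environ[var] = path

    exportmodule("ACL_LIB", "acl.so")
    exportmodule("HA_LIB", "ha.so")   # CLUSTER_LIB is no longer exported
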
Modified: qpid/branches/asyncstore/cpp/src/tests/test_store.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/test_store.cpp?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/test_store.cpp (original)
+++ qpid/branches/asyncstore/cpp/src/tests/test_store.cpp Thu Feb 28 16:14:30 2013
@@ -37,17 +37,13 @@
 //#include "qpid/broker/amqp_0_10/MessageTransfer.h"
 //#include "qpid/framing/AMQFrame.h"
 //#include "qpid/log/Statement.h"
+//#include "qpid/sys/Thread.h"
 //#include "qpid/Plugin.h"
 //#include "qpid/Options.h"
 //#include <boost/cast.hpp>
 //#include <boost/lexical_cast.hpp>
 //#include <memory>
 //#include <fstream>
-//
-//using namespace qpid;
-//using namespace broker;
-//using namespace std;
-//using namespace qpid::sys;
 
 namespace qpid {
 namespace tests {

Modified: qpid/branches/asyncstore/cpp/src/tests/testagent.mk
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/tests/testagent.mk?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/tests/testagent.mk (original)
+++ qpid/branches/asyncstore/cpp/src/tests/testagent.mk Thu Feb 28 16:14:30 2013
@@ -46,6 +46,6 @@ testagent-testagent.$(OBJEXT): $(TESTAGE
 qpidexectest_PROGRAMS+=testagent
 testagent_CXXFLAGS=$(CXXFLAGS) -Itestagent_gen
 testagent_SOURCES=testagent.cpp $(TESTAGENT_GEN_SRC)
-testagent_LDADD=$(top_builddir)/src/libqmf.la
+testagent_LDADD=$(top_builddir)/src/libqmf.la $(top_builddir)/src/libqpidcommon.la $(top_builddir)/src/libqpidtypes.la $(top_builddir)/src/libqpidclient.la
 
 EXTRA_DIST+=testagent.xml

Modified: qpid/branches/asyncstore/cpp/src/versions.cmake
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/cpp/src/versions.cmake?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/cpp/src/versions.cmake (original)
+++ qpid/branches/asyncstore/cpp/src/versions.cmake Thu Feb 28 16:14:30 2013
@@ -36,4 +36,5 @@ set (qpidtypes_version 1.0.0)
 set (rdmawrap_version 2.0.0)
 set (sslcommon_version 2.0.0)
 set (asyncStore_version 1.0.0)
+#set (legacystore_version 1.0.0)
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org