You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/03/01 00:38:01 UTC

svn commit: r1295339 - in /qpid/trunk/qpid: cpp/src/qpid/broker/ cpp/src/qpid/ha/ cpp/src/tests/ tools/src/py/

Author: aconway
Date: Wed Feb 29 23:38:00 2012
New Revision: 1295339

URL: http://svn.apache.org/viewvc?rev=1295339&view=rev
Log:
QPID-3603: HA support for stand-alone replication.

- New management method HaBroker.replicate to enable replication.
- qpid-ha tool can enable replication of queues.
- qpid-config tool can create queues with replication enabled.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
    qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py
    qpid/trunk/qpid/tools/src/py/qpid-config
    qpid/trunk/qpid/tools/src/py/qpid-ha

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp?rev=1295339&r1=1295338&r2=1295339&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Link.cpp Wed Feb 29 23:38:00 2012
@@ -206,11 +206,9 @@ void Link::closed(int, std::string text)
     QPID_LOG (info, "Inter-broker link disconnected from " << host << ":" << port << " " << text);
 
     connection = 0;
-
     if (state == STATE_OPERATIONAL) {
         stringstream addr;
         addr << host << ":" << port;
-        QPID_LOG(warning, "Inter-broker link disconnected from " << addr.str());
         if (!hideManagement() && agent)
             agent->raiseEvent(_qmf::EventBrokerLinkDown(addr.str()));
     }
@@ -405,7 +403,6 @@ uint Link::nextChannel()
 void Link::notifyConnectionForced(const string text)
 {
     Mutex::ScopedLock mutex(lock);
-
     setStateLH(STATE_FAILED);
     if (!hideManagement())
         mgmtObject->set_lastError(text);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1295339&r1=1295338&r2=1295339&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Wed Feb 29 23:38:00 2012
@@ -25,8 +25,11 @@
 #include "ReplicatingSubscription.h"
 #include "qpid/Exception.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/Link.h"
+#include "qpid/broker/Queue.h"
 #include "qpid/management/ManagementAgent.h"
 #include "qmf/org/apache/qpid/ha/Package.h"
+#include "qmf/org/apache/qpid/ha/ArgsHaBrokerReplicate.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetBrokers.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetPublicBrokers.h"
 #include "qmf/org/apache/qpid/ha/ArgsHaBrokerSetExpectedBackups.h"
@@ -50,7 +53,6 @@ const std::string BACKUP="backup";
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
     : broker(b),
       settings(s),
-      backup(new Backup(b, s)),
       mgmtObject(0)
 {
     // Register a factory for replicating subscriptions.
@@ -72,6 +74,9 @@ HaBroker::HaBroker(broker::Broker& b, co
     sys::Mutex::ScopedLock l(lock);
     if (!settings.clientUrl.empty()) setClientUrl(Url(settings.clientUrl), l);
     if (!settings.brokerUrl.empty()) setBrokerUrl(Url(settings.brokerUrl), l);
+
+    // If we are in a cluster, we start in backup mode.
+    if (settings.cluster) backup.reset(new Backup(b, s));
 }
 
 HaBroker::~HaBroker() {}
@@ -81,8 +86,8 @@ Manageable::status_t HaBroker::Managemen
     switch (methodId) {
       case _qmf::HaBroker::METHOD_PROMOTE: {
           if (backup.get()) {   // I am a backup
-              // FIXME aconway 2012-01-26: create primary state before resetting backup
-              // as that allows client connections.
+              // NOTE: resetting backup allows client connections, so any
+              // primary state should be set up here before backup.reset()
               backup.reset();
               QPID_LOG(notice, "HA: Primary promoted from backup");
               mgmtObject->set_status(PRIMARY);
@@ -100,7 +105,27 @@ Manageable::status_t HaBroker::Managemen
       case _qmf::HaBroker::METHOD_SETEXPECTEDBACKUPS: {
           setExpectedBackups(dynamic_cast<_qmf::ArgsHaBrokerSetExpectedBackups&>(args).i_expectedBackups, l);
         break;
-    }
+      }
+      case _qmf::HaBroker::METHOD_REPLICATE: {
+          _qmf::ArgsHaBrokerReplicate& bq_args =
+              dynamic_cast<_qmf::ArgsHaBrokerReplicate&>(args);
+          QPID_LOG(debug, "HA replicating individual queue "<< bq_args.i_queue << " from " << bq_args.i_broker);
+
+          boost::shared_ptr<broker::Queue> queue = broker.getQueues().get(bq_args.i_queue);
+          Url url(bq_args.i_broker);
+          string protocol = url[0].protocol.empty() ? "tcp" : url[0].protocol;
+          std::pair<broker::Link::shared_ptr, bool> result = broker.getLinks().declare(
+              url[0].host, url[0].port, protocol,
+              false,              // durable
+              settings.mechanism, settings.username, settings.password);
+          boost::shared_ptr<broker::Link> link = result.first;
+          link->setUrl(url);
+          // Create a queue replicator
+          boost::shared_ptr<QueueReplicator> qr(new QueueReplicator(queue, link));
+          broker.getExchanges().registerExchange(qr);
+          qr->activate();
+          break;
+      }
 
       default:
         return Manageable::STATUS_UNKNOWN_METHOD;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp?rev=1295339&r1=1295338&r2=1295339&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaPlugin.cpp Wed Feb 29 23:38:00 2012
@@ -31,7 +31,7 @@ struct Options : public qpid::Options {
     Settings& settings;
     Options(Settings& s) : qpid::Options("HA Options"), settings(s) {
         addOptions()
-            ("ha-cluster", optValue(settings.enabled, "yes|no"),
+            ("ha-cluster", optValue(settings.cluster, "yes|no"),
              "Join a HA active/passive cluster.")
             ("ha-brokers", optValue(settings.brokerUrl,"URL"),
              "URL that backup brokers use to connect and fail over.")
@@ -63,11 +63,7 @@ struct HaPlugin : public Plugin {
 
     void initialize(Plugin::Target& target) {
         broker::Broker* broker = dynamic_cast<broker::Broker*>(&target);
-        if (broker && settings.enabled) {
-            QPID_LOG(notice, "HA: Enabled");
-            haBroker.reset(new ha::HaBroker(*broker, settings));
-        } else
-            QPID_LOG(notice, "HA: Disabled");
+        if (broker) haBroker.reset(new ha::HaBroker(*broker, settings));
     }
 };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h?rev=1295339&r1=1295338&r2=1295339&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Settings.h Wed Feb 29 23:38:00 2012
@@ -33,8 +33,8 @@ namespace ha {
 class Settings
 {
   public:
-    Settings() : enabled(false), expectedBackups(0) {}
-    bool enabled;
+    Settings() : cluster(false), expectedBackups(0) {}
+    bool cluster;               // True if we are a cluster member.
     std::string clientUrl;
     std::string brokerUrl;
     size_t expectedBackups;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml?rev=1295339&r1=1295338&r2=1295339&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/management-schema.xml Wed Feb 29 23:38:00 2012
@@ -47,6 +47,11 @@
     <method name="setExpectedBackups" desc="Set number of backups expected">
       <arg name="expectedBackups" type="uint16" dir="I"/>
     </method>
+
+    <method name="replicate" desc="Replicate from a remote queue to the local broker.">
+      <arg name="broker" type="sstr" dir="I"/>
+      <arg name="queue" type="sstr" dir="I"/>
+    </method>
   </class>
 
 </schema>

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1295339&r1=1295338&r2=1295339&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Wed Feb 29 23:38:00 2012
@@ -24,34 +24,81 @@ from qpid.datatypes import uuid4
 from brokertest import *
 from threading import Thread, Lock, Condition
 from logging import getLogger, WARN, ERROR, DEBUG
-
+from qpidtoollibs.broker import BrokerAgent
 
 log = getLogger("qpid.ha-tests")
 
 class HaBroker(Broker):
-    def __init__(self, test, args=[], broker_url=None, **kwargs):
+    def __init__(self, test, args=[], broker_url=None, ha_cluster=True, **kwargs):
         assert BrokerTest.ha_lib, "Cannot locate HA plug-in"
-        args=["--load-module", BrokerTest.ha_lib,
-              # FIXME aconway 2012-02-13: workaround slow link failover.
-              "--link-maintenace-interval=0.1",
-              "--ha-cluster=yes"]
-        if broker_url: args += [ "--ha-brokers", broker_url ]
+        args = copy(args)
+        args.extend(["--load-module", BrokerTest.ha_lib,
+                     # FIXME aconway 2012-02-13: workaround slow link failover.
+                     "--link-maintenace-interval=0.1",
+                     "--ha-cluster=%s"%ha_cluster])
+        if broker_url: args.extend([ "--ha-brokers", broker_url ])
         Broker.__init__(self, test, args, **kwargs)
+        self.commands=os.getenv("PYTHON_COMMANDS")
+        assert os.path.isdir(self.commands)
 
     def promote(self):
-        assert os.system("$QPID_HA_EXEC promote -b %s"%(self.host_port())) == 0
+        assert os.system("%s/qpid-ha promote -b %s"%(self.commands, self.host_port())) == 0
 
     def set_client_url(self, url):
         assert os.system(
-            "$QPID_HA_EXEC set --public-brokers=%s -b %s"%(url,self.host_port())) == 0
+            "%s/qpid-ha set --public-brokers=%s -b %s"%(self.commands, url,self.host_port())) == 0
 
     def set_broker_url(self, url):
         assert os.system(
-            "$QPID_HA_EXEC set --brokers=%s -b %s"%(url, self.host_port())) == 0
+            "%s/qpid-ha set --brokers=%s -b %s"%(self.commands, url, self.host_port())) == 0
+
+    def replicate(self, from_broker, queue):
+        assert os.system(
+            "%s/qpid-ha replicate -b %s %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+
+    def config_replicate(self, from_broker, queue):
+        assert os.system(
+            "%s/qpid-config --broker=%s add queue --replicate-from %s %s"%(self.commands, self.host_port(), from_broker, queue)) == 0
+
+    def config_declare(self, queue, replication):
+        assert os.system(
+            "%s/qpid-config --broker=%s add queue %s --replication %s"%(self.commands, self.host_port(), queue, replication)) == 0
+
+class HaCluster(object):
+    _cluster_count = 0
 
-def set_broker_urls(brokers):
-    url = ",".join([b.host_port() for b in brokers])
-    for b in brokers: b.set_broker_url(url)
+    def __init__(self, test, n, **kwargs):
+        """Start a cluster of n brokers"""
+        self.test = test
+        self._brokers = [ HaBroker(test, name="broker%s-%s"%(HaCluster._cluster_count, i), **kwargs) for i in xrange(n)]
+        HaCluster._cluster_count += 1
+        self[0].promote()
+        self.url = ",".join([b.host_port() for b in self])
+        for b in self: b.set_broker_url(self.url)
+
+    def connect(self, i):
+        """Connect with reconnect_urls"""
+        return self[i].connect(reconnect=True, reconnect_urls=self.url.split(","))
+
+    def kill(self, i):
+        """Kill broker i, promote broker i+1"""
+        self[i].kill()
+        self[i].expect = EXPECT_EXIT_FAIL
+        self[(i+1) % len(self)].promote()
+
+    def bounce(self, i):
+        """Stop and restart a broker in a cluster."""
+        self.kill(i)
+        b = self[i]
+        self._brokers[i] = HaBroker(self.test, name=b.name, port=b.port(), broker_url=self.url)
+
+    # Behave like a list of brokers.
+    def __len__(self): return len(self._brokers)
+    def __getitem__(self,index): return self._brokers[index]
+    def __iter__(self): return self._brokers.__iter__()
+
+
+def qr_node(value="messages"): return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
 
 class ShortTests(BrokerTest):
     """Short HA functionality tests."""
@@ -92,6 +139,8 @@ class ShortTests(BrokerTest):
         """Test basic replication of configuration and messages before and
         after backup has connected"""
 
+        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+
         def queue(name, replicate):
             return "%s;{create:always,node:{x-declare:{arguments:{'qpid.replicate':%s}}}}"%(name, replicate)
 
@@ -177,12 +226,9 @@ class ShortTests(BrokerTest):
             self.assert_browse_retry(p, "foo", msgs[i+1:])
             self.assert_browse_retry(b, "foo", msgs[i+1:])
 
-    def qpid_replicate(self, value="messages"):
-        return "node:{x-declare:{arguments:{'qpid.replicate':%s}}}" % value
-
     def test_sync(self):
         def queue(name, replicate):
-            return "%s;{create:always,%s}"%(name, self.qpid_replicate(replicate))
+            return "%s;{create:always,%s}"%(name, qr_node(replicate))
         primary = HaBroker(self, name="primary")
         primary.promote()
         p = primary.connect().session()
@@ -206,6 +252,7 @@ class ShortTests(BrokerTest):
 
     def test_send_receive(self):
         """Verify sequence numbers of messages sent by qpid-send"""
+        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
         primary = HaBroker(self, name="primary")
         primary.promote()
         backup1 = HaBroker(self, name="backup1", broker_url=primary.host_port())
@@ -213,14 +260,14 @@ class ShortTests(BrokerTest):
         sender = self.popen(
             ["qpid-send",
              "--broker", primary.host_port(),
-             "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+             "--address", "q;{create:always,%s}"%(qr_node("messages")),
              "--messages=1000",
              "--content-string=x"
              ])
         receiver = self.popen(
             ["qpid-receive",
              "--broker", primary.host_port(),
-             "--address", "q;{create:always,%s}"%(self.qpid_replicate("messages")),
+             "--address", "q;{create:always,%s}"%(qr_node("messages")),
              "--messages=990",
              "--timeout=10"
              ])
@@ -239,7 +286,7 @@ class ShortTests(BrokerTest):
 
     def test_failover_python(self):
         """Verify that backups rejects connections and that fail-over works in python client"""
-        getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover messages
+        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
         primary = HaBroker(self, name="primary", expect=EXPECT_EXIT_FAIL)
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
@@ -254,7 +301,7 @@ class ShortTests(BrokerTest):
         # Test discovery: should connect to primary after reject by backup
         c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
         s = c.session()
-        sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate()))
+        sender = s.sender("q;{create:always,%s}"%(qr_node()))
         self.wait_backup(backup, "q")
         sender.send("foo")
         primary.kill()
@@ -269,7 +316,7 @@ class ShortTests(BrokerTest):
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())
         url="%s,%s"%(primary.host_port(), backup.host_port())
-        primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
+        primary.connect().session().sender("q;{create:always,%s}"%(qr_node()))
         self.wait_backup(backup, "q")
 
         sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
@@ -288,19 +335,75 @@ class ShortTests(BrokerTest):
         receiver.stop()
 
     def test_backup_failover(self):
-        brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
-                    for name in ["a","b","c"] ]
-        url = ",".join([b.host_port() for b in brokers])
-        for b in brokers: b.set_broker_url(url)
-        brokers[0].promote()
+        """Verify that a backup broker fails over and recovers queue state"""
+        brokers = HaCluster(self, 3)
         brokers[0].connect().session().sender(
-            "q;{create:always,%s}"%(self.qpid_replicate())).send("a")
+            "q;{create:always,%s}"%(qr_node())).send("a")
         for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
-        brokers[0].kill()
-        brokers[2].promote()            # c must fail over to b.
-        brokers[2].connect().session().sender("q").send("b")
-        self.assert_browse_backup(brokers[1], "q", ["a","b"])
-        for b in brokers[1:]: b.kill()
+        brokers[0].expect = EXPECT_EXIT_FAIL
+        brokers.kill(0)
+        brokers[1].connect().session().sender("q").send("b")
+        self.assert_browse_backup(brokers[2], "q", ["a","b"])
+        s = brokers[1].connect().session()
+        self.assertEqual("a", s.receiver("q").fetch().content)
+        s.acknowledge()
+        self.assert_browse_backup(brokers[2], "q", ["b"])
+
+    def test_qpid_config_replication(self):
+        """Set up replication via qpid-config"""
+        brokers = HaCluster(self,2)
+        brokers[0].config_declare("q","messages")
+        brokers[0].connect().session().sender("q").send("foo")
+        self.assert_browse_backup(brokers[1], "q", ["foo"])
+
+    def test_standalone_queue_replica(self):
+        """Test replication of individual queues outside of cluster mode"""
+        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+        primary = HaBroker(self, name="primary", ha_cluster=False, args=["--log-enable=debug+"])
+        pc = primary.connect()
+        ps = pc.session().sender("q;{create:always}")
+        pr = pc.session().receiver("q;{create:always}")
+        backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+        br = backup.connect().session().receiver("q;{create:always}")
+
+        # Set up replication with qpid-ha
+        backup.replicate(primary.host_port(), "q")
+        ps.send("a")
+        self.assert_browse_backup(backup, "q", ["a"])
+        ps.send("b")
+        self.assert_browse_backup(backup, "q", ["a", "b"])
+        self.assertEqual("a", pr.fetch().content)
+        pr.session.acknowledge()
+        self.assert_browse_backup(backup, "q", ["b"])
+
+        # Set up replication with qpid-config
+        ps2 = pc.session().sender("q2;{create:always}")
+        backup.config_replicate(primary.host_port(), "q2");
+        ps2.send("x")
+        self.assert_browse_backup(backup, "q2", ["x"])
+
+
+    def test_queue_replica_failover(self):
+        """Test individual queue replication from a cluster to a standalone backup broker, verify it fails over."""
+        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
+        cluster = HaCluster(self, 2)
+        primary = cluster[0]
+        pc = cluster.connect(0)
+        ps = pc.session().sender("q;{create:always,%s}"%qr_node("messages"))
+        pr = pc.session().receiver("q;{create:always,%s}"%qr_node("messages"))
+        backup = HaBroker(self, name="backup", ha_cluster=False, args=["--log-enable=debug+"])
+        br = backup.connect().session().receiver("q;{create:always}")
+        backup.replicate(cluster.url, "q")
+        ps.send("a")
+        self.assert_browse_backup(backup, "q", ["a"])
+        cluster.bounce(0)
+        self.assert_browse_backup(backup, "q", ["a"])
+        ps.send("b")
+        self.assert_browse_backup(backup, "q", ["a", "b"])
+        cluster.bounce(1)
+        self.assertEqual("a", pr.fetch().content)
+        pr.session.acknowledge()
+        self.assert_browse_backup(backup, "q", ["b"])
 
     def test_lvq(self):
         """Verify that we replicate to an LVQ correctly"""
@@ -328,6 +431,7 @@ class ShortTests(BrokerTest):
         self.assert_browse_backup(backup, "q", [str(i) for i in range(5,10)])
 
     def test_reject(self):
+        getLogger().setLevel(ERROR) # Hide expected WARNING log messages from failover.
         primary  = HaBroker(self, name="primary")
         primary.promote()
         backup = HaBroker(self, name="backup", broker_url=primary.host_port())

Modified: qpid/trunk/qpid/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-config?rev=1295339&r1=1295338&r2=1295339&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-config (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-config Wed Feb 29 23:38:00 2012
@@ -77,7 +77,7 @@ Replication levels:
     messages       - replicate configuration and messages
 """
 
-REPLICATE_LEVELS= ["none", "configuration", "messages"]
+REPLICATION_LEVELS= ["none", "configuration", "messages"]
 
 class Config:
     def __init__(self):
@@ -87,7 +87,7 @@ class Config:
         self._ignoreDefault     = False
         self._altern_ex         = None
         self._durable           = False
-        self._replicate         = None
+        self._replication       = None
         self._ha_admin          = False
         self._clusterDurable    = False
         self._if_empty          = True
@@ -110,6 +110,7 @@ class Config:
         self._msgGroupHeader    = None
         self._sharedMsgGroup    = False
         self._extra_arguments   = []
+        self._replicate_from    = None
         self._returnCode        = 0
 
 config = Config()
@@ -130,7 +131,7 @@ FLOW_STOP_SIZE    = "qpid.flow_stop_size
 FLOW_RESUME_SIZE  = "qpid.flow_resume_size"
 MSG_GROUP_HDR_KEY = "qpid.group_header_key"
 SHARED_MSG_GROUP  = "qpid.shared_msg_group"
-REPLICATE = "qpid.replicate"
+REPLICATION = "qpid.replicate"
 #There are various arguments to declare that have specific program
 #options in this utility. However there is now a generic mechanism for
 #passing arguments as well. The SPECIAL_ARGS list contains the
@@ -141,7 +142,7 @@ SPECIAL_ARGS=[
     FILECOUNT,FILESIZE,MAX_QUEUE_SIZE,MAX_QUEUE_COUNT,POLICY_TYPE,CLUSTER_DURABLE,
     LVQ_KEY,MSG_SEQUENCE,IVE,QUEUE_EVENT_GENERATION,
     FLOW_STOP_COUNT,FLOW_STOP_SIZE,FLOW_RESUME_SIZE,
-    MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATE]
+    MSG_GROUP_HDR_KEY,SHARED_MSG_GROUP,REPLICATION]
 
 class JHelpFormatter(IndentedHelpFormatter):
     """Format usage and description without stripping newlines from usage strings
@@ -185,7 +186,7 @@ def OptionsAndArguments(argv):
     group2 = OptionGroup(parser, "Options for Adding Exchanges and Queues")
     group2.add_option("--alternate-exchange", action="store", type="string", metavar="<aexname>", help="Name of the alternate-exchange for the new queue or exchange. Exchanges route messages to the alternate exchange if they are unable to route them elsewhere. Queues route messages to the alternate exchange if they are rejected by a subscriber or orphaned by queue deletion.")
     group2.add_option("--durable", action="store_true", help="The new queue or exchange is durable.")
-    group2.add_option("--replicate", action="store", metavar="<level>", help="Replication level for the new queue or exchange (none, configuration or messages).")
+    group2.add_option("--replication", action="store", metavar="<level>", help="Enable automatic replication in a HA cluster. <level> is 'none', 'configuration' or 'messages').")
     group2.add_option("--ha-admin", action="store_true", help="Allow connection to a HA backup broker.")
     parser.add_option_group(group2)
 
@@ -212,6 +213,7 @@ def OptionsAndArguments(argv):
                       help="Allow message group consumption across multiple consumers.")
     group3.add_option("--argument", dest="extra_arguments", action="append", default=[],
                       metavar="<NAME=VALUE>", help="Specify a key-value pair to add to queue arguments")
+    group3.add_option("--replicate-from", metavar="<broker-url>", help="Replicate from the same-named queue at <broker-url>")
     # no option for declaring an exclusive queue - which can only be used by the session that creates it.
     parser.add_option_group(group3)
 
@@ -252,10 +254,10 @@ def OptionsAndArguments(argv):
         config._altern_ex = opts.alternate_exchange
     if opts.durable:
         config._durable = True
-    if opts.replicate:
-        if not opts.replicate in REPLICATE_LEVELS:
-            raise Exception("Invalid replicate level '%s', should be one of: %s" % (opts.replicate, ", ".join(REPLICATE_LEVELS)))
-        config._replicate = opts.replicate
+    if opts.replication:
+        if not opts.replication in REPLICATION_LEVELS:
+            raise Exception("Invalid replication level '%s', should be one of: %s" % (opts.replication, ", ".join(REPLICATION_LEVELS)))
+        config._replication = opts.replication
     if opts.ha_admin: config._ha_admin = True
     if opts.cluster_durable:
         config._clusterDurable = True
@@ -302,6 +304,8 @@ def OptionsAndArguments(argv):
         config._sharedMsgGroup = True
     if opts.extra_arguments:
         config._extra_arguments = opts.extra_arguments
+    if opts.replicate_from:
+        config._replicate_from = opts.replicate_from
     return args
 
 
@@ -464,7 +468,7 @@ class BrokerManager:
                 args = q.arguments
                 if not args: args = {}
                 if q.durable:    print "--durable",
-                if REPLICATE in args: print "--replicate=%s" % args[REPLICATE],
+                if REPLICATION in args: print "--replication=%s" % args[REPLICATION],
                 if CLUSTER_DURABLE in args and args[CLUSTER_DURABLE] == 1: print "--cluster-durable",
                 if q.autoDelete: print "auto-del",
                 if q.exclusive:  print "excl",
@@ -526,8 +530,8 @@ class BrokerManager:
             declArgs['alternate-exchange'] = config._altern_ex
         if config._durable:
             declArgs['durable'] = 1
-        if config._replicate:
-            declArgs[REPLICATE] = config._replicate
+        if config._replication:
+            declArgs[REPLICATION] = config._replication
         self.broker.addExchange(etype, ename, declArgs)
 
 
@@ -594,11 +598,11 @@ class BrokerManager:
             declArgs['alternate-exchange'] = config._altern_ex
         if config._durable:
             declArgs['durable'] = 1
-        if config._replicate:
-            declArgs[REPLICATE] = config._replicate
-
+        if config._replication:
+            declArgs[REPLICATION] = config._replication
         self.broker.addQueue(qname, declArgs)
-
+        if config._replicate_from:      # Start replication
+            self.broker._method("replicate", {"broker":config._replicate_from, "queue":qname}, "org.apache.qpid.ha:habroker:ha-broker")
 
     def DelQueue(self, args):
         if len(args) < 1:
@@ -751,9 +755,9 @@ def main(argv=None):
             if e.__class__.__name__ != "Timeout":
                 print "Failed: %s: %s" % (e.__class__.__name__, e)
                 return 1
-
     return config._returnCode
 
+
 if __name__ == "__main__":
         sys.exit(main())
 

Modified: qpid/trunk/qpid/tools/src/py/qpid-ha
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-ha?rev=1295339&r1=1295338&r2=1295339&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-ha (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-ha Wed Feb 29 23:38:00 2012
@@ -83,7 +83,9 @@ ReadyCmd()
 
 class ReplicateCmd(Command):
     def __init__(self):
-        Command.__init__(self, "replicate", "Replicate <queue> from broker <primary> to the current broker.", ["<queue>", "<primary>"])
+        Command.__init__(self, "replicate", "Set up replication from <queue> on <remote-broker> to <queue> on the current broker.", ["<queue>", "<remote-broker>"])
+    def do_execute(self, qmf_broker, opts, args):
+        qmf_broker._method("replicate", {"broker":args[1], "queue":args[2]}, HA_BROKER)
 ReplicateCmd()
 
 class SetCmd(Command):



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org