You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/02/07 20:26:12 UTC

svn commit: r1443678 - in /qpid/trunk/qpid/cpp/src: qpid/ha/ tests/

Author: aconway
Date: Thu Feb  7 19:26:12 2013
New Revision: 1443678

URL: http://svn.apache.org/r1443678
Log:
QPID-4555: HA Primary sets explicit qpid.replicate in Queue and Exchange arguments.

Previously both Primary and Backup would calculate the qpid.replicate value
independently, assuming the result would be the same. In the case of exclusive
queues, the exclusivity can change over time so it's possible that primary and
backup won't agree.

Now only Primary does the calculation with exclusive, auto-delete etc. and puts
an explicit qpid.replicate in the queue or exchange arguments. Backup uses the
value set by primary.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Feb  7 19:26:12 2013
@@ -242,12 +242,14 @@ class BrokerReplicator::UpdateTracker {
 
     /** Add an exchange name */
     void addExchange(Exchange::shared_ptr ex)  {
-        if (repTest.isReplicated(CONFIGURATION, *ex)) initial.insert(ex->getName());
+        if (repTest.getLevel(*ex))
+            initial.insert(ex->getName());
     }
 
     /** Add a queue name. */
     void addQueue(Queue::shared_ptr q) {
-        if (repTest.isReplicated(CONFIGURATION, *q)) initial.insert(q->getName());
+        if (repTest.getLevel(*q))
+            initial.insert(q->getName());
     }
 
     /** Received an event for name */
@@ -279,7 +281,7 @@ class BrokerReplicator::UpdateTracker {
 
 BrokerReplicator::BrokerReplicator(HaBroker& hb, const boost::shared_ptr<Link>& l)
     : Exchange(QPID_CONFIGURATION_REPLICATOR),
-      logPrefix("Backup: "), replicationTest(hb.getReplicationTest()),
+      logPrefix("Backup: "), replicationTest(NONE),
       haBroker(hb), broker(hb.getBroker()),
       exchanges(broker.getExchanges()), queues(broker.getQueues()),
       link(l),
@@ -472,9 +474,7 @@ void BrokerReplicator::route(Deliverable
 
 void BrokerReplicator::doEventQueueDeclare(Variant::Map& values) {
     Variant::Map argsMap = asMapVoid(values[ARGS]);
-    bool autoDel = values[AUTODEL].asBool();
-    bool excl = values[EXCL].asBool();
-    if (values[DISP] == CREATED && replicationTest.isReplicated(CONFIGURATION, argsMap, autoDel, excl)) {
+    if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
         string name = values[QNAME].asString();
         QueueSettings settings(values[DURABLE].asBool(), values[AUTODEL].asBool());
         QPID_LOG(debug, logPrefix << "Queue declare event: " << name);
@@ -488,7 +488,7 @@ void BrokerReplicator::doEventQueueDecla
                      << name);
             deleteQueue(name);
         }
-        replicateQueue(name, values[DURABLE].asBool(), autoDel, args,
+        replicateQueue(name, values[DURABLE].asBool(), values[AUTODEL].asBool(), args,
                        values[ALTEX].asString());
     }
 }
@@ -506,7 +506,7 @@ void BrokerReplicator::doEventQueueDelet
     // sessions may be closed by a "queue deleted" exception.
     string name = values[QNAME].asString();
     boost::shared_ptr<Queue> queue = queues.find(name);
-    if (queue && replicationTest.replicateLevel(queue->getSettings().storeSettings)) {
+    if (queue && replicationTest.getLevel(*queue)) {
         QPID_LOG(debug, logPrefix << "Queue delete event: " << name);
         if (queueTracker.get()) queueTracker->event(name);
         deleteQueue(name);
@@ -515,8 +515,7 @@ void BrokerReplicator::doEventQueueDelet
 
 void BrokerReplicator::doEventExchangeDeclare(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGS]));
-    if (!replicationTest.replicateLevel(argsMap)) return; // Not a replicated exchange.
-    if (values[DISP] == CREATED && replicationTest.replicateLevel(argsMap)) {
+    if (values[DISP] == CREATED && replicationTest.getLevel(argsMap)) {
         string name = values[EXNAME].asString();
         QPID_LOG(debug, logPrefix << "Exchange declare event: " << name);
         if (exchangeTracker.get()) exchangeTracker->event(name);
@@ -542,7 +541,7 @@ void BrokerReplicator::doEventExchangeDe
     boost::shared_ptr<Exchange> exchange = exchanges.find(name);
     if (!exchange) {
         QPID_LOG(warning, logPrefix << "Exchange delete event, not found: " << name);
-    } else if (!replicationTest.replicateLevel(exchange->getArgs())) {
+    } else if (!replicationTest.getLevel(*exchange)) {
         QPID_LOG(warning, logPrefix << "Exchange delete event, not replicated: " << name);
     } else {
         QPID_LOG(debug, logPrefix << "Exchange delete event:" << name);
@@ -559,11 +558,12 @@ void BrokerReplicator::doEventBind(Varia
         queues.find(values[QNAME].asString());
     framing::FieldTable args;
     qpid::amqp_0_10::translate(asMapVoid(values[ARGS]), args);
-    // We only replicate binds for a replicated queue to replicated
-    // exchange that both exist locally.
-    if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
-        replicationTest.replicateLevel(args))
+    // We only replicate binds for a replicated queue to replicated exchange
+    // that both exist locally. Respect the replication level set in the
+    // bind arguments, but replicate by default.
+    if (exchange && replicationTest.getLevel(*exchange) &&
+        queue && replicationTest.getLevel(*queue) &&
+        ReplicationTest(ALL).getLevel(args))
     {
         string key = values[KEY].asString();
         QPID_LOG(debug, logPrefix << "Bind event: exchange=" << exchange->getName()
@@ -581,8 +581,8 @@ void BrokerReplicator::doEventUnbind(Var
         queues.find(values[QNAME].asString());
     // We only replicate unbinds for a replicated queue to replicated
     // exchange that both exist locally.
-    if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings))
+    if (exchange && replicationTest.getLevel(*exchange) &&
+        queue && replicationTest.getLevel(*queue))
     {
         string key = values[KEY].asString();
         QPID_LOG(debug, logPrefix << "Unbind event: exchange=" << exchange->getName()
@@ -630,12 +630,7 @@ Variant getHaUuid(const Variant::Map& ma
 
 void BrokerReplicator::doResponseQueue(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
-    if (!replicationTest.isReplicated(
-            CONFIGURATION,
-            values[ARGUMENTS].asMap(),
-            values[AUTODELETE].asBool(),
-            values[EXCLUSIVE].asBool()))
-        return;
+    if (!replicationTest.getLevel(argsMap)) return;
     string name(values[NAME].asString());
     if (!queueTracker.get())
         throw Exception(QPID_MSG("Unexpected queue response: " << values));
@@ -664,7 +659,7 @@ void BrokerReplicator::doResponseQueue(V
 
 void BrokerReplicator::doResponseExchange(Variant::Map& values) {
     Variant::Map argsMap(asMapVoid(values[ARGUMENTS]));
-    if (!replicationTest.replicateLevel(argsMap)) return;
+    if (!replicationTest.getLevel(argsMap)) return;
     string name = values[NAME].asString();
     if (!exchangeTracker.get())
         throw Exception(QPID_MSG("Unexpected exchange response: " << values));
@@ -718,10 +713,11 @@ void BrokerReplicator::doResponseBind(Va
     framing::FieldTable args;
     qpid::amqp_0_10::translate(asMapVoid(values[ARGUMENTS]), args);
 
-    // Automatically replicate binding if queue and exchange exist and are replicated
-    if (exchange && replicationTest.replicateLevel(exchange->getArgs()) &&
-        queue && replicationTest.replicateLevel(queue->getSettings().storeSettings) &&
-        replicationTest.replicateLevel(args))
+    // Automatically replicate binding if queue and exchange exist and are replicated.
+    // Respect replicate setting in binding args but default to replicated.
+    if (exchange && replicationTest.getLevel(*exchange) &&
+        queue && replicationTest.getLevel(*queue) &&
+        ReplicationTest(ALL).getLevel(args))
     {
         string key = values[BINDING_KEY].asString();
         QPID_LOG(debug, logPrefix << "Bind response: exchange:" << exName
@@ -741,8 +737,7 @@ void BrokerReplicator::doResponseHaBroke
     try {
         QPID_LOG(trace, logPrefix << "HA Broker response: " << values);
         ReplicateLevel mine = haBroker.getSettings().replicateDefault.get();
-        ReplicateLevel primary = replicationTest.replicateLevel(
-            values[REPLICATE_DEFAULT].asString());
+        ReplicateLevel primary = replicationTest.getLevel(values[REPLICATE_DEFAULT].asString());
         if (mine != primary)
             throw Exception(QPID_MSG("Replicate default on backup (" << mine
                                      << ") does not match primary (" <<  primary << ")"));
@@ -759,7 +754,7 @@ void BrokerReplicator::doResponseHaBroke
 boost::shared_ptr<QueueReplicator> BrokerReplicator::startQueueReplicator(
     const boost::shared_ptr<Queue>& queue)
 {
-    if (replicationTest.replicateLevel(queue->getSettings().storeSettings) == ALL) {
+    if (replicationTest.getLevel(*queue) == ALL) {
         boost::shared_ptr<QueueReplicator> qr(
             new QueueReplicator(haBroker, queue, link));
         if (!exchanges.registerExchange(qr))

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.h Thu Feb  7 19:26:12 2013
@@ -142,8 +142,8 @@ class BrokerReplicator : public broker::
     void setMembership(const types::Variant::List&); // Set membership from list.
 
     std::string logPrefix;
-    std::string userId, remoteHost;
     ReplicationTest replicationTest;
+    std::string userId, remoteHost;
     HaBroker& haBroker;
     broker::Broker& broker;
     broker::ExchangeRegistry& exchanges;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Thu Feb  7 19:26:12 2013
@@ -61,7 +61,6 @@ using boost::dynamic_pointer_cast;
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
     : systemId(b.getSystem()->getSystemId().data()),
       settings(s),
-      replicationTest(s.replicateDefault.get()),
       broker(b),
       observer(new ConnectionObserver(*this, systemId)),
       role(new StandAlone),

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Thu Feb  7 19:26:12 2013
@@ -25,7 +25,6 @@
 #include "BrokerInfo.h"
 #include "Membership.h"
 #include "types.h"
-#include "ReplicationTest.h"
 #include "Settings.h"
 #include "qpid/Url.h"
 #include "qpid/sys/Mutex.h"
@@ -85,7 +84,6 @@ class HaBroker : public management::Mana
     void shutdown(const std::string& message);
 
     BrokerStatus getStatus() const;
-    ReplicationTest getReplicationTest() const { return replicationTest; }
     boost::shared_ptr<ConnectionObserver> getObserver() { return observer; }
 
     BrokerInfo getBrokerInfo() const { return membership.getInfo(); }
@@ -108,7 +106,6 @@ class HaBroker : public management::Mana
     mutable sys::Mutex lock;
     Url publicUrl, brokerUrl;
     std::vector<Url> knownBrokers;
-    ReplicationTest replicationTest;
 
     // Independently thread-safe member variables
     broker::Broker& broker;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Thu Feb  7 19:26:12 2013
@@ -83,7 +83,8 @@ Primary* Primary::instance = 0;
 
 Primary::Primary(HaBroker& hb, const BrokerInfo::Set& expect) :
     haBroker(hb), membership(hb.getMembership()),
-    logPrefix("Primary: "), active(false)
+    logPrefix("Primary: "), active(false),
+    replicationTest(hb.getSettings().replicateDefault.get())
 {
     hb.getMembership().setStatus(RECOVERING);
     assert(instance == 0);
@@ -97,8 +98,7 @@ Primary::Primary(HaBroker& hb, const Bro
         // the QueueGuards are created.
         QPID_LOG(notice, logPrefix << "Promoted to primary. Expected backups: " << expect);
         for (BrokerInfo::Set::const_iterator i = expect.begin(); i != expect.end(); ++i) {
-            boost::shared_ptr<RemoteBackup> backup(
-                new RemoteBackup(*i, haBroker.getReplicationTest(), 0));
+            boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(*i, 0));
             backups[i->getSystemId()] = backup;
             if (!backup->isReady()) expectedBackups.insert(backup);
             backup->setCatchupQueues(hb.getBroker().getQueues(), true); // Create guards
@@ -196,19 +196,25 @@ void Primary::readyReplica(const Replica
 
 // NOTE: Called with queue registry lock held.
 void Primary::queueCreate(const QueuePtr& q) {
-    if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *q)) {
+    // Set replication argument.
+    ReplicateLevel level = replicationTest.useLevel(*q);
+    QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
+             << " replication: " << printable(level));
+    q->addArgument(QPID_REPLICATE, printable(level).str());
+    if (level) {
         // Give each queue a unique id to avoid confusion of same-named queues.
         q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));
-    }
-    Mutex::ScopedLock l(lock);
-    for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
-        i->second->queueCreate(q);
-        checkReady(i, l);
+        Mutex::ScopedLock l(lock);
+        for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i) {
+            i->second->queueCreate(q);
+            checkReady(i, l);
+        }
     }
 }
 
 // NOTE: Called with queue registry lock held.
 void Primary::queueDestroy(const QueuePtr& q) {
+        QPID_LOG(debug, logPrefix << "Destroyed queue " << q->getName());
     Mutex::ScopedLock l(lock);
     for (BackupMap::iterator i = backups.begin(); i != backups.end(); ++i)
         i->second->queueDestroy(q);
@@ -217,16 +223,21 @@ void Primary::queueDestroy(const QueuePt
 
 // NOTE: Called with exchange registry lock held.
 void Primary::exchangeCreate(const ExchangePtr& ex) {
-    if (haBroker.getReplicationTest().isReplicated(CONFIGURATION, *ex)) {
+    ReplicateLevel level = replicationTest.useLevel(*ex);
+    QPID_LOG(debug, logPrefix << "Created exchange " << ex->getName()
+             << " replication: " << printable(level));
+    FieldTable args = ex->getArgs();
+    args.setString(QPID_REPLICATE, printable(level).str()); // Set replication arg.
+    if (level) {
         // Give each exchange a unique id to avoid confusion of same-named exchanges.
-        FieldTable args = ex->getArgs();
         args.set(QPID_HA_UUID, FieldTable::ValuePtr(new UuidValue(&Uuid(true)[0])));
-        ex->setArgs(args);
     }
+    ex->setArgs(args);
 }
 
 // NOTE: Called with exchange registry lock held.
-void Primary::exchangeDestroy(const ExchangePtr&) {
+void Primary::exchangeDestroy(const ExchangePtr& ex) {
+    QPID_LOG(debug, logPrefix << "Destroyed exchange " << ex->getName());
     // Do nothing
  }
 
@@ -237,8 +248,7 @@ void Primary::opened(broker::Connection&
         BackupMap::iterator i = backups.find(info.getSystemId());
         if (i == backups.end()) {
             QPID_LOG(info, logPrefix << "New backup connected: " << info);
-            boost::shared_ptr<RemoteBackup> backup(
-                new RemoteBackup(info, haBroker.getReplicationTest(), &connection));
+            boost::shared_ptr<RemoteBackup> backup(new RemoteBackup(info, &connection));
             {
                 // Avoid deadlock with queue registry lock.
                 Mutex::ScopedUnlock u(lock);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Thu Feb  7 19:26:12 2013
@@ -24,6 +24,7 @@
 
 #include "types.h"
 #include "BrokerInfo.h"
+#include "ReplicationTest.h"
 #include "Role.h"
 #include "qpid/sys/Mutex.h"
 #include <boost/shared_ptr.hpp>
@@ -104,6 +105,8 @@ class Primary : public Role
     Membership& membership;
     std::string logPrefix;
     bool active;
+    ReplicationTest replicationTest;
+
     /**
      * Set of expected backups that must be ready before we declare ourselves
      * active. These are backups that were known and ready before the primary

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Thu Feb  7 19:26:12 2013
@@ -34,15 +34,16 @@ using sys::Mutex;
 using boost::bind;
 
 RemoteBackup::RemoteBackup(
-    const BrokerInfo& info, ReplicationTest rt, broker::Connection* c
+    const BrokerInfo& info, broker::Connection* c
 ) : logPrefix("Primary: Remote backup "+info.getLogId()+": "),
-    brokerInfo(info), replicationTest(rt), connection(c), reportedReady(false)
+    brokerInfo(info), replicationTest(NONE), connection(c), reportedReady(false)
 {}
 
 void RemoteBackup::setCatchupQueues(broker::QueueRegistry& queues, bool createGuards)
 {
-    QPID_LOG(debug, logPrefix << "Setting catch-up queues" << (createGuards ? " and guards" : ""));
     queues.eachQueue(boost::bind(&RemoteBackup::catchupQueue, this, _1, createGuards));
+    QPID_LOG(debug, logPrefix << "Set " << catchupQueues.size() << " catch-up queues"
+             << (createGuards ? " and guards" : ""));
 }
 
 RemoteBackup::~RemoteBackup() { cancel(); }
@@ -64,7 +65,7 @@ bool RemoteBackup::isReady() {
 }
 
 void RemoteBackup::catchupQueue(const QueuePtr& q, bool createGuard) {
-    if (replicationTest.isReplicated(ALL, *q)) {
+    if (replicationTest.getLevel(*q) == ALL) {
         QPID_LOG(debug, logPrefix << "Catch-up queue"
                  << (createGuard ? " and guard" : "") << ": " << q->getName());
         catchupQueues.insert(q);
@@ -105,7 +106,7 @@ void RemoteBackup::ready(const QueuePtr&
 
 // Called via ConfigurationObserver::queueCreate and from catchupQueue
 void RemoteBackup::queueCreate(const QueuePtr& q) {
-    if (replicationTest.isReplicated(ALL, *q))
+    if (replicationTest.getLevel(*q) == ALL)
         guards[q].reset(new QueueGuard(*q, brokerInfo));
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h Thu Feb  7 19:26:12 2013
@@ -55,7 +55,7 @@ class RemoteBackup
     /** Note: isReady() can be true after construction
      *@param connected true if the backup is already connected.
      */
-    RemoteBackup(const BrokerInfo&, ReplicationTest, broker::Connection*);
+    RemoteBackup(const BrokerInfo&, broker::Connection*);
     ~RemoteBackup();
 
     /** Set all queues in the registry as catch-up queues.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.cpp Thu Feb  7 19:26:12 2013
@@ -19,6 +19,7 @@
  *
  */
 #include "ReplicationTest.h"
+#include "qpid/log/Statement.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/Exchange.h"
 #include "qpid/framing/FieldTable.h"
@@ -28,53 +29,49 @@ namespace ha {
 
 using types::Variant;
 
-ReplicateLevel ReplicationTest::replicateLevel(const std::string& str) {
+ReplicateLevel ReplicationTest::getLevel(const std::string& str) {
     Enum<ReplicateLevel> rl(replicateDefault);
     if (!str.empty()) rl.parse(str);
     return rl.get();
 }
 
-ReplicateLevel ReplicationTest::replicateLevel(const framing::FieldTable& f) {
+ReplicateLevel ReplicationTest::getLevel(const framing::FieldTable& f) {
     if (f.isSet(QPID_REPLICATE))
-        return replicateLevel(f.getAsString(QPID_REPLICATE));
+        return getLevel(f.getAsString(QPID_REPLICATE));
     else
         return replicateDefault;
 }
 
-ReplicateLevel ReplicationTest::replicateLevel(const Variant::Map& m) {
+ReplicateLevel ReplicationTest::getLevel(const Variant::Map& m) {
     Variant::Map::const_iterator i = m.find(QPID_REPLICATE);
     if (i != m.end())
-        return replicateLevel(i->second.asString());
+        return getLevel(i->second.asString());
     else
         return replicateDefault;
 }
 
-namespace {
-const std::string AUTO_DELETE_TIMEOUT("qpid.auto_delete_timeout");
-}
-
-bool ReplicationTest::isReplicated(
-    ReplicateLevel level, const Variant::Map& args, bool autodelete, bool exclusive)
-{
-    bool ignore = autodelete && exclusive && args.find(AUTO_DELETE_TIMEOUT) == args.end();
-    return !ignore && replicateLevel(args) >= level;
+ReplicateLevel ReplicationTest::getLevel(const broker::Queue& q) {
+    const Variant::Map& qmap(q.getSettings().original);
+    Variant::Map::const_iterator i = qmap.find(QPID_REPLICATE);
+    if (i != qmap.end())
+        return getLevel(i->second.asString());
+    else
+        return getLevel(q.getSettings().storeSettings);
 }
 
-bool ReplicationTest::isReplicated(
-    ReplicateLevel level, const framing::FieldTable& args, bool autodelete, bool exclusive)
-{
-    bool ignore = autodelete && exclusive && !args.isSet(AUTO_DELETE_TIMEOUT);
-    return !ignore && replicateLevel(args) >= level;
+ReplicateLevel ReplicationTest::getLevel(const broker::Exchange& ex) {
+    return getLevel(ex.getArgs());
 }
 
-bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Queue& q)
+ReplicateLevel ReplicationTest::useLevel(const broker::Queue& q)
 {
-    return isReplicated(level, q.getSettings().storeSettings, q.isAutoDelete(), q.hasExclusiveOwner());
+    bool ignore = q.isAutoDelete() && q.hasExclusiveOwner() &&
+        !q.getSettings().autoDeleteDelay;
+    return ignore ? ReplicationTest(NONE).getLevel(q) : getLevel(q);
 }
 
-bool ReplicationTest::isReplicated(ReplicateLevel level, const broker::Exchange& ex)
-{
-    return replicateLevel(ex.getArgs()) >= level;
+ReplicateLevel ReplicationTest::useLevel(const broker::Exchange& ex) {
+    return ReplicationTest::getLevel(ex);
 }
 
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicationTest.h Thu Feb  7 19:26:12 2013
@@ -48,22 +48,24 @@ class ReplicationTest
     ReplicationTest(ReplicateLevel replicateDefault_) :
         replicateDefault(replicateDefault_) {}
 
-    // Return the simple replication level, accounting for defaults.
-    ReplicateLevel replicateLevel(const std::string& str);
-    ReplicateLevel replicateLevel(const framing::FieldTable& f);
-    ReplicateLevel replicateLevel(const types::Variant::Map& m);
+    // Get the replication level set on an object, or default if not set.
+    ReplicateLevel getLevel(const std::string& str);
+    ReplicateLevel getLevel(const framing::FieldTable& f);
+    ReplicateLevel getLevel(const types::Variant::Map& m);
+    ReplicateLevel getLevel(const broker::Queue&);
+    ReplicateLevel getLevel(const broker::Exchange&);
+
+    // Calculate level for objects that may not have replication set,
+    // including auto-delete/exclusive settings.
+    ReplicateLevel useLevel(const types::Variant::Map& args, bool autodelete, bool exclusive);
+    ReplicateLevel useLevel(const framing::FieldTable& args, bool autodelete, bool exclusive);
+    ReplicateLevel useLevel(const broker::Queue&);
+    ReplicateLevel useLevel(const broker::Exchange&);
 
-    // Return true if replication for a queue is enabled at level or higher,
-    // taking account of default level and queue settings.
-    bool isReplicated(ReplicateLevel level,
-                      const types::Variant::Map& args, bool autodelete, bool exclusive);
-    bool isReplicated(ReplicateLevel level,
-                      const framing::FieldTable& args, bool autodelete, bool exclusive);
-    bool isReplicated(ReplicateLevel level, const broker::Queue&);
-    bool isReplicated(ReplicateLevel level, const broker::Exchange&);
   private:
     ReplicateLevel replicateDefault;
 };
+
 }} // namespace qpid::ha
 
 #endif  /*!QPID_HA_REPLICATIONTEST_H*/

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1443678&r1=1443677&r2=1443678&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Feb  7 19:26:12 2013
@@ -702,15 +702,17 @@ acl deny all all
         s.sender("e1;{create:always, node:{type:topic}}")
 
         # cluster[1] will be the backup, has extra queues/exchanges
+        xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
+        node = "node:{%s}"%(xdecl)
         s = cluster[1].connect_admin().session()
-        s.sender("q1;{create:always}")
-        s.sender("q2;{create:always}")
-        s.sender("e1;{create:always, node:{type:topic}}")
-        s.sender("e2;{create:always, node:{type:topic}}")
+        s.sender("q1;{create:always, %s}"%(node))
+        s.sender("q2;{create:always, %s}"%(node))
+        s.sender("e1;{create:always, node:{type:topic, %s}}"%(xdecl))
+        s.sender("e2;{create:always, node:{type:topic, %s}}"%(xdecl))
         for a in ["q1", "q2", "e1", "e2"]: cluster[1].wait_backup(a)
 
         cluster[0].promote()
-        # Verify the backup deletes the surpluis queue and exchange
+        # Verify the backup deletes the surplus queue and exchange
         cluster[1].wait_status("ready")
         s = cluster[1].connect_admin().session()
         self.assertRaises(NotFound, s.receiver, ("q2"));
@@ -868,12 +870,14 @@ acl deny all all
 
         # Simulate the race by re-creating the objects before promoting the new primary
         cluster.kill(0, False)
+        xdecl = "x-declare:{arguments:{'qpid.replicate':'all'}}"
+        node = "node:{%s}"%(xdecl)
         sn = cluster[1].connect_admin().session()
         sn.sender("qq;{delete:always}").close()
-        s = sn.sender("qq;{create:always}")
+        s = sn.sender("qq;{create:always, %s}"%(node))
         s.send("foo")
         sn.sender("xx;{delete:always}").close()
-        sn.sender("xx;{create:always,node:{type:topic}}")
+        sn.sender("xx;{create:always,node:{type:topic,%s}}"%(xdecl))
         cluster[1].promote()
         cluster[1].wait_status("active")
         # Verify we are not still using the old objects on cluster[2]



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org