You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/12/13 20:30:13 UTC

svn commit: r1550819 - in /qpid/trunk/qpid/cpp/src: qpid/broker/ qpid/ha/ tests/

Author: aconway
Date: Fri Dec 13 19:30:12 2013
New Revision: 1550819

URL: http://svn.apache.org/r1550819
Log:
QPID-5421: HA replication error in stand-alone replication

There were replication errors because with stand-alone replication an IdSetter
was not set on the original queue until queue replication was set up. Any
messages on the queue *before* replication was set up had a replication ID of 0. When
one of those messages was dequeued on the source queue, an incorrect message was
dequeued on the replica queue.

The fix is to add an IdSetter to every queue when replication is enabled.

The unit test ha_tests.ReplicationTests.test_standalone_queue_replica has been
updated to test for this issue.

This commit also has some general tidy-up work around IdSetter and QueueSnapshot.

Removed:
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshots.h
Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
    qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshot.h
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Observers.h Fri Dec 13 19:30:12 2013
@@ -59,6 +59,14 @@ class Observers
         std::for_each(copy.begin(), copy.end(), f);
     }
 
+    template <class T> boost::shared_ptr<T> findType() const {
+        sys::Mutex::ScopedLock l(lock);
+        typename Set::const_iterator i =
+            std::find_if(observers.begin(), observers.end(), &isA<T>);
+        return i == observers.end() ?
+            boost::shared_ptr<T>() : boost::dynamic_pointer_cast<T>(*i);
+    }
+
   protected:
     typedef std::set<ObserverPtr> Set;
     Observers() : lock(myLock) {}
@@ -71,6 +79,10 @@ class Observers
         std::for_each(observers.begin(), observers.end(), f);
     }
 
+    template <class T> static bool isA(const ObserverPtr&o) {
+        return boost::dynamic_pointer_cast<T>(o);
+    }
+
     mutable sys::Mutex myLock;
     mutable sys::Mutex& lock;
     Set observers;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Dec 13 19:30:12 2013
@@ -892,7 +892,7 @@ void BrokerReplicator::disconnected() {
 
     // Make copy of exchanges so we can work outside the registry lock.
     ExchangeVector exs;
-    exchanges.eachExchange(boost::bind(&exchangeAccumulatorCallback, boost::ref(exs), _1));
+    exchanges.eachExchange(boost::bind(&ExchangeVector::push_back, boost::ref(exs), _1));
     for_each(exs.begin(), exs.end(),
              boost::bind(&BrokerReplicator::disconnectedQueueReplicator, this, _1));
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.cpp Fri Dec 13 19:30:12 2013
@@ -22,17 +22,18 @@
 #include "BackupConnectionExcluder.h"
 #include "ConnectionObserver.h"
 #include "HaBroker.h"
+#include "IdSetter.h"
 #include "Primary.h"
 #include "QueueReplicator.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
 #include "StandAlone.h"
 #include "QueueSnapshot.h"
-#include "QueueSnapshots.h"
 #include "qpid/amqp_0_10/Codecs.h"
 #include "qpid/assert.h"
 #include "qpid/Exception.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/BrokerObserver.h"
 #include "qpid/broker/Link.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/SignalHandler.h"
@@ -60,6 +61,20 @@ using sys::Mutex;
 using boost::shared_ptr;
 using boost::dynamic_pointer_cast;
 
+// In a HaBroker we always need to add QueueSnapshot and IdSetter to each queue
+// because we don't know in advance which queues might be used for stand-alone
+// replication.
+//
+// TODO aconway 2013-12-13: Can we restrict this to queues identified as replicated?
+//
+class HaBroker::BrokerObserver : public broker::BrokerObserver {
+  public:
+    void queueCreate(const boost::shared_ptr<broker::Queue>& q) {
+        q->getObservers().add(boost::shared_ptr<QueueSnapshot>(new QueueSnapshot));
+        q->getMessageInterceptors().add(boost::shared_ptr<IdSetter>(new IdSetter));
+    }
+};
+
 // Called in Plugin::earlyInitialize
 HaBroker::HaBroker(broker::Broker& b, const Settings& s)
     : systemId(b.getSystem()->getSystemId().data()),
@@ -69,8 +84,7 @@ HaBroker::HaBroker(broker::Broker& b, co
       observer(new ConnectionObserver(*this, systemId)),
       role(new StandAlone),
       membership(BrokerInfo(systemId, STANDALONE), *this),
-      failoverExchange(new FailoverExchange(*b.GetVhostObject(), b)),
-      queueSnapshots(shared_ptr<QueueSnapshots>(new QueueSnapshots))
+      failoverExchange(new FailoverExchange(*b.GetVhostObject(), b))
 {
     // If we are joining a cluster we must start excluding clients now,
     // otherwise there's a window for a client to connect before we get to
@@ -82,8 +96,7 @@ HaBroker::HaBroker(broker::Broker& b, co
         broker.getConnectionObservers().add(observer);
         broker.getExchanges().registerExchange(failoverExchange);
     }
-    // QueueSnapshots are needed for standalone replication as well as cluster.
-    broker.getBrokerObservers().add(queueSnapshots);
+    broker.getBrokerObservers().add(boost::shared_ptr<BrokerObserver>(new BrokerObserver()));
 }
 
 namespace {

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/HaBroker.h Fri Dec 13 19:30:12 2013
@@ -54,8 +54,6 @@ class Backup;
 class ConnectionObserver;
 class Primary;
 class Role;
-class QueueSnapshot;
-class QueueSnapshots;
 class QueueReplicator;
 
 /**
@@ -98,14 +96,14 @@ class HaBroker : public management::Mana
 
     void setAddress(const Address&); // set self address from a self-connection
 
-    boost::shared_ptr<QueueSnapshots> getQueueSnapshots() { return queueSnapshots; }
-
     boost::shared_ptr<QueueReplicator> findQueueReplicator(const std::string& queueName);
 
     /** Authenticated user ID for queue create/delete */
     std::string getUserId() const { return userId; }
 
   private:
+    class BrokerObserver;
+
     void setPublicUrl(const Url&);
     void setBrokerUrl(const Url&);
     void updateClientUrl(sys::Mutex::ScopedLock&);
@@ -129,7 +127,6 @@ class HaBroker : public management::Mana
     boost::shared_ptr<Role> role;
     Membership membership;
     boost::shared_ptr<FailoverExchange> failoverExchange;
-    boost::shared_ptr<QueueSnapshots> queueSnapshots;
 };
 }} // namespace qpid::ha
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/IdSetter.h Fri Dec 13 19:30:12 2013
@@ -43,15 +43,11 @@ namespace ha {
 class IdSetter : public broker::MessageInterceptor
 {
   public:
-    IdSetter(const std::string& q, ReplicationId firstId) : nextId(firstId), name(q) {
-        QPID_LOG(trace, "Initial replication ID for " << name << " =" << nextId.get());
-    }
-
+    IdSetter(ReplicationId firstId=1) : nextId(firstId) {}
     void record(broker::Message& m) { m.setReplicationId(nextId++); }
 
   private:
     sys::AtomicValue<uint32_t> nextId;
-    std::string name;
 };
 
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Fri Dec 13 19:30:12 2013
@@ -104,8 +104,6 @@ Primary::Primary(HaBroker& hb, const Bro
     QueueReplicator::copy(hb.getBroker().getExchanges(), qrs);
     std::for_each(qrs.begin(), qrs.end(), boost::bind(&QueueReplicator::promoted, _1));
 
-    broker::QueueRegistry& queues = hb.getBroker().getQueues();
-    queues.eachQueue(boost::bind(&Primary::initializeQueue, this, _1));
     if (expect.empty()) {
         QPID_LOG(notice, logPrefix << "Promoted to primary. No expected backups.");
     }
@@ -140,15 +138,6 @@ Primary::~Primary() {
     haBroker.getObserver()->reset();
 }
 
-void Primary::initializeQueue(boost::shared_ptr<broker::Queue> q) {
-    if (replicationTest.useLevel(*q) == ALL) {
-        boost::shared_ptr<QueueReplicator> qr = haBroker.findQueueReplicator(q->getName());
-        ReplicationId firstId = qr ? qr->getMaxId()+1 : ReplicationId(1);
-        q->getMessageInterceptors().add(
-            boost::shared_ptr<IdSetter>(new IdSetter(q->getName(), firstId)));
-    }
-}
-
 void Primary::checkReady() {
     bool activate = false;
     {
@@ -261,7 +250,6 @@ void Primary::queueCreate(const QueuePtr
     if (level) {
         QPID_LOG(debug, logPrefix << "Created queue " << q->getName()
                  << " replication: " << printable(level));
-        initializeQueue(q);
         // Give each queue a unique id. Used by backups to avoid confusion of
         // same-named queues.
         q->addArgument(QPID_HA_UUID, types::Variant(Uuid(true)));

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Fri Dec 13 19:30:12 2013
@@ -125,7 +125,6 @@ class Primary : public Role
     RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
     void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
 
-    void initializeQueue(boost::shared_ptr<broker::Queue>);
     void checkReady();
     void checkReady(RemoteBackupPtr);
     void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Dec 13 19:30:12 2013
@@ -21,8 +21,9 @@
 
 #include "Event.h"
 #include "HaBroker.h"
+#include "IdSetter.h"
 #include "QueueReplicator.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
 #include "ReplicatingSubscription.h"
 #include "Settings.h"
 #include "types.h"
@@ -122,6 +123,11 @@ QueueReplicator::QueueReplicator(HaBroke
       settings(hb.getSettings()),
       nextId(0), maxId(0)
 {
+    // The QueueReplicator will take over setting replication IDs.
+    boost::shared_ptr<IdSetter> setter =
+        q->getMessageInterceptors().findType<IdSetter>();
+    if (setter) q->getMessageInterceptors().remove(setter);
+
     args.setString(QPID_REPLICATE, printable(NONE).str());
     Uuid uuid(true);
     bridgeName = replicatorName(q->getName()) + std::string(".") + uuid.str();
@@ -212,8 +218,9 @@ void QueueReplicator::initializeBridge(B
     arguments.setString(ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION, getType());
     arguments.setInt(QPID_SYNC_FREQUENCY, 1); // TODO aconway 2012-05-22: optimize?
     arguments.setTable(ReplicatingSubscription::QPID_BROKER_INFO, brokerInfo.asFieldTable());
-    arguments.setString(ReplicatingSubscription::QPID_ID_SET,
-                        encodeStr(haBroker.getQueueSnapshots()->get(queue)->snapshot()));
+    boost::shared_ptr<QueueSnapshot> qs = queue->getObservers().findType<QueueSnapshot>();
+    if (qs) arguments.setString(ReplicatingSubscription::QPID_ID_SET, encodeStr(qs->getSnapshot()));
+
     try {
         peer.getMessage().subscribe(
             args.i_src, args.i_dest, 0/*accept-explicit*/, 1/*not-acquired*/,
@@ -254,6 +261,7 @@ void QueueReplicator::dequeueEvent(const
 }
 
 // Called in connection thread of the queues bridge to primary.
+
 void QueueReplicator::route(Deliverable& deliverable)
 {
     try {
@@ -293,11 +301,6 @@ void QueueReplicator::idEvent(const stri
     nextId = decodeStr<IdEvent>(data).id;
 }
 
-ReplicationId QueueReplicator::getMaxId() {
-    Mutex::ScopedLock l(lock);
-    return maxId;
-}
-
 void QueueReplicator::incomingExecutionException(ErrorCode e, const std::string& msg) {
     if (e == ERROR_CODE_NOT_FOUND || e == ERROR_CODE_RESOURCE_DELETED) {
         // If the queue is destroyed at the same time we are subscribing, we may
@@ -320,14 +323,19 @@ bool QueueReplicator::hasBindings() { re
 std::string QueueReplicator::getType() const { return ReplicatingSubscription::QPID_QUEUE_REPLICATOR; }
 
 void QueueReplicator::promoted() {
-    // Promoted to primary, deal with auto-delete now.
-    if (queue && queue->isAutoDelete() && subscribed) {
-        // Make a temporary shared_ptr to prevent premature deletion of queue.
-        // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
-        // which could delete the queue while it's still running it's destroyed logic.
-        boost::shared_ptr<Queue> q(queue);
-        q->releaseFromUse();
-        q->scheduleAutoDelete();
+    if (queue) {
+        // On primary QueueReplicator no longer sets IDs, start an IdSetter.
+        queue->getMessageInterceptors().add(
+            boost::shared_ptr<IdSetter>(new IdSetter(maxId+1)));
+        // Process auto-deletes
+        if (queue->isAutoDelete() && subscribed) {
+            // Make a temporary shared_ptr to prevent premature deletion of queue.
+            // Otherwise scheduleAutoDelete can call this->destroy, which resets this->queue
+            // which could delete the queue while it's still running it's destroyed logic.
+            boost::shared_ptr<Queue> q(queue);
+            q->releaseFromUse();
+            q->scheduleAutoDelete();
+        }
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Dec 13 19:30:12 2013
@@ -85,8 +85,6 @@ class QueueReplicator : public broker::E
 
     boost::shared_ptr<broker::Queue> getQueue() const { return queue; }
 
-    ReplicationId getMaxId();
-
     // No-op unused Exchange virtual functions.
     bool bind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);
     bool unbind(boost::shared_ptr<broker::Queue>, const std::string&, const framing::FieldTable*);

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshot.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshot.h?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshot.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueSnapshot.h Fri Dec 13 19:30:12 2013
@@ -53,7 +53,7 @@ class  QueueSnapshot : public broker::Qu
 
     void requeued(const broker::Message&) {}
 
-    ReplicationIdSet snapshot() {
+    ReplicationIdSet getSnapshot() {
         sys::Mutex::ScopedLock l(lock);
         return set;
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Fri Dec 13 19:30:12 2013
@@ -22,7 +22,7 @@
 #include "Event.h"
 #include "IdSetter.h"
 #include "QueueGuard.h"
-#include "QueueSnapshots.h"
+#include "QueueSnapshot.h"
 #include "ReplicatingSubscription.h"
 #include "TxReplicatingSubscription.h"
 #include "Primary.h"
@@ -129,17 +129,6 @@ void ReplicatingSubscription::initialize
         info.printId(os) << ": ";
         logPrefix = os.str();
 
-        // If this is a non-cluster standalone replication then we need to
-        // set up an IdSetter if there is not already one.
-        boost::shared_ptr<IdSetter> idSetter;
-        queue->getMessageInterceptors().each(
-            boost::bind(&copyIf, _1, boost::ref(idSetter)));
-        if (!idSetter) {
-            QPID_LOG(debug, logPrefix << "Standalone replication");
-            queue->getMessageInterceptors().add(
-                boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1)));
-        }
-
         // If there's already a guard (we are in failover) use it, else create one.
         if (primary) guard = primary->getGuard(queue, info);
         if (!guard) guard.reset(new QueueGuard(*queue, info));
@@ -152,14 +141,14 @@ void ReplicatingSubscription::initialize
         // between the snapshot and attaching the observer.
         queue->getObservers().add(
             boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
-        boost::shared_ptr<QueueSnapshot> snapshot = haBroker.getQueueSnapshots()->get(queue);
+        boost::shared_ptr<QueueSnapshot> snapshot = queue->getObservers().findType<QueueSnapshot>();
         // There may be no snapshot if the queue is being deleted concurrently.
         if (!snapshot) {
             queue->getObservers().remove(
                 boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
             throw ResourceDeletedException(logPrefix+"Can't subscribe, queue deleted");
         }
-        ReplicationIdSet primaryIds = snapshot->snapshot();
+        ReplicationIdSet primaryIds = snapshot->getSnapshot();
         std::string backupStr = getArguments().getAsString(ReplicatingSubscription::QPID_ID_SET);
         ReplicationIdSet backupIds;
         if (!backupStr.empty()) backupIds = decodeStr<ReplicationIdSet>(backupStr);

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1550819&r1=1550818&r2=1550819&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Fri Dec 13 19:30:12 2013
@@ -272,33 +272,34 @@ class ReplicationTests(HaBrokerTest):
 
     def test_standalone_queue_replica(self):
         """Test replication of individual queues outside of cluster mode"""
-        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
-        try:
-            primary = HaBroker(self, name="primary", ha_cluster=False,
-                               args=["--ha-queue-replication=yes"]);
-            pc = primary.connect()
-            ps = pc.session().sender("q;{create:always}")
-            pr = pc.session().receiver("q;{create:always}")
-            backup = HaBroker(self, name="backup", ha_cluster=False,
-                              args=["--ha-queue-replication=yes"])
-            br = backup.connect().session().receiver("q;{create:always}")
+        primary = HaBroker(self, name="primary", ha_cluster=False,
+                           args=["--ha-queue-replication=yes"]);
+        pc = primary.connect()
+        ps = pc.session().sender("q;{create:always}")
+        pr = pc.session().receiver("q;{create:always}")
+        backup = HaBroker(self, name="backup", ha_cluster=False,
+                          args=["--ha-queue-replication=yes"])
+        bs = backup.connect().session()
+        br = bs.receiver("q;{create:always}")
+
+        def srange(*args): return [str(i) for i in xrange(*args)]
+
+        for m in srange(3): ps.send(m)
+        # Set up replication with qpid-ha
+        backup.replicate(primary.host_port(), "q")
+        backup.assert_browse_backup("q", srange(3))
+        for m in srange(3,6): ps.send(str(m))
+        backup.assert_browse_backup("q", srange(6))
+        self.assertEqual("0", pr.fetch().content)
+        pr.session.acknowledge()
+        backup.assert_browse_backup("q", srange(1,6))
+
+        # Set up replication with qpid-config
+        ps2 = pc.session().sender("q2;{create:always}")
+        backup.config_replicate(primary.host_port(), "q2");
+        ps2.send("x", timeout=1)
+        backup.assert_browse_backup("q2", ["x"])
 
-            # Set up replication with qpid-ha
-            backup.replicate(primary.host_port(), "q")
-            ps.send("a", timeout=1)
-            backup.assert_browse_backup("q", ["a"])
-            ps.send("b", timeout=1)
-            backup.assert_browse_backup("q", ["a", "b"])
-            self.assertEqual("a", pr.fetch().content)
-            pr.session.acknowledge()
-            backup.assert_browse_backup("q", ["b"])
-
-            # Set up replication with qpid-config
-            ps2 = pc.session().sender("q2;{create:always}")
-            backup.config_replicate(primary.host_port(), "q2");
-            ps2.send("x", timeout=1)
-            backup.assert_browse_backup("q2", ["x"])
-        finally: l.restore()
 
     def test_standalone_queue_replica_failover(self):
         """Test individual queue replication from a cluster to a standalone



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org