You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2013/06/17 16:19:12 UTC

svn commit: r1493771 [2/2] - in /qpid/trunk/qpid/cpp: include/qpid/types/ src/ src/qpid/broker/ src/qpid/ha/ src/qpid/types/ src/tests/

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.h Mon Jun 17 14:19:10 2013
@@ -25,8 +25,9 @@
 #include "ReplicationTest.h"
 #include "BrokerInfo.h"
 #include "types.h"
+#include "hash.h"
+#include "qpid/sys/unordered_map.h"
 #include <set>
-#include <map>
 
 namespace qpid {
 
@@ -58,17 +59,12 @@ class RemoteBackup
     RemoteBackup(const BrokerInfo&, broker::Connection*);
     ~RemoteBackup();
 
-    /** Set all queues in the registry as catch-up queues.
-     *@createGuards if true create guards also, if false guards are created on demand.
-     */
-    void setCatchupQueues(broker::QueueRegistry&, bool createGuards);
-
     /** Return guard associated with a queue. Used to create ReplicatingSubscription. */
     GuardPtr guard(const QueuePtr&);
 
     /** Is the remote backup connected? */
     void setConnection(broker::Connection* c) { connection = c; }
-    bool isConnected() const { return connection; }
+    broker::Connection* getConnection() const { return connection; }
 
     /** ReplicatingSubscription associated with queue is ready.
      * Note: may set isReady()
@@ -90,18 +86,26 @@ class RemoteBackup
     /**Cancel all queue guards, called if we are timed out. */
     void cancel();
 
+    /** Set a catch-up queue for this backup.
+     *@createGuard if true create a guard immediately.
+     */
+    void catchupQueue(const QueuePtr&, bool createGuard);
+
     BrokerInfo getBrokerInfo() const { return brokerInfo; }
+
+    void startCatchup() { started = true; }
+
   private:
-    typedef std::map<QueuePtr, GuardPtr> GuardMap;
+    typedef qpid::sys::unordered_map<QueuePtr, GuardPtr,
+                                     SharedPtrHasher<broker::Queue> > GuardMap;
     typedef std::set<QueuePtr> QueueSet;
 
-    void catchupQueue(const QueuePtr&, bool createGuard);
-
     std::string logPrefix;
     BrokerInfo brokerInfo;
     ReplicationTest replicationTest;
     GuardMap guards;
     QueueSet catchupQueues;
+    bool started;
     broker::Connection* connection;
     bool reportedReady;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Mon Jun 17 14:19:10 2013
@@ -20,12 +20,16 @@
  */
 
 #include "makeMessage.h"
+#include "IdSetter.h"
 #include "QueueGuard.h"
-#include "QueueRange.h"
 #include "QueueReplicator.h"
+#include "QueueSnapshots.h"
 #include "ReplicatingSubscription.h"
 #include "Primary.h"
+#include "HaBroker.h"
+#include "qpid/assert.h"
 #include "qpid/broker/Queue.h"
+#include "qpid/broker/QueueObserver.h"
 #include "qpid/broker/SessionContext.h"
 #include "qpid/broker/ConnectionState.h"
 #include "qpid/broker/amqp_0_10/MessageTransfer.h"
@@ -35,6 +39,7 @@
 #include "qpid/types/Uuid.h"
 #include <sstream>
 
+
 namespace qpid {
 namespace ha {
 
@@ -45,53 +50,20 @@ using sys::Mutex;
 using broker::amqp_0_10::MessageTransfer;
 
 const string ReplicatingSubscription::QPID_REPLICATING_SUBSCRIPTION("qpid.ha-replicating-subscription");
-const string ReplicatingSubscription::QPID_BACK("qpid.ha-back");
-const string ReplicatingSubscription::QPID_FRONT("qpid.ha-front");
 const string ReplicatingSubscription::QPID_BROKER_INFO("qpid.ha-broker-info");
+const string ReplicatingSubscription::QPID_ID_SET("qpid.ha-info");
 
-namespace {
-const string DOLLAR("$");
-const string INTERNAL("-internal");
-} // namespace
-
-// Scan the queue for gaps and add them to the subscriptions dequed set.
-class DequeueScanner
-{
+class ReplicatingSubscription::QueueObserver : public broker::QueueObserver {
   public:
-    DequeueScanner(
-        ReplicatingSubscription& rs,
-        SequenceNumber front_,
-        SequenceNumber back_    // Inclusive
-    ) : subscription(rs), front(front_), back(back_)
-    {
-        assert(front <= back);
-        // INVARIANT deques have been added for positions <= at.
-        at = front - 1;
-    }
-
-    void operator()(const Message& m) {
-        if (m.getSequence() >= front && m.getSequence() <= back) {
-            if (m.getSequence() > at+1) subscription.dequeued(at+1, m.getSequence()-1);
-            at = m.getSequence();
-        }
-    }
-
-    // Must call after scanning the queue.
-    void finish() {
-        if (at < back) subscription.dequeued(at+1, back);
-    }
-
+    QueueObserver(ReplicatingSubscription& rs_) : rs(rs_) {}
+    void enqueued(const broker::Message&) {}
+    void dequeued(const broker::Message& m) { rs.dequeued(m.getReplicationId()); }
+    void acquired(const broker::Message&) {}
+    void requeued(const broker::Message&) {}
   private:
-    ReplicatingSubscription& subscription;
-    SequenceNumber front;
-    SequenceNumber back;
-    SequenceNumber at;
+    ReplicatingSubscription& rs;
 };
 
-string mask(const string& in)
-{
-    return DOLLAR + in + INTERNAL;
-}
 
 /* Called by SemanticState::consume to create a consumer */
 boost::shared_ptr<broker::SemanticState::ConsumerImpl>
@@ -110,6 +82,7 @@ ReplicatingSubscription::Factory::create
     boost::shared_ptr<ReplicatingSubscription> rs;
     if (arguments.isSet(QPID_REPLICATING_SUBSCRIPTION)) {
         rs.reset(new ReplicatingSubscription(
+                     haBroker,
                      parent, name, queue, ack, acquire, exclusive, tag,
                      resumeId, resumeTtl, arguments));
         rs->initialize();
@@ -117,7 +90,15 @@ ReplicatingSubscription::Factory::create
     return rs;
 }
 
+namespace {
+void copyIf(boost::shared_ptr<MessageInterceptor> from, boost::shared_ptr<IdSetter>& to) {
+    boost::shared_ptr<IdSetter> result = boost::dynamic_pointer_cast<IdSetter>(from);
+    if (result) to = result;
+}
+} // namespace
+
 ReplicatingSubscription::ReplicatingSubscription(
+    HaBroker& hb,
     SemanticState* parent,
     const string& name,
     Queue::shared_ptr queue,
@@ -130,7 +111,8 @@ ReplicatingSubscription::ReplicatingSubs
     const framing::FieldTable& arguments
 ) : ConsumerImpl(parent, name, queue, ack, REPLICATOR, exclusive, tag,
                  resumeId, resumeTtl, arguments),
-    ready(false)
+    position(0), ready(false), cancelled(false),
+    haBroker(hb)
 {
     try {
         FieldTable ft;
@@ -140,64 +122,57 @@ ReplicatingSubscription::ReplicatingSubs
 
         // Set a log prefix message that identifies the remote broker.
         ostringstream os;
-        os << "Primary " << queue->getName() << "@" << info << ": ";
+        os << "Subscription to " << queue->getName() << " at " << info << ": ";
         logPrefix = os.str();
 
-        // NOTE: Once the guard is attached we can have concurrent
-        // calls to dequeued so we need to lock use of this->dequeues.
-        //
-        // However we must attach the guard _before_ we scan for
-        // initial dequeues to be sure we don't miss any dequeues
-        // between the scan and attaching the guard.
+        // If this is a non-cluster standalone replication then we need to
+        // set up an IdSetter if there is not already one.
+        boost::shared_ptr<IdSetter> idSetter;
+        queue->getMessageInterceptors().each(
+            boost::bind(&copyIf, _1, boost::ref(idSetter)));
+        if (!idSetter) {
+            QPID_LOG(debug, logPrefix << "Standalone replication");
+            queue->getMessageInterceptors().add(
+                boost::shared_ptr<IdSetter>(new IdSetter(queue->getName(), 1)));
+        }
+
+        // If there's already a guard (we are in failover) use it, else create one.
         if (Primary::get()) guard = Primary::get()->getGuard(queue, info);
         if (!guard) guard.reset(new QueueGuard(*queue, info));
-        guard->attach(*this);
 
-        QueueRange backup(arguments); // Remote backup range.
-        QueueRange backupOriginal(backup);
-        QueueRange primary(guard->getRange()); // Unguarded range when the guard was set.
-        backupPosition = backup.back;
-
-        // Sync backup and primary queues, don't send messages already on the backup
-
-        if (backup.front > primary.front || // Missing messages at front
-            backup.back < primary.front ||  // No overlap
-            primary.empty() || backup.empty()) // Empty
+        // NOTE: Once the observer is attached we can have concurrent
+        // calls to dequeued so we need to lock use of this->dequeues.
+        //
+        // However we must attach the observer _before_ we snapshot for
+        // initial dequeues to be sure we don't miss any dequeues
+        // between the snapshot and attaching the observer.
+        observer.reset(new QueueObserver(*this));
+        queue->addObserver(observer);
+        ReplicationIdSet primary = haBroker.getQueueSnapshots()->get(queue)->snapshot();
+        std::string backupStr = arguments.getAsString(ReplicatingSubscription::QPID_ID_SET);
+        ReplicationIdSet backup;
+        if (!backupStr.empty()) backup = decodeStr<ReplicationIdSet>(backupStr);
+
+        // Initial dequeues are messages on backup but not on primary.
+        ReplicationIdSet initDequeues = backup - primary;
+        QueuePosition front,back;
+        queue->getRange(front, back, broker::REPLICATOR); // Outside lock, getRange locks queue
         {
-            // No useful overlap - erase backup and start from the beginning
-            if (!backup.empty()) dequeued(backup.front, backup.back);
-            position = primary.front-1;
+            sys::Mutex::ScopedLock l(lock); // Concurrent calls to dequeued()
+            dequeues += initDequeues;       // Messages on backup that are not on primary.
+            skip = backup - initDequeues;   // Messages already on the backup.
+
+            // Queue front is moving but we know this subscriptions will start at a
+            // position >= front so if front is safe then position must be.
+            position = front;
+
+            QPID_LOG(debug, logPrefix << "Subscribed: front " << front
+                     << ", back " << back
+                     << ", start " << position
+                     << ", guarded " << guard->getFirst()
+                     << ", on backup " << skip);
+            checkReady(l);
         }
-        else {  // backup and primary do overlap.
-            // Remove messages from backup that are not in primary.
-            if (primary.back < backup.back) {
-                dequeued(primary.back+1, backup.back); // Trim excess messages at back
-                backup.back = primary.back;
-            }
-            if (backup.front < primary.front) {
-                dequeued(backup.front, primary.front-1); // Trim excess messages at front
-                backup.front = primary.front;
-            }
-            DequeueScanner scan(*this, backup.front, backup.back);
-            // FIXME aconway 2012-06-15: Optimize queue traversal, only in range.
-            queue->eachMessage(boost::ref(scan)); // Remove missing messages in between.
-            scan.finish();
-            position = backup.back;
-            //move cursor to position
-            queue->seek(*this, position);
-        }
-        // NOTE: we are assuming that the messages that are on the backup are
-        // consistent with those on the primary. If the backup is a replica
-        // queue and hasn't been tampered with then that will be the case.
-
-        QPID_LOG(debug, logPrefix << "Subscribed:"
-                 << " backup:" << backupOriginal << " adjusted backup:" << backup
-                 << " primary:" << primary
-                 << " catch-up: " << position << "-" << primary.back
-                 << "(" << primary.back-position << ")");
-
-        // Check if we are ready yet.
-        if (guard->subscriptionStart(position)) setReady();
     }
     catch (const std::exception& e) {
         QPID_LOG(error, logPrefix << "Creation error: " << e.what()
@@ -208,6 +183,7 @@ ReplicatingSubscription::ReplicatingSubs
 
 ReplicatingSubscription::~ReplicatingSubscription() {}
 
+
 // Called in subscription's connection thread when the subscription is created.
 // Called separate from ctor because sending events requires
 // shared_from_this
@@ -215,12 +191,9 @@ ReplicatingSubscription::~ReplicatingSub
 void ReplicatingSubscription::initialize() {
     try {
         Mutex::ScopedLock l(lock); // Note dequeued() can be called concurrently.
-
-        // Send initial dequeues and position to the backup.
+        // Send initial dequeues to the backup.
         // There must be a shared_ptr(this) when sending.
         sendDequeueEvent(l);
-        sendPositionEvent(position, l);
-        backupPosition = position;
     }
     catch (const std::exception& e) {
         QPID_LOG(error, logPrefix << "Initialization error: " << e.what()
@@ -229,53 +202,64 @@ void ReplicatingSubscription::initialize
     }
 }
 
+// True if the next position for the ReplicatingSubscription is a guarded position.
+bool ReplicatingSubscription::isGuarded(sys::Mutex::ScopedLock&) {
+    return position+1 >= guard->getFirst();
+}
+
 // Message is delivered in the subscription's connection thread.
 bool ReplicatingSubscription::deliver(
     const qpid::broker::QueueCursor& c, const qpid::broker::Message& m)
 {
+    Mutex::ScopedLock l(lock);
+    ReplicationId id = m.getReplicationId();
+    position = m.getSequence();
     try {
-        QPID_LOG(trace, logPrefix << "Replicating " << m.getSequence());
-        {
-            Mutex::ScopedLock l(lock);
-            position = m.getSequence();
-
-            // m.getSequence() is the position of the new message on local queue.
-            // backupPosition is latest position on backup queue before enqueueing
-            if (m.getSequence() <= backupPosition)
-                throw Exception(
-                    QPID_MSG(logPrefix << "Expected position >  " << backupPosition
-                             << " but got " << m.getSequence()));
-            if (m.getSequence() - backupPosition > 1) {
-                // Position has advanced because of messages dequeued ahead of us.
-                // Send the position before message was enqueued.
-                sendPositionEvent(m.getSequence()-1, l);
-            }
-            // Backup will automatically advance by 1 on delivery of message.
-            backupPosition = m.getSequence();
+        bool result = false;
+        if (skip.contains(id)) {
+            skip -= id;
+            guard->complete(id); // This will never be acknowledged.
+            result = false;
+        }
+        else {
+            QPID_LOG(trace, logPrefix << "Replicated " << LogMessageId(*getQueue(), m));
+            // Only consider unguarded messages for ready status.
+            if (!ready && !isGuarded(l)) unacked += id;
+            sendIdEvent(id, l);
+            result = ConsumerImpl::deliver(c, m);
         }
-        return ConsumerImpl::deliver(c, m);
+        checkReady(l);
+        return result;
     } catch (const std::exception& e) {
-        QPID_LOG(critical, logPrefix << "Error replicating " << m.getSequence()
+        QPID_LOG(critical, logPrefix << "Error replicating " << LogMessageId(*getQueue(), m)
                  << ": " << e.what());
         throw;
     }
 }
 
-void ReplicatingSubscription::setReady() {
-    {
-        Mutex::ScopedLock l(lock);
-        if (ready) return;
+/**
+ *@param position: must be <= last position seen by subscription.
+ */
+void ReplicatingSubscription::checkReady(sys::Mutex::ScopedLock& l) {
+    if (!ready && isGuarded(l) && unacked.empty()) {
         ready = true;
+        sys::Mutex::ScopedUnlock u(lock);
+        // Notify Primary that a subscription is ready.
+        QPID_LOG(debug, logPrefix << "Caught up");
+        if (Primary::get()) Primary::get()->readyReplica(*this);
     }
-    // Notify Primary that a subscription is ready.
-    QPID_LOG(debug, logPrefix << "Caught up");
-    if (Primary::get()) Primary::get()->readyReplica(*this);
 }
 
 // Called in the subscription's connection thread.
 void ReplicatingSubscription::cancel()
 {
+    {
+        Mutex::ScopedLock l(lock);
+        if (cancelled) return;
+        cancelled = true;
+    }
     QPID_LOG(debug, logPrefix << "Cancelled");
+    getQueue()->removeObserver(observer);
     guard->cancel();
     ConsumerImpl::cancel();
 }
@@ -283,10 +267,15 @@ void ReplicatingSubscription::cancel()
 // Consumer override, called on primary in the backup's IO thread.
 void ReplicatingSubscription::acknowledged(const broker::DeliveryRecord& r) {
     // Finish completion of message, it has been acknowledged by the backup.
-    QPID_LOG(trace, logPrefix << "Acknowledged " << r.getMessageId());
-    guard->complete(r.getMessageId());
-    // If next message is protected by the guard then we are ready
-    if (r.getMessageId() >= guard->getRange().back) setReady();
+    ReplicationId id = r.getReplicationId();
+    QPID_LOG(trace, logPrefix << "Acknowledged " <<
+             LogMessageId(*getQueue(), r.getMessageId(), r.getReplicationId()));
+    guard->complete(id);
+    {
+        Mutex::ScopedLock l(lock);
+        unacked -= id;
+        checkReady(l);
+    }
     ConsumerImpl::acknowledged(r);
 }
 
@@ -295,59 +284,36 @@ void ReplicatingSubscription::sendDequeu
 {
     if (dequeues.empty()) return;
     QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
-    string buf(dequeues.encodedSize(),'\0');
-    framing::Buffer buffer(&buf[0], buf.size());
-    dequeues.encode(buffer);
+    string buffer = encodeStr(dequeues);
     dequeues.clear();
-    buffer.reset();
-    {
-        Mutex::ScopedUnlock u(lock);
-        sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
-    }
+    sendEvent(QueueReplicator::DEQUEUE_EVENT_KEY, buffer, l);
 }
 
-// Called via QueueObserver::dequeued override on guard.
 // Called after the message has been removed
 // from the deque and under the messageLock in the queue. Called in
 // arbitrary connection threads.
-void ReplicatingSubscription::dequeued(const Message& m)
+void ReplicatingSubscription::dequeued(ReplicationId id)
 {
-    QPID_LOG(trace, logPrefix << "Dequeued " << m.getSequence());
+    QPID_LOG(trace, logPrefix << "Dequeued ID " << id);
     {
         Mutex::ScopedLock l(lock);
-        dequeues.add(m.getSequence());
+        dequeues.add(id);
     }
     notify();                   // Ensure a call to doDispatch
 }
 
-// Called during construction while scanning for initial dequeues.
-void ReplicatingSubscription::dequeued(SequenceNumber first, SequenceNumber last) {
-    QPID_LOG(trace, logPrefix << "Initial dequeue [" << first << ", " << last << "]");
-    {
-        Mutex::ScopedLock l(lock);
-        dequeues.add(first,last);
-    }
-}
 
 // Called with lock held. Called in subscription's connection thread.
-void ReplicatingSubscription::sendPositionEvent(SequenceNumber pos, Mutex::ScopedLock& l)
+void ReplicatingSubscription::sendIdEvent(ReplicationId pos, Mutex::ScopedLock& l)
 {
-    if (pos == backupPosition) return; // No need to send.
-    QPID_LOG(trace, logPrefix << "Sending position " << pos << ", was " << backupPosition);
-    string buf(pos.encodedSize(),'\0');
-    framing::Buffer buffer(&buf[0], buf.size());
-    pos.encode(buffer);
-    buffer.reset();
-    {
-        Mutex::ScopedUnlock u(lock);
-        sendEvent(QueueReplicator::POSITION_EVENT_KEY, buffer, l);
-    }
+    sendEvent(QueueReplicator::ID_EVENT_KEY, encodeStr(pos), l);
 }
 
 void ReplicatingSubscription::sendEvent(const std::string& key,
-                                        const framing::Buffer& buffer,
+                                        const std::string& buffer,
                                         Mutex::ScopedLock&)
 {
+    Mutex::ScopedUnlock u(lock);
     broker::Message message = makeMessage(buffer);
     MessageTransfer& transfer = MessageTransfer::get(message);
     DeliveryProperties* props =
@@ -370,7 +336,6 @@ bool ReplicatingSubscription::doDispatch
         return ConsumerImpl::doDispatch();
     }
     catch (const std::exception& e) {
-        // FIXME aconway 2012-10-05: detect queue deletion, no warning.
         QPID_LOG(warning, logPrefix << " exception in dispatch: " << e.what());
         return false;
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Mon Jun 17 14:19:10 2013
@@ -43,6 +43,7 @@ class Buffer;
 
 namespace ha {
 class QueueGuard;
+class HaBroker;
 
 /**
  * A susbcription that replicates to a remote backup.
@@ -61,30 +62,36 @@ class QueueGuard;
  *
  * Lifecycle: broker::Queue holds shared_ptrs to this as a consumer.
  *
- * Lock Hierarchy: ReplicatingSubscription MUST NOT call QueueGuard with its
- * lock held QueueGuard MAY call ReplicatingSubscription with its lock held.
+ *  ReplicatingSubscription makes calls on QueueGuard, but not vice-versa.
  */
 class ReplicatingSubscription : public broker::SemanticState::ConsumerImpl
 {
   public:
     typedef broker::SemanticState::ConsumerImpl ConsumerImpl;
 
-    struct Factory : public broker::ConsumerFactory {
+    class Factory : public broker::ConsumerFactory {
+      public:
+        Factory(HaBroker& hb) : haBroker(hb) {}
+
+        HaBroker& getHaBroker() const { return haBroker; }
+
         boost::shared_ptr<broker::SemanticState::ConsumerImpl> create(
             broker::SemanticState* parent,
             const std::string& name, boost::shared_ptr<broker::Queue> ,
             bool ack, bool acquire, bool exclusive, const std::string& tag,
             const std::string& resumeId, uint64_t resumeTtl,
             const framing::FieldTable& arguments);
+      private:
+        HaBroker& haBroker;
     };
 
     // Argument names for consume command.
     static const std::string QPID_REPLICATING_SUBSCRIPTION;
-    static const std::string QPID_BACK;
-    static const std::string QPID_FRONT;
     static const std::string QPID_BROKER_INFO;
+    static const std::string QPID_ID_SET;
 
-    ReplicatingSubscription(broker::SemanticState* parent,
+    ReplicatingSubscription(HaBroker& haBroker,
+                            broker::SemanticState* parent,
                             const std::string& name, boost::shared_ptr<broker::Queue> ,
                             bool ack, bool acquire, bool exclusive, const std::string& tag,
                             const std::string& resumeId, uint64_t resumeTtl,
@@ -92,12 +99,6 @@ class ReplicatingSubscription : public b
 
     ~ReplicatingSubscription();
 
-    // Called via QueueGuard::dequeued.
-    //@return true if the message requires completion.
-    void dequeued(const broker::Message&);
-
-    // Called during initial scan for dequeues.
-    void dequeued(framing::SequenceNumber first, framing::SequenceNumber last);
 
     // Consumer overrides.
     bool deliver(const broker::QueueCursor& cursor, const broker::Message& msg);
@@ -121,19 +122,27 @@ class ReplicatingSubscription : public b
     bool doDispatch();
 
   private:
+    class QueueObserver;
+  friend class QueueObserver;
+
     std::string logPrefix;
-    framing::SequenceSet dequeues;
-    framing::SequenceNumber position;
-    framing::SequenceNumber backupPosition;
+    QueuePosition position;
+    ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
+    ReplicationIdSet skip;      // Messages already on backup will be skipped.
+    ReplicationIdSet unacked;   // Replicated but un-acknowledged.
     bool ready;
+    bool cancelled;
     BrokerInfo info;
     boost::shared_ptr<QueueGuard> guard;
+    HaBroker& haBroker;
+    boost::shared_ptr<QueueObserver> observer;
 
+    bool isGuarded(sys::Mutex::ScopedLock&);
+    void dequeued(ReplicationId);
     void sendDequeueEvent(sys::Mutex::ScopedLock&);
-    void sendPositionEvent(framing::SequenceNumber, sys::Mutex::ScopedLock&);
-    void setReady();
-    void sendEvent(const std::string& key, const framing::Buffer&,
-                   sys::Mutex::ScopedLock&);
+    void sendIdEvent(ReplicationId, sys::Mutex::ScopedLock&);
+    void sendEvent(const std::string& key, const std::string& data, sys::Mutex::ScopedLock&);
+    void checkReady(sys::Mutex::ScopedLock&);
   friend struct Factory;
 };
 

Copied: qpid/trunk/qpid/cpp/src/qpid/ha/hash.h (from r1493713, qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/hash.h?p2=qpid/trunk/qpid/cpp/src/qpid/ha/hash.h&p1=qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h&r1=1493713&r2=1493771&rev=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/hash.h Mon Jun 17 14:19:10 2013
@@ -1,5 +1,5 @@
-#ifndef QPID_HA_MAKEMESSAGE_H
-#define QPID_HA_MAKEMESSAGE_H
+#ifndef QPID_HA_HASH_H
+#define QPID_HA_HASH_H
 
 /*
  *
@@ -22,22 +22,21 @@
  *
  */
 
-#include "qpid/broker/Message.h"
+#include <boost/shared_ptr.hpp>
 
 namespace qpid {
-namespace framing {
-class Buffer;
-}
 namespace ha {
 
-/**
- * Create internal messages used by HA components.
- */
-broker::Message makeMessage(
-    const framing::Buffer& content,
-    const std::string& destination=std::string()
-);
+template<class T> struct TrivialHasher {
+    size_t  operator()(T value) const  { return static_cast<size_t>(value); }
+};
+
+template<class T> struct SharedPtrHasher {
+    size_t  operator()(const boost::shared_ptr<T>& ptr) const  {
+        return reinterpret_cast<size_t>(ptr.get());
+    }
+};
 
 }} // namespace qpid::ha
 
-#endif  /*!QPID_HA_MAKEMESSAGE_H*/
+#endif  /*!QPID_HA_HASH_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.cpp Mon Jun 17 14:19:10 2013
@@ -54,4 +54,9 @@ broker::Message makeMessage(const framin
     return broker::Message(transfer, 0);
 }
 
+broker::Message makeMessage(const std::string& content, const std::string& destination) {
+    framing::Buffer buffer(const_cast<char*>(&content[0]), content.size());
+    return makeMessage(buffer, destination);
+}
+
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/makeMessage.h Mon Jun 17 14:19:10 2013
@@ -23,6 +23,10 @@
  */
 
 #include "qpid/broker/Message.h"
+#include "qpid/framing/Buffer.h"
+#include <string>
+
+/** Utilities for creating messages used by HA internally. */
 
 namespace qpid {
 namespace framing {
@@ -38,6 +42,25 @@ broker::Message makeMessage(
     const std::string& destination=std::string()
 );
 
+broker::Message makeMessage(const std::string& content,
+                            const std::string& destination=std::string());
+
+/** Encode value as a string. */
+template <class T> std::string encodeStr(const T& value) {
+    std::string encoded(value.encodedSize(), '\0');
+    framing::Buffer buffer(&encoded[0], encoded.size());
+    value.encode(buffer);
+    return encoded;
+}
+
+/** Decode value from a string. */
+template <class T> T decodeStr(const std::string& encoded) {
+    framing::Buffer buffer(const_cast<char*>(&encoded[0]), encoded.size());
+    T value;
+    value.decode(buffer);
+    return value;
+}
+
 }} // namespace qpid::ha
 
 #endif  /*!QPID_HA_MAKEMESSAGE_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp Mon Jun 17 14:19:10 2013
@@ -21,6 +21,8 @@
 
 #include "types.h"
 #include "qpid/Msg.h"
+#include "qpid/broker/Message.h"
+#include "qpid/broker/Queue.h"
 #include "qpid/Exception.h"
 #include <algorithm>
 #include <iostream>
@@ -83,4 +85,19 @@ ostream& operator<<(ostream& o, const Id
     return o;
 }
 
+LogMessageId::LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id) :
+    queue(q.getName()), position(pos), replicationId(id) {}
+
+LogMessageId::LogMessageId(const broker::Queue& q, const broker::Message& m) :
+    queue(q.getName()), position(m.getSequence()), replicationId(m.getReplicationId()) {}
+
+LogMessageId::LogMessageId(const std::string& q, const broker::Message& m) :
+    queue(q), position(m.getSequence()), replicationId(m.getReplicationId()) {}
+
+std::ostream& operator<<(std::ostream& o, const LogMessageId& m) {
+    return o  << m.queue << "[" << m.position << "]=" << m.replicationId;
+}
+
+
+
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.h?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/types.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/types.h Mon Jun 17 14:19:10 2013
@@ -24,12 +24,20 @@
 
 #include "qpid/types/Variant.h"
 #include "qpid/types/Uuid.h"
+#include "qpid/framing/SequenceSet.h"
+
+#include <boost/shared_ptr.hpp>
+
 #include <string>
 #include <set>
 #include <iosfwd>
 
 namespace qpid {
 
+namespace broker {
+class Message;
+class Queue;
+}
 namespace framing {
 class FieldTable;
 }
@@ -106,5 +114,23 @@ class IdSet : public std::set<types::Uui
 
 std::ostream& operator<<(std::ostream& o, const IdSet& ids);
 
+// Use type names to distinguish Positions from Replication Ids
+typedef framing::SequenceNumber QueuePosition;
+typedef framing::SequenceNumber ReplicationId;
+typedef framing::SequenceSet QueuePositionSet;
+typedef framing::SequenceSet ReplicationIdSet;
+
+/** Helper for logging message ID  */
+struct LogMessageId {
+    typedef boost::shared_ptr<broker::Queue> QueuePtr;
+    LogMessageId(const broker::Queue& q, QueuePosition pos, ReplicationId id);
+    LogMessageId(const broker::Queue& q, const broker::Message& m);
+    LogMessageId(const std::string& q, const broker::Message& m);
+    const std::string& queue;
+    QueuePosition position;
+    ReplicationId replicationId;
+};
+std::ostream& operator<<(std::ostream&, const LogMessageId&);
+
 }} // qpid::ha
 #endif  /*!QPID_HA_ENUM_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/types/Uuid.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/types/Uuid.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/types/Uuid.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/types/Uuid.cpp Mon Jun 17 14:19:10 2013
@@ -144,4 +144,12 @@ std::string Uuid::str() const
     return os.str();
 }
 
+size_t Uuid::hash() const {
+    std::size_t seed = 0;
+    for(size_t i = 0; i < SIZE; ++i)
+        seed ^= static_cast<std::size_t>(bytes[i]) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
+    return seed;
+}
+
+
 }} // namespace qpid::types

Modified: qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp (original)
+++ qpid/trunk/qpid/cpp/src/tests/DeliveryRecordTest.cpp Mon Jun 17 14:19:10 2013
@@ -49,7 +49,7 @@ QPID_AUTO_TEST_CASE(testSort)
 
     list<DeliveryRecord> records;
     for (list<SequenceNumber>::iterator i = ids.begin(); i != ids.end(); i++) {
-        DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
+        DeliveryRecord r(QueueCursor(CONSUMER), framing::SequenceNumber(), SequenceNumber(), Queue::shared_ptr(), "tag", Consumer::shared_ptr(), false, false, false);
         r.setId(*i);
         records.push_back(r);
     }

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Mon Jun 17 14:19:10 2013
@@ -107,7 +107,7 @@ class HaBroker(Broker):
         ha_port = ha_port or HaPort(test)
         args = copy(args)
         args += ["--load-module", BrokerTest.ha_lib,
-                 "--log-enable=debug+:ha::",
+                 "--log-enable=trace+:ha::", # FIXME aconway 2013-06-14: debug+
                  # Non-standard settings for faster tests.
                  "--link-maintenance-interval=0.1",
                  # Heartbeat and negotiate time are needed so that a broker wont
@@ -177,7 +177,7 @@ acl allow all all
                 self._status = self.ha_status()
                 return self._status == status;
             except ConnectionError: return False
-        assert retry(try_get_status, timeout=20), "%s expected=%r, actual=%r"%(
+        assert retry(try_get_status, timeout=5), "%s expected=%r, actual=%r"%(
             self, status, self._status)
 
     def wait_queue(self, queue, timeout=1):

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Mon Jun 17 14:19:10 2013
@@ -18,7 +18,7 @@
 # under the License.
 #
 
-import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest, random
+import os, signal, sys, time, imp, re, subprocess, glob, random, logging, shutil, math, unittest
 import traceback
 from qpid.messaging import Message, SessionError, NotFound, ConnectionError, ReceiverError, Connection, Timeout, Disposition, REJECTED, Empty
 from qpid.datatypes import uuid4, UUID
@@ -100,7 +100,7 @@ class ReplicationTests(HaBrokerTest):
             self.assert_browse_retry(b, prefix+"q1", ["1", "4", prefix+"e1"])
 
             # Verify exchange with replicate=configuration
-            b.sender(prefix+"e2/key2").send(Message(prefix+"e2")) 
+            b.sender(prefix+"e2/key2").send(Message(prefix+"e2"))
             self.assert_browse_retry(b, prefix+"q2", [prefix+"e2"])
 
             b.sender(prefix+"e4/key4").send(Message("drop2")) # Verify unbind.
@@ -284,9 +284,9 @@ class ReplicationTests(HaBrokerTest):
 
             # Set up replication with qpid-ha
             backup.replicate(primary.host_port(), "q")
-            ps.send("a")
+            ps.send("a", timeout=1)
             backup.assert_browse_backup("q", ["a"])
-            ps.send("b")
+            ps.send("b", timeout=1)
             backup.assert_browse_backup("q", ["a", "b"])
             self.assertEqual("a", pr.fetch().content)
             pr.session.acknowledge()
@@ -295,11 +295,11 @@ class ReplicationTests(HaBrokerTest):
             # Set up replication with qpid-config
             ps2 = pc.session().sender("q2;{create:always}")
             backup.config_replicate(primary.host_port(), "q2");
-            ps2.send("x")
+            ps2.send("x", timeout=1)
             backup.assert_browse_backup("q2", ["x"])
         finally: l.restore()
 
-    def test_queue_replica_failover(self):
+    def test_standalone_queue_replica_failover(self):
         """Test individual queue replication from a cluster to a standalone
         backup broker, verify it fails over."""
         l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
@@ -319,6 +319,7 @@ class ReplicationTests(HaBrokerTest):
             backup.assert_browse_backup("q", ["a"])
             ps.send("b")
             backup.assert_browse_backup("q", ["a", "b"])
+            cluster[0].wait_status("ready")
             cluster.bounce(1)
             self.assertEqual("a", pr.fetch().content)
             pr.session.acknowledge()
@@ -331,16 +332,20 @@ class ReplicationTests(HaBrokerTest):
         """Verify that we replicate to an LVQ correctly"""
         cluster = HaCluster(self, 2)
         s = cluster[0].connect().session().sender("lvq; {create:always, node:{x-declare:{arguments:{'qpid.last_value_queue_key':lvq-key}}}}")
-        def send(key,value): s.send(Message(content=value,properties={"lvq-key":key}))
-        for kv in [("a","a-1"),("b","b-1"),("a","a-2"),("a","a-3"),("c","c-1"),("c","c-2")]:
-            send(*kv)
-        cluster[1].assert_browse_backup("lvq", ["b-1", "a-3", "c-2"])
-        send("b","b-2")
-        cluster[1].assert_browse_backup("lvq", ["a-3", "c-2", "b-2"])
-        send("c","c-3")
-        cluster[1].assert_browse_backup("lvq", ["a-3", "b-2", "c-3"])
-        send("d","d-1")
-        cluster[1].assert_browse_backup("lvq", ["a-3", "b-2", "c-3", "d-1"])
+
+        def send(key,value,expect):
+            s.send(Message(content=value,properties={"lvq-key":key}), timeout=1)
+            cluster[1].assert_browse_backup("lvq", expect)
+
+        send("a", "a-1", ["a-1"])
+        send("b", "b-1", ["a-1", "b-1"])
+        send("a", "a-2", ["b-1", "a-2"])
+        send("a", "a-3", ["b-1", "a-3"])
+        send("c", "c-1", ["b-1", "a-3", "c-1"])
+        send("c", "c-2", ["b-1", "a-3", "c-2"])
+        send("b", "b-2", ["a-3", "c-2", "b-2"])
+        send("c", "c-3", ["a-3", "b-2", "c-3"])
+        send("d", "d-1", ["a-3", "b-2", "c-3", "d-1"])
 
     def test_ring(self):
         """Test replication with the ring queue policy"""
@@ -416,8 +421,8 @@ class ReplicationTests(HaBrokerTest):
             def send(self, connection):
                 """Send messages, then acquire one but don't acknowledge"""
                 s = connection.session()
-                for m in range(10): s.sender(self.address).send(str(m))
-                s.receiver(self.address).fetch()
+                for m in range(10): s.sender(self.address).send(str(m), timeout=1)
+                s.receiver(self.address, timeout=1).fetch()
 
             def verify(self, brokertest, backup):
                 backup.assert_browse_backup(self.queue, self.expect, msg=self.queue)
@@ -959,8 +964,7 @@ class LongTests(HaBrokerTest):
                     for s in senders: s.sender.assert_running()
                     for r in receivers: r.receiver.assert_running()
                     checkpoint = [ r.received+100 for r in receivers ]
-                    dead = None
-                    victim = random.randint(0,2)
+                    victim = random.choice([0,1,2,primary]) # Give the primary a better chance.
                     if victim == primary:
                         # Don't kill primary till it is active and the next
                         # backup is ready, otherwise we can lose messages.
@@ -984,13 +988,8 @@ class LongTests(HaBrokerTest):
         finally:
             for s in senders: s.stop()
             for r in receivers: r.stop()
-            unexpected_dead = []
-            for i in xrange(3):
-                if not brokers[i].is_running() and i != dead:
-                    unexpected_dead.append(i)
-                if brokers[i].is_running(): brokers.kill(i, False)
-            if unexpected_dead:
-                raise Exception("Brokers not running: %s"%unexpected_dead)
+            dead = filter(lambda i: not brokers[i].is_running(), xrange(3))
+            if dead: raise Exception("Brokers not running: %s"%dead)
 
     def test_qmf_order(self):
         """QPID 4402:  HA QMF events can be out of order.
@@ -1066,7 +1065,7 @@ class RecoveryTests(HaBrokerTest):
             # Create a queue before the failure.
             s1 = cluster.connect(0).session().sender("q1;{create:always}")
             for b in cluster: b.wait_backup("q1")
-            for i in xrange(10): s1.send(str(i))
+            for i in xrange(10): s1.send(str(i), timeout=0.1)
 
             # Kill primary and 2 backups
             cluster[3].wait_status("ready")
@@ -1125,17 +1124,17 @@ class RecoveryTests(HaBrokerTest):
         cluster[0].wait_status("active") # Primary ready
         for b in cluster[1:3]: b.wait_status("ready") # Backups ready
         for i in [0,1]: cluster.kill(i, False)
-        cluster[2].promote()    # New primary, backups will be 1 and 2
+        cluster[2].promote()    # New primary, expected backup will 1
         cluster[2].wait_status("recovering")
         # Should not go active till the expected backup connects or times out.
         self.assertEqual(cluster[2].ha_status(), "recovering")
-        # Messages should be held expected backup times out
+        # Messages should be held till expected backup times out
         s = cluster[2].connect().session().sender("q;{create:always}")
-        for i in xrange(100): s.send(str(i), sync=False)
+        s.send("foo", sync=False)
         # Verify message held initially.
         try: s.sync(timeout=.01); self.fail("Expected Timeout exception")
         except Timeout: pass
-        s.sync(timeout=1)      # And released after the timeout.
+        s.sync(timeout=1)       # And released after the timeout.
         self.assertEqual(cluster[2].ha_status(), "active")
 
     def test_join_ready_cluster(self):

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1493771&r1=1493770&r2=1493771&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark Mon Jun 17 14:19:10 2013
@@ -1,5 +1,5 @@
 #!/bin/sh
-#
+echo#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -26,8 +26,8 @@ REPEAT="--repeat 10"
 QUEUES="-q 6"
 SENDERS="-s 3"
 RECEIVERS="-r 3"
-BROKERS=			# Local broker
-CLIENT_HOSTS=			# No ssh, all clients are local
+BROKERS= # Local broker
+CLIENT_HOSTS= # No ssh, all clients are local
 # Connection options
 TCP_NODELAY=false
 RECONNECT=true



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org