You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/07/23 23:29:01 UTC

svn commit: r1364804 - in /qpid/trunk/qpid/cpp/src/qpid/ha: QueueGuard.cpp QueueGuard.h QueueRange.h

Author: aconway
Date: Mon Jul 23 21:29:01 2012
New Revision: 1364804

URL: http://svn.apache.org/viewvc?rev=1364804&view=rev
Log:
QPID-4159: HA Missing messages in failover test.

QueueGuard was taking its snapshot of the initial queue range *before* it
registered its QueueObserver. That means it was possible to have unguarded messages
between the end of the snapshot and the first position protected by the guard.

Fixed race condition in QueueRange constructor: getPosition() must be called
*after* getFront(), since both may be advancing and we want to end up with a
valid range, i.e. front <= back+1.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp?rev=1364804&r1=1364803&r2=1364804&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.cpp Mon Jul 23 21:29:01 2012
@@ -50,15 +50,15 @@ class QueueGuard::QueueObserver : public
 
 
 QueueGuard::QueueGuard(broker::Queue& q, const BrokerInfo& info)
-    : queue(q), subscription(0), range(q)
+    : queue(q), subscription(0)
 {
-    // NOTE: The QueueGuard is created before the queue becomes active: either
-    // when a backup is promoted, or when a new queue is created on the primary.
     std::ostringstream os;
     os << "Primary guard " << queue.getName() << "@" << info.getLogId() << ": ";
     logPrefix = os.str();
     observer.reset(new QueueObserver(*this));
     queue.addObserver(observer);
+    // Set range after addObserver so we know that range.back+1 is a guarded position.
+    range = QueueRange(q);
 }
 
 QueueGuard::~QueueGuard() { cancel(); }
@@ -112,6 +112,7 @@ void completeBefore(QueueGuard* guard, S
 
 bool QueueGuard::subscriptionStart(SequenceNumber position) {
    // Complete any messages before or at the ReplicatingSubscription start position.
+   // Those messages are already on the backup.
     if (!delayed.empty() && delayed.front() <= position) {
         // FIXME aconway 2012-06-15: queue iteration, only messages in delayed
         queue.eachMessage(boost::bind(&completeBefore, this, position, _1));

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h?rev=1364804&r1=1364803&r2=1364804&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueGuard.h Mon Jul 23 21:29:01 2012
@@ -83,12 +83,14 @@ class QueueGuard {
      * QueueGuard is created before the queue becomes active: either when a
      * backup is promoted, or when a new queue is created on the primary.
      *
-     * NOTE: The first position protected by the guard is getRange().getBack()+1
+     * NOTE: The first position guaranteed to be protected by the guard is
+     * getRange().getBack()+1. It is possible that the guard has protected
+     * some messages before that point.
      */
     const QueueRange& getRange() const { return range; } // range is immutable, no lock needed.
 
     /** Inform the guard of the starting position for the attached subscription.
-     * Complete messages that will not be seen by the subscriptino.
+     * Complete messages that will not be seen by the subscription.
      *@return true if the subscription has already advanced to a guarded position.
      */
     bool subscriptionStart(framing::SequenceNumber position);
@@ -102,7 +104,7 @@ class QueueGuard {
     framing::SequenceSet delayed;
     ReplicatingSubscription* subscription;
     boost::shared_ptr<QueueObserver> observer;
-    const QueueRange range;
+    QueueRange range;
 };
 }} // namespace qpid::ha
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h?rev=1364804&r1=1364803&r2=1364804&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueRange.h Mon Jul 23 21:29:01 2012
@@ -33,17 +33,31 @@ namespace ha {
 
 /**
  * Get the front/back range of a queue or from a ReplicatingSubscription arguments table.
+ *
+ * The *back* of the queue is the position of the latest (most recently pushed)
+ * message on the queue or, if the queue is empty, the back is n-1 where n is
+ * the position that will be assigned to the next message pushed onto the queue.
+ *
+ * The *front* of the queue is the position of the oldest (next to be consumed) message
+ * on the queue or, if the queue is empty, it is the position that will be occupied
+ * by the next message pushed onto the queue.
+ *
+ * This leads to the slightly surprising conclusion that for an empty queue
+ * front = back+1
  */
 struct QueueRange {
   public:
     framing::SequenceNumber front, back;
 
-    QueueRange() { }
+    QueueRange() : front(1), back(0) { } // Empty range.
 
     QueueRange(broker::Queue& q) {
-        back = q.getPosition();
-        front = back+1;         // assume empty
-        ReplicatingSubscription::getFront(q, front);
+        if (ReplicatingSubscription::getFront(q, front))
+            back = q.getPosition();
+        else {
+            back = q.getPosition();
+            front = back+1;     // empty
+        }
         assert(front <= back + 1);
     }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org