You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/15 23:13:16 UTC

svn commit: r1214969 - in /qpid/branches/qpid-3603/qpid/cpp/src/qpid: broker/Queue.cpp broker/Queue.h ha/ReplicatingSubscription.cpp

Author: aconway
Date: Thu Dec 15 22:13:16 2011
New Revision: 1214969

URL: http://svn.apache.org/viewvc?rev=1214969&view=rev
Log:
QPID-3603: Fix race condition in setting initial position of ReplicatingSubscription.

Modified:
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1214969&r1=1214968&r2=1214969&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp Thu Dec 15 22:13:16 2011
@@ -1471,15 +1471,6 @@ class FindLowest
 };
 }
 
-bool Queue::getOldest(qpid::framing::SequenceNumber& oldest)
-{
-    //Horribly inefficient, but saves modifying Messages interface and
-    //all its implementations at present:
-    FindLowest f;
-    eachMessage(boost::bind(&FindLowest::process, &f, _1));
-    return f.getLowest(oldest);
-}
-
 Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
 
 bool Queue::UsageBarrier::acquire()

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h?rev=1214969&r1=1214968&r2=1214969&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h Thu Dec 15 22:13:16 2011
@@ -407,7 +407,6 @@ class Queue : public boost::enable_share
 
     uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
     void setDequeueSincePurge(uint32_t value);
-    bool getOldest(framing::SequenceNumber& result);
 };
 }
 }

Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1214969&r1=1214968&r2=1214969&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Dec 15 22:13:16 2011
@@ -98,24 +98,21 @@ ReplicatingSubscription::ReplicatingSubs
     // Note that broker::Queue::getPosition() returns the sequence
     // number that will be assigned to the next message *minus 1*.
 
-    // this->position is inherited from ConsumerImpl. It tracks the
-    // position of the last message browsed on the local (primary)
-    // queue, or more exactly the next sequence number to browse
-    // *minus 1*
-    qpid::framing::SequenceNumber oldest;
-    position = queue->getOldest(oldest) ? --oldest : queue->getPosition();
-
     // this->backupPosition tracks the position of the remote backup
     // queue, i.e. the sequence number for the next delivered message
     // *minus one*
     backupPosition = 0;
+
+    // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
+    // so we will start consuming from the lowest numbered message.
+    // This is incorrect if the sequence number wraps around, but
+    // this is what all consumers currently do.
 }
 
 // Message is delivered in the subscription's connection thread.
 bool ReplicatingSubscription::deliver(QueuedMessage& m) {
     // Add position events for the subscribed queue, not for the internal event queue.
-    if (m.queue && m.queue->getName() == getQueue()->getName()) {
-        QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m));
+    if (m.queue && m.queue == getQueue().get()) {
         assert(position == m.position);
         {
              sys::Mutex::ScopedLock l(lock);
@@ -130,6 +127,7 @@ bool ReplicatingSubscription::deliver(Qu
              }
              backupPosition = position;
         }
+        QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m));
     }
     return ConsumerImpl::deliver(m);
 }
@@ -213,7 +211,7 @@ void ReplicatingSubscription::dequeued(c
     {
         sys::Mutex::ScopedLock l(lock);
         dequeues.add(m.position);
-        QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << getName());
+        QPID_LOG(trace, "HA: Dequeued " << QueuePos(m) << " on " << getName());
     }
     notify();                   // Ensure a call to doDispatch
     if (m.position > position) {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org