You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/12/15 23:13:16 UTC
svn commit: r1214969 - in /qpid/branches/qpid-3603/qpid/cpp/src/qpid: broker/Queue.cpp broker/Queue.h ha/ReplicatingSubscription.cpp
Author: aconway
Date: Thu Dec 15 22:13:16 2011
New Revision: 1214969
URL: http://svn.apache.org/viewvc?rev=1214969&view=rev
Log:
QPID-3603: Fix race condition in setting initial position of ReplicatingSubscription.
Modified:
qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h
qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1214969&r1=1214968&r2=1214969&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.cpp Thu Dec 15 22:13:16 2011
@@ -1471,15 +1471,6 @@ class FindLowest
};
}
-bool Queue::getOldest(qpid::framing::SequenceNumber& oldest)
-{
- //Horribly inefficient, but saves modifying Messages interface and
- //all its implementations at present:
- FindLowest f;
- eachMessage(boost::bind(&FindLowest::process, &f, _1));
- return f.getLowest(oldest);
-}
-
Queue::UsageBarrier::UsageBarrier(Queue& q) : parent(q), count(0) {}
bool Queue::UsageBarrier::acquire()
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h?rev=1214969&r1=1214968&r2=1214969&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/broker/Queue.h Thu Dec 15 22:13:16 2011
@@ -407,7 +407,6 @@ class Queue : public boost::enable_share
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
- bool getOldest(framing::SequenceNumber& result);
};
}
}
Modified: qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1214969&r1=1214968&r2=1214969&view=diff
==============================================================================
--- qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/branches/qpid-3603/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Dec 15 22:13:16 2011
@@ -98,24 +98,21 @@ ReplicatingSubscription::ReplicatingSubs
// Note that broker::Queue::getPosition() returns the sequence
// number that will be assigned to the next message *minus 1*.
- // this->position is inherited from ConsumerImpl. It tracks the
- // position of the last message browsed on the local (primary)
- // queue, or more exactly the next sequence number to browse
- // *minus 1*
- qpid::framing::SequenceNumber oldest;
- position = queue->getOldest(oldest) ? --oldest : queue->getPosition();
-
// this->backupPosition tracks the position of the remote backup
// queue, i.e. the sequence number for the next delivered message
// *minus one*
backupPosition = 0;
+
+ // FIXME aconway 2011-12-15: ConsumerImpl::position is left at 0
+ // so we will start consuming from the lowest numbered message.
+ // This is incorrect if the sequence number wraps around, but
+ // this is what all consumers currently do.
}
// Message is delivered in the subscription's connection thread.
bool ReplicatingSubscription::deliver(QueuedMessage& m) {
// Add position events for the subscribed queue, not for the internal event queue.
- if (m.queue && m.queue->getName() == getQueue()->getName()) {
- QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m));
+ if (m.queue && m.queue == getQueue().get()) {
assert(position == m.position);
{
sys::Mutex::ScopedLock l(lock);
@@ -130,6 +127,7 @@ bool ReplicatingSubscription::deliver(Qu
}
backupPosition = position;
}
+ QPID_LOG(trace, "HA: replicating message to backup: " << QueuePos(m));
}
return ConsumerImpl::deliver(m);
}
@@ -213,7 +211,7 @@ void ReplicatingSubscription::dequeued(c
{
sys::Mutex::ScopedLock l(lock);
dequeues.add(m.position);
- QPID_LOG(trace, "HA: Will dequeue " << QueuePos(m) << " on " << getName());
+ QPID_LOG(trace, "HA: Dequeued " << QueuePos(m) << " on " << getName());
}
notify(); // Ensure a call to doDispatch
if (m.position > position) {
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org