You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/10/12 20:38:54 UTC
svn commit: r1397676 - in /qpid/trunk/qpid/cpp/src:
qpid/ha/BrokerReplicator.cpp qpid/ha/QueueReplicator.cpp
qpid/ha/QueueReplicator.h qpid/ha/StatusCheck.cpp
tests/qpid-cluster-benchmark
Author: aconway
Date: Fri Oct 12 18:38:53 2012
New Revision: 1397676
URL: http://svn.apache.org/viewvc?rev=1397676&view=rev
Log:
QPID-4369: HA backup brokers core dump in benchmark test.
Backup brokers were core dumping with QueueReplicator::queue == 0. This was caused
by race conditions when calling QueueReplicator::deactivate. Renamed deactivate to
destroy; it is now called only when the broker::Queue is destroyed.
Modified:
qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1397676&r1=1397675&r2=1397676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Oct 12 18:38:53 2012
@@ -277,14 +277,7 @@ void collectQueueReplicators(
}
} // namespace
-void BrokerReplicator::shutdown() {
- QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down.");
- set<boost::shared_ptr<QueueReplicator> > collect;
- broker.getExchanges().eachExchange(
- boost::bind(&collectQueueReplicators, _1, boost::ref(collect)));
- for_each(collect.begin(), collect.end(),
- boost::bind(&QueueReplicator::deactivate, _1));
-}
+void BrokerReplicator::shutdown() {}
// This is called in the connection IO thread when the bridge is started.
void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
@@ -672,8 +665,6 @@ boost::shared_ptr<QueueReplicator> Broke
}
void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
- boost::shared_ptr<QueueReplicator> qr(findQueueReplicator(name));
- if (qr) qr->deactivate();
Queue::shared_ptr queue = broker.getQueues().find(name);
if (queue) {
// Purge before deleting to ensure that we don't reroute any
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1397676&r1=1397675&r2=1397676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Oct 12 18:38:53 2012
@@ -47,6 +47,7 @@ namespace ha {
using namespace broker;
using namespace framing;
using namespace std;
+using sys::Mutex;
const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
@@ -94,7 +95,8 @@ class QueueReplicator::QueueObserver : p
void requeued(const Message&) {}
void consumerAdded( const Consumer& ) {}
void consumerRemoved( const Consumer& ) {}
- void destroy() { queueReplicator->deactivate(); }
+ // Queue observer is destroyed when the queue is.
+ void destroy() { queueReplicator->destroy(); }
private:
boost::shared_ptr<QueueReplicator> queueReplicator;
};
@@ -115,7 +117,8 @@ QueueReplicator::QueueReplicator(HaBroke
// This must be separate from the constructor so we can call shared_from_this.
void QueueReplicator::activate() {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
std::pair<Bridge::shared_ptr, bool> result =
queue->getBroker()->getLinks().declare(
bridgeName,
@@ -141,12 +144,14 @@ void QueueReplicator::activate() {
queue->addObserver(observer);
}
-QueueReplicator::~QueueReplicator() { deactivate(); }
+QueueReplicator::~QueueReplicator() {}
-void QueueReplicator::deactivate() {
- QPID_LOG(debug, logPrefix << "Deactivated");
- sys::Mutex::ScopedLock l(lock);
- if (bridge) bridge->close();
+void QueueReplicator::destroy() {
+ // Called from Queue::destroyed()
+ Mutex::ScopedLock l(lock);
+ if (!bridge) return;
+ QPID_LOG(debug, logPrefix << "Destroyed.");
+ bridge->close();
// Need to drop shared pointers to avoid pointer cycles keeping this in memory.
queue.reset();
link.reset();
@@ -156,7 +161,8 @@ void QueueReplicator::deactivate() {
// Called in a broker connection thread when the bridge is created.
void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
AMQP_ServerProxy peer(sessionHandler.out);
const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
FieldTable settings;
@@ -197,7 +203,13 @@ template <class T> T decodeContent(Messa
}
}
-void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
+void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) {
+ boost::shared_ptr<Queue> q;
+ {
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
+ q = queue;
+ }
// Thread safe: only calls thread safe Queue functions.
queue->dequeueMessageAt(n);
}
@@ -218,7 +230,8 @@ void QueueReplicator::route(Deliverable&
{
try {
const std::string& key = msg.getMessage().getRoutingKey();
- sys::Mutex::ScopedLock l(lock);
+ Mutex::ScopedLock l(lock);
+ if (!queue) return; // Already destroyed
if (!isEventKey(key)) {
msg.deliverTo(queue);
// We are on a backup so the queue is not modified except via this.
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1397676&r1=1397675&r2=1397676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Oct 12 18:38:53 2012
@@ -70,7 +70,6 @@ class QueueReplicator : public broker::E
~QueueReplicator();
void activate(); // Call after ctor
- void deactivate(); // Call before dtor
std::string getType() const;
bool bind(boost::shared_ptr<broker::Queue
@@ -90,6 +89,7 @@ class QueueReplicator : public broker::E
class QueueObserver;
void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+ void destroy(); // Called when the queue is destroyed.
void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
HaBroker& haBroker;
Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1397676&r1=1397675&r2=1397676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Fri Oct 12 18:38:53 2012
@@ -93,7 +93,7 @@ void StatusCheckThread::run() {
QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
}
} catch(const exception& error) {
- QPID_LOG(warning, "Error checking status of " << url << ": " << error.what());
+ QPID_LOG(info, "Checking status of " << url << ": " << error.what());
}
delete this;
}
Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1397676&r1=1397675&r2=1397676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark Fri Oct 12 18:38:53 2012
@@ -55,12 +55,10 @@ done
shift $(($OPTIND-1))
CONNECTION_OPTIONS="--connection-options {tcp-nodelay:$TCP_NODELAY,reconnect:$RECONNECT,heartbeat:$HEARTBEAT}"
-CREATE_OPTIONS="node:{x-declare:{arguments:{'qpid.replicate':all}}}"
BROKER=$(echo $BROKERS | sed s/,.*//)
run_test() { echo $*; shift; "$@"; echo; echo; echo; }
OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $CONNECTION_OPTIONS $NO_DELETE"
-OPTS="$OPTS --create-option $CREATE_OPTIONS"
run_test "Benchmark:" qpid-cpp-benchmark $OPTS "$@"
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org