You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2012/10/12 20:38:54 UTC

svn commit: r1397676 - in /qpid/trunk/qpid/cpp/src: qpid/ha/BrokerReplicator.cpp qpid/ha/QueueReplicator.cpp qpid/ha/QueueReplicator.h qpid/ha/StatusCheck.cpp tests/qpid-cluster-benchmark

Author: aconway
Date: Fri Oct 12 18:38:53 2012
New Revision: 1397676

URL: http://svn.apache.org/viewvc?rev=1397676&view=rev
Log:
QPID-4369: HA backup brokers core dump in benchmark test.

Was seeing core dumps with QueueReplicator::queue == 0, caused by race
conditions when calling QueueReplicator::deactivate. Renamed deactivate to
destroy and now call it only when the broker::Queue is destroyed.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
    qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
    qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1397676&r1=1397675&r2=1397676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Fri Oct 12 18:38:53 2012
@@ -277,14 +277,7 @@ void collectQueueReplicators(
 }
 } // namespace
 
-void BrokerReplicator::shutdown() {
-    QPID_LOG(debug, logPrefix << "BrokerReplicator shutting down.");
-    set<boost::shared_ptr<QueueReplicator> > collect;
-    broker.getExchanges().eachExchange(
-        boost::bind(&collectQueueReplicators, _1, boost::ref(collect)));
-    for_each(collect.begin(), collect.end(),
-             boost::bind(&QueueReplicator::deactivate, _1));
-}
+void BrokerReplicator::shutdown() {}
 
 // This is called in the connection IO thread when the bridge is started.
 void BrokerReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
@@ -672,8 +665,6 @@ boost::shared_ptr<QueueReplicator> Broke
 }
 
 void BrokerReplicator::deleteQueue(const std::string& name, bool purge) {
-    boost::shared_ptr<QueueReplicator> qr(findQueueReplicator(name));
-    if (qr) qr->deactivate();
     Queue::shared_ptr queue = broker.getQueues().find(name);
     if (queue) {
         // Purge before deleting to ensure that we don't reroute any

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1397676&r1=1397675&r2=1397676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Fri Oct 12 18:38:53 2012
@@ -47,6 +47,7 @@ namespace ha {
 using namespace broker;
 using namespace framing;
 using namespace std;
+using sys::Mutex;
 
 const std::string QPID_HA_EVENT_PREFIX("qpid.ha-");
 const std::string QueueReplicator::DEQUEUE_EVENT_KEY(QPID_HA_EVENT_PREFIX+"dequeue");
@@ -94,7 +95,8 @@ class QueueReplicator::QueueObserver : p
     void requeued(const Message&) {}
     void consumerAdded( const Consumer& ) {}
     void consumerRemoved( const Consumer& ) {}
-    void destroy() { queueReplicator->deactivate(); }
+    // Queue observer is destroyed when the queue is.
+    void destroy() { queueReplicator->destroy(); }
   private:
     boost::shared_ptr<QueueReplicator> queueReplicator;
 };
@@ -115,7 +117,8 @@ QueueReplicator::QueueReplicator(HaBroke
 
 // This must be separate from the constructor so we can call shared_from_this.
 void QueueReplicator::activate() {
-    sys::Mutex::ScopedLock l(lock);
+    Mutex::ScopedLock l(lock);
+    if (!queue) return;         // Already destroyed
     std::pair<Bridge::shared_ptr, bool> result =
     queue->getBroker()->getLinks().declare(
         bridgeName,
@@ -141,12 +144,14 @@ void QueueReplicator::activate() {
     queue->addObserver(observer);
 }
 
-QueueReplicator::~QueueReplicator() { deactivate(); }
+QueueReplicator::~QueueReplicator() {}
 
-void QueueReplicator::deactivate() {
-    QPID_LOG(debug, logPrefix << "Deactivated");
-    sys::Mutex::ScopedLock l(lock);
-    if (bridge) bridge->close();
+void QueueReplicator::destroy() {
+    // Called from Queue::destroyed()
+    Mutex::ScopedLock l(lock);
+    if (!bridge) return;
+    QPID_LOG(debug, logPrefix << "Destroyed.");
+    bridge->close();
     // Need to drop shared pointers to avoid pointer cycles keeping this in memory.
     queue.reset();
     link.reset();
@@ -156,7 +161,8 @@ void QueueReplicator::deactivate() {
 
 // Called in a broker connection thread when the bridge is created.
 void QueueReplicator::initializeBridge(Bridge& bridge, SessionHandler& sessionHandler) {
-    sys::Mutex::ScopedLock l(lock);
+    Mutex::ScopedLock l(lock);
+    if (!queue) return;         // Already destroyed
     AMQP_ServerProxy peer(sessionHandler.out);
     const qmf::org::apache::qpid::broker::ArgsLinkBridge& args(bridge.getArgs());
     FieldTable settings;
@@ -197,7 +203,13 @@ template <class T> T decodeContent(Messa
 }
 }
 
-void QueueReplicator::dequeue(SequenceNumber n, sys::Mutex::ScopedLock&) {
+void QueueReplicator::dequeue(SequenceNumber n, Mutex::ScopedLock&) {
+    boost::shared_ptr<Queue> q;
+    {
+        Mutex::ScopedLock l(lock);
+        if (!queue) return;         // Already destroyed
+        q = queue;
+    }
-    // Thread safe: only calls thread safe Queue functions.
-    queue->dequeueMessageAt(n);
+    // Thread safe: calls a thread safe Queue function via the local copy q, since destroy() resets queue.
+    q->dequeueMessageAt(n);
 }
@@ -218,7 +230,8 @@ void QueueReplicator::route(Deliverable&
 {
     try {
         const std::string& key = msg.getMessage().getRoutingKey();
-        sys::Mutex::ScopedLock l(lock);
+        Mutex::ScopedLock l(lock);
+        if (!queue) return;         // Already destroyed
         if (!isEventKey(key)) {
             msg.deliverTo(queue);
             // We are on a backup so the queue is not modified except via this.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1397676&r1=1397675&r2=1397676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.h Fri Oct 12 18:38:53 2012
@@ -70,7 +70,6 @@ class QueueReplicator : public broker::E
     ~QueueReplicator();
 
     void activate();            // Call after ctor
-    void deactivate();          // Call before dtor
 
     std::string getType() const;
     bool bind(boost::shared_ptr<broker::Queue
@@ -90,6 +89,7 @@ class QueueReplicator : public broker::E
     class QueueObserver;
 
     void initializeBridge(broker::Bridge& bridge, broker::SessionHandler& sessionHandler);
+    void destroy();             // Called when the queue is destroyed.
     void dequeue(framing::SequenceNumber, sys::Mutex::ScopedLock&);
 
     HaBroker& haBroker;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp?rev=1397676&r1=1397675&r2=1397676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/StatusCheck.cpp Fri Oct 12 18:38:53 2012
@@ -93,7 +93,7 @@ void StatusCheckThread::run() {
             QPID_LOG(debug, statusCheck.logPrefix << "Status of " << url << ": " << status);
         }
     } catch(const exception& error) {
-        QPID_LOG(warning, "Error checking status of " << url <<  ": " << error.what());
+        QPID_LOG(info, "Checking status of " << url <<  ": " << error.what());
     }
     delete this;
 }

Modified: qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark?rev=1397676&r1=1397675&r2=1397676&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark (original)
+++ qpid/trunk/qpid/cpp/src/tests/qpid-cluster-benchmark Fri Oct 12 18:38:53 2012
@@ -55,12 +55,10 @@ done
 shift $(($OPTIND-1))
 
 CONNECTION_OPTIONS="--connection-options {tcp-nodelay:$TCP_NODELAY,reconnect:$RECONNECT,heartbeat:$HEARTBEAT}"
-CREATE_OPTIONS="node:{x-declare:{arguments:{'qpid.replicate':all}}}"
 
 BROKER=$(echo $BROKERS | sed s/,.*//)
 run_test() { echo $*; shift; "$@"; echo; echo; echo; }
 
 OPTS="$OPTS $REPEAT $BROKERS --summarize $QUEUES $SENDERS $RECEIVERS $MESSAGES $CLIENT_HOSTS $SAVE_RECEIVED $CONNECTION_OPTIONS $NO_DELETE"
-OPTS="$OPTS --create-option $CREATE_OPTIONS"
 
 run_test "Benchmark:" qpid-cpp-benchmark $OPTS "$@"



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org