You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/09/21 16:55:50 UTC

svn commit: r1173695 - in /qpid/branches/qpid-2920-active/qpid/cpp/src: qpid/broker/ qpid/cluster/exp/ qpid/sys/ tests/

Author: aconway
Date: Wed Sep 21 14:55:49 2011
New Revision: 1173695

URL: http://svn.apache.org/viewvc?rev=1173695&view=rev
Log:
QPID-2920: Fixing hangs in qpid-cpp-benchmark with 2 brokers.

This test hangs: qpid-cpp-benchmark -b localhost:5556,localhost:5555 -r2 -m10000

Modified:
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h
    qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Timer.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp
    qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1173695&r1=1173694&r2=1173695&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/broker/Queue.cpp Wed Sep 21 14:55:49 2011
@@ -227,7 +227,7 @@ void Queue::requeue(const QueuedMessage&
             }
         }
     }
-    if (broker) broker->getCluster().requeue(msg); // FIXME aconway 2011-09-12: review. rename requeue?
+    if (broker) broker->getCluster().requeue(msg);
     copy.notify();
 }
 
@@ -255,7 +255,6 @@ bool Queue::acquireMessageAt(const Seque
     ClusterAcquireScope acquireScope; // Outside lock
     Mutex::ScopedLock locker(messageLock);
     assertClusterSafe();
-    QPID_LOG(debug, "Attempting to acquire message at " << position);
     if (messages->remove(position, message)) {
         QPID_LOG(debug, "Acquired message at " << position << " from " << name);
         acquireScope.qmsg = message;
@@ -307,13 +306,13 @@ Queue::ConsumeCode Queue::consumeNextMes
     while (true) {
         Stoppable::Scope consumeScope(consuming);
         if (!consumeScope) {
-            QPID_LOG(trace, "Queue is stopped: " << name);
+            QPID_LOG(trace, "Queue stopped, can't  consume: " << name);
             listeners.addListener(c);
             return NO_MESSAGES;
         }
         ClusterAcquireScope acquireScope; // Outside the lock
         Mutex::ScopedLock locker(messageLock);
-        if (messages->empty()) { // FIXME aconway 2011-06-07: ugly
+        if (messages->empty()) {
             QPID_LOG(debug, "No messages to dispatch on queue '" << name << "'");
             listeners.addListener(c);
             return NO_MESSAGES;
@@ -914,10 +913,6 @@ void Queue::notifyDeleted()
     set.notifyAll();
 }
 
-void Queue::consumingStopped() {
-    if (broker) broker->getCluster().stopped(*this);
-}
-
 void Queue::bound(const string& exchange, const string& key,
                   const FieldTable& args)
 {
@@ -1287,12 +1282,19 @@ void Queue::UsageBarrier::destroy()
 }
 
 void Queue::stopConsumers() {
-    QPID_LOG(trace, "Queue stopped: " << getName());
+    QPID_LOG(trace, "Stopping consumers on " << getName());
     consuming.stop();
 }
 
 void Queue::startConsumers() {
-    QPID_LOG(trace, "Queue started: " << getName());
+    QPID_LOG(trace, "Starting consumers on " << getName());
     consuming.start();
     notifyListener();
 }
+
+// Called when all busy threads exitd due to stopConsumers()
+void Queue::consumingStopped() {
+    QPID_LOG(trace, "Stopped consumers on " << getName());
+    if (broker) broker->getCluster().stopped(*this);
+}
+

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h?rev=1173695&r1=1173694&r2=1173695&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/CountdownTimer.h Wed Sep 21 14:55:49 2011
@@ -23,6 +23,7 @@
  */
 
 #include "qpid/sys/Timer.h"
+#include "qpid/log/Statement.h" // FIXME aconway 2011-09-19: remove
 #include <boost/function.hpp>
 
 namespace qpid {
@@ -44,6 +45,7 @@ class CountdownTimer {
 
     /** Start the countdown if not already started. */
     void start() {
+        QPID_LOG(debug, "FIXME CountdownTimer::start");
         sys::Mutex::ScopedLock l(lock);
         if (!timerRunning) {
             timerRunning = true;
@@ -54,6 +56,7 @@ class CountdownTimer {
 
     /** Stop the countdown if not already stopped. */
     void stop() {
+        QPID_LOG(debug, "FIXME CountdownTimer::stop");
         sys::Mutex::ScopedLock l(lock);
         if (timerRunning) {
             timerRunning = false;
@@ -73,6 +76,7 @@ class CountdownTimer {
 
     // Called when countdown expires.
     void fire() {
+        QPID_LOG(debug, "FIXME CountdownTimer::fire");
         bool doCallback = false;
         {
             sys::Mutex::ScopedLock l(lock);
@@ -87,6 +91,7 @@ class CountdownTimer {
     bool timerRunning;
     boost::function<void()> callback;
     sys::Timer& timer;
+    sys::Duration duration;
 };
 
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp?rev=1173695&r1=1173694&r2=1173695&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/MessageHandler.cpp Wed Sep 21 14:55:49 2011
@@ -89,6 +89,9 @@ void MessageHandler::routed(RoutingId ro
 // FIXME aconway 2011-09-14: performance: pack acquires into a SequenceSet
 // and scan queue once.
 void MessageHandler::acquire(const std::string& q, uint32_t position) {
+    // FIXME aconway 2011-09-15: systematic logging across cluster module.
+    QPID_LOG(trace, "cluster message " << q << "[" << position
+             << "] acquired by " << PrettyId(sender(), self()));
     // Note acquires from other members. My own acquires were executed in
     // the connection thread
     if (sender() != self()) {
@@ -102,18 +105,20 @@ void MessageHandler::acquire(const std::
         assert(qm.payload);
         // Save on context for possible requeue if released/rejected.
         QueueContext::get(*queue)->acquire(qm);
+        // FIXME aconway 2011-09-19: need to record by member-ID to  requeue if member leaves.
     }
+}
+
+void MessageHandler::dequeue(const std::string& q, uint32_t position) {
     // FIXME aconway 2011-09-15: systematic logging across cluster module.
     QPID_LOG(trace, "cluster message " << q << "[" << position
-             << "] acquired by " << PrettyId(sender(), self()));
- }
+             << "] dequeued by " << PrettyId(sender(), self()));
 
-void MessageHandler::dequeue(const std::string& q, uint32_t position) {
-    if (sender() == self()) {
-        // FIXME aconway 2010-10-28: we should complete the ack that initiated
-        // the dequeue at this point, see BrokerContext::dequeue
-    }
-    else {
+    // FIXME aconway 2010-10-28: for local dequeues, we should
+    // complete the ack that initiated the dequeue at this point, see
+    // BrokerContext::dequeue
+
+    if (sender() != self()) {
         // FIXME aconway 2011-09-15: new cluster, inefficient looks up
         // message by position multiple times?
         boost::shared_ptr<Queue> queue = findQueue(q, "Cluster dequeue failed");

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp?rev=1173695&r1=1173694&r2=1173695&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueContext.cpp Wed Sep 21 14:55:49 2011
@@ -48,36 +48,36 @@ QueueContext::QueueContext(broker::Queue
 
 QueueContext::~QueueContext() {}
 
-// Invariant for ownership:
-// UNSUBSCRIBED, SUBSCRIBED => timer stopped, queue stopped
-// SOLE_OWNER => timer stopped, queue started
-// SHARED_OWNER => timer started, queue started
-
 namespace {
 bool isOwner(QueueOwnership o) { return o == SOLE_OWNER || o == SHARED_OWNER; }
 }
 
 // Called by QueueReplica in CPG deliver thread when state changes.
 void QueueContext::replicaState(QueueOwnership newOwnership) {
+
+    // Invariants for ownership:
+    // UNSUBSCRIBED, SUBSCRIBED <=> timer stopped, queue stopped
+    // SOLE_OWNER <=> timer stopped, queue started
+    // SHARED_OWNER <=> timer started, queue started
+
     sys::Mutex::ScopedLock l(lock);
     QueueOwnership before = ownership;
     QueueOwnership after = newOwnership;
-    ownership = after;
-    if (!isOwner(before) && !isOwner(after))
-        ;   // Nothing to do, now ownership change on this transition.
-    else if (isOwner(before) && !isOwner(after)) // Lost ownership
-        ; // Nothing to do, queue and timer were stopped before
-          // sending unsubscribe/resubscribe.
-    else if (!isOwner(before) && isOwner(after)) { // Took ownership
+    assert(before != after);
+    ownership = newOwnership;
+
+    if (!isOwner(before) && isOwner(after)) { // Took ownership
         queue.startConsumers();
         if (after == SHARED_OWNER) timer.start();
     }
     else if (isOwner(before) && isOwner(after) && before != after) {
+        // Changed from shared to sole owner or vice versa
         if (after == SOLE_OWNER) timer.stop();
         else timer.start();
     }
+    // If we lost ownership then the queue and timer will already have
+    // been stopped by timeout()
 }
-
 // FIXME aconway 2011-07-27: Dont spin the token on an empty or idle queue.
 
 // Called in connection threads when a consumer is added
@@ -101,6 +101,7 @@ void QueueContext::cancel(size_t n) {
 
 // Called in timer thread.
 void QueueContext::timeout() {
+    QPID_LOG(debug, "FIXME QueueContext::timeout");
     // When all threads have stopped, queue will call stopped()
     queue.stopConsumers();
 }
@@ -108,8 +109,8 @@ void QueueContext::timeout() {
 // Callback set up by queue.stopConsumers() called in connection thread.
 // Called when no threads are dispatching from the queue.
 void QueueContext::stopped() {
+    QPID_LOG(debug, "FIXME QueueContext::stopped");
     sys::Mutex::ScopedLock l(lock);
-    // FIXME aconway 2011-07-28: review thread safety of state.
     if (consumers == 0)
         mcast.mcast(framing::ClusterQueueUnsubscribeBody(
                         framing::ProtocolVersion(), queue.getName()));

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp?rev=1173695&r1=1173694&r2=1173695&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.cpp Wed Sep 21 14:55:49 2011
@@ -46,16 +46,10 @@ std::ostream& operator<<(std::ostream& o
 }
 
 std::ostream& operator<<(std::ostream& o, QueueOwnership s) {
-    static char* tags[] = { "UNSUBSCRIBED", "SUBSCRIBED", "SOLE_OWNER", "SHARED_OWNER" };
+    static char* tags[] = { "unsubscribed", "subscribed", "sole_owner", "shared_owner" };
     return o << tags[s];
 }
 
-std::ostream& operator<<(std::ostream& o, const QueueReplica& qr) {
-    o << qr.queue->getName() << "(" << qr.getState() << "): "
-      <<  PrintSubscribers(qr.subscribers, qr.getSelf());
-    return o;
-}
-
 void QueueReplica::subscribe(const MemberId& member) {
     QueueOwnership before = getState();
     subscribers.push_back(member);
@@ -81,15 +75,17 @@ void QueueReplica::resubscribe(const Mem
 }
 
 void QueueReplica::update(QueueOwnership before) {
-    QPID_LOG(trace, "cluster: queue replica " << *this << " (was " << before << ")");
     QueueOwnership after = getState();
+    QPID_LOG(trace, "cluster queue replica: " << queue->getName() << ": "
+             << before << "->" << after << " [" << PrintSubscribers(subscribers, self) << "]");
     if (before != after) context->replicaState(after);
 }
 
 QueueOwnership QueueReplica::getState() const {
     if (isOwner())
         return (subscribers.size() > 1) ? SHARED_OWNER : SOLE_OWNER;
-    return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED;
+    else
+        return (isSubscriber(self)) ? SUBSCRIBED : UNSUBSCRIBED;
 }
 
 bool QueueReplica::isOwner() const {

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h?rev=1173695&r1=1173694&r2=1173695&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/cluster/exp/QueueReplica.h Wed Sep 21 14:55:49 2011
@@ -72,7 +72,6 @@ class QueueReplica : public RefCounted
 
   friend struct PrintSubscribers;
   friend std::ostream& operator<<(std::ostream&, QueueOwnership);
-  friend std::ostream& operator<<(std::ostream&, const QueueReplica&);
   friend std::ostream& operator<<(std::ostream& o, const PrintSubscribers& ps);
 };
 

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h?rev=1173695&r1=1173694&r2=1173695&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Stoppable.h Wed Sep 21 14:55:49 2011
@@ -68,7 +68,6 @@ class Stoppable {
      */
     void stop() {
         sys::Monitor::ScopedLock l(lock);
-        if (stopped) return;
         stopped = true;
         check(l);
     }

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Timer.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Timer.cpp?rev=1173695&r1=1173694&r2=1173695&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Timer.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/qpid/sys/Timer.cpp Wed Sep 21 14:55:49 2011
@@ -68,7 +68,11 @@ void TimerTask::setupNextFire() {
 }
 
 // Only allow tasks to be delayed
-void TimerTask::restart() { nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period)); }
+void TimerTask::restart() {
+    ScopedLock<Mutex> l(callbackLock);
+    nextFireTime = max(nextFireTime, AbsTime(AbsTime::now(), period));
+    cancelled = false;
+}
 
 void TimerTask::cancel() {
     ScopedLock<Mutex> l(callbackLock);

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp?rev=1173695&r1=1173694&r2=1173695&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/BrokerClusterCalls.cpp Wed Sep 21 14:55:49 2011
@@ -96,7 +96,6 @@ class DummyCluster : public broker::Clus
     }
     virtual bool dequeue(const broker::QueuedMessage& qm) {
         if (!isRouting) recordQm("dequeue", qm);
-        return false;
     }
 
     // Consumers

Modified: qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp?rev=1173695&r1=1173694&r2=1173695&view=diff
==============================================================================
--- qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp (original)
+++ qpid/branches/qpid-2920-active/qpid/cpp/src/tests/qpid-receive.cpp Wed Sep 21 14:55:49 2011
@@ -198,6 +198,10 @@ int main(int argc, char ** argv)
             std::map<std::string,Sender> replyTo;
 
             while (!done && receiver.fetch(msg, timeout)) {
+                // FIXME aconway 2011-09-19: 
+//                 std::ostringstream os;
+//                 os << "qpid-receive(" << getpid() << ") seq=" << msg.getProperties()[SN] << endl; // FIXME aconway 2011-09-19:
+//                 cerr << os.str() << flush;
                 if (!started) {
                     // Start the time on receipt of the first message to avoid counting
                     // idle time at process startup.
@@ -225,6 +229,7 @@ int main(int argc, char ** argv)
                         if (opts.printContent)
                             std::cout << msg.getContent() << std::endl;//TODO: handle map or list messages
                         if (opts.messages && count >= opts.messages) {
+                            cerr << "qpid-receive(" << getpid() << ") DONE" << endl;
                             done = true;
                         }
                     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org