You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/09/06 23:46:34 UTC

svn commit: r1165881 - in /qpid/branches/qpid-2920-1/qpid/cpp/src: qpid/broker/Queue.cpp qpid/broker/Queue.h qpid/sys/Stoppable.h tests/QueueTest.cpp

Author: aconway
Date: Tue Sep  6 21:46:33 2011
New Revision: 1165881

URL: http://svn.apache.org/viewvc?rev=1165881&view=rev
Log:
QPID-2920: Allow stopping consumers on queues.

Stop consumers from dispatching and wait for already dispatching consumers to exit.

Added:
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h   (with props)
Modified:
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp
    qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h
    qpid/branches/qpid-2920-1/qpid/cpp/src/tests/QueueTest.cpp

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1165881&r1=1165880&r2=1165881&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep  6 21:46:33 2011
@@ -373,11 +373,18 @@ void Queue::removeListener(Consumer::sha
 
 bool Queue::dispatch(Consumer::shared_ptr c)
 {
-    QueuedMessage msg(this);
-    if (getNextMessage(msg, c)) {
-        c->deliver(msg);
-        return true;
-    } else {
+    Stoppable::Scope doDispatch(dispatching); // Counts this thread as busy in `dispatching` unless it is stopped.
+    if (doDispatch) { // True only if the scope was entered, i.e. dispatching is not stopped.
+        QueuedMessage msg(this);
+        if (getNextMessage(msg, c)) {
+            c->deliver(msg);
+            return true; // A message was handed to the consumer.
+        } else {
+            return false; // getNextMessage() produced nothing for this consumer.
+        }
+    } else { // Dispatching is stopped: deliver nothing, register the consumer as a listener instead.
+        Mutex::ScopedLock locker(messageLock);
+        listeners.addListener(c); // FIXME aconway 2011-05-05: (no detail given in the commit). NOTE(review): presumably lets Queue::start()'s notifyListener() wake this consumer - confirm.
         return false;
     }
 }
@@ -1265,3 +1272,13 @@ void Queue::UsageBarrier::destroy()
     parent.deleted = true;
     while (count) parent.messageLock.wait();
 }
+
+// FIXME aconway 2011-05-06: naming - only affects consumers. stopDispatch()?
+void Queue::stop() { // Stop dispatching to consumers by stopping the `dispatching` Stoppable.
+    dispatching.stop(); // Refuses new dispatch() scopes and waits until threads already inside one have left.
+}
+
+void Queue::start() { // Re-enable dispatching to consumers after a stop().
+    dispatching.start(); // Asserts that dispatching is stopped and no thread is busy, then clears the stopped flag.
+    notifyListener(); // NOTE(review): presumably wakes a consumer registered as a listener while stopped - confirm against notifyListener().
+}

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h?rev=1165881&r1=1165880&r2=1165881&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h Tue Sep  6 21:46:33 2011
@@ -36,6 +36,7 @@
 #include "qpid/framing/FieldTable.h"
 #include "qpid/sys/AtomicValue.h"
 #include "qpid/sys/Monitor.h"
+#include "qpid/sys/Stoppable.h"
 #include "qpid/sys/Timer.h"
 #include "qpid/management/Manageable.h"
 #include "qmf/org/apache/qpid/broker/Queue.h"
@@ -70,6 +71,7 @@ class Exchange;
 class Queue : public boost::enable_shared_from_this<Queue>,
               public PersistableQueue, public management::Manageable {
 
+    // Used to prevent destruction of the queue while it is in use.
     struct UsageBarrier
     {
         Queue& parent;
@@ -129,6 +131,8 @@ class Queue : public boost::enable_share
     UsageBarrier barrier;
     int autoDeleteTimeout;
     boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
+    // Allow dispatching consumer threads to be stopped.
+    sys::Stoppable dispatching;
 
     void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
     void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -385,9 +389,21 @@ class Queue : public boost::enable_share
 
     uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
     void setDequeueSincePurge(uint32_t value);
+
+    /** Stop consumers. Return when all consumer threads are stopped.
+     *@pre Queue is active and not already stopping.
+     */
+    void stop();
+
+    /** Start consumers.
+     *@pre Queue is stopped and idle: no thread in dispatch.
+     */
+    void start();
+
+    /** Context data attached and used by cluster code. */
+    boost::intrusive_ptr<qpid::RefCounted> clusterContext;
 };
-}
-}
+}} // qpid::broker
 
 
 #endif  /*!_broker_Queue_h*/

Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h?rev=1165881&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h Tue Sep  6 21:46:33 2011
@@ -0,0 +1,91 @@
+#ifndef QPID_SYS_STOPPABLE_H
+#define QPID_SYS_STOPPABLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace sys {
+
+/**
+ * An activity that may be executed by multiple threads, and can be stopped.
+ * Stopping prevents new threads from entering and waits until threads that are already busy have left.
+ */
+class Stoppable { // NOTE(review): this header shows no #include for sys::Monitor or assert - confirm includers provide them.
+  public:
+    Stoppable() : busy(0), stopped(false) {} // Begins started (not stopped) with no busy threads.
+    ~Stoppable() { stop(); } // Destruction calls stop(), so it waits for busy threads to leave first.
+
+    /** Mark the scope of a busy thread. Converts to true only if the thread was admitted. Use like this:
+     * <pre>
+     * {
+     *   Stoppable::Scope working(stoppable);
+     *   if (working) { do stuff }
+     * }
+     * </pre>
+     */
+    class Scope { // NOTE(review): implicitly copyable; a copy of an entered Scope would call exit() a second time - confirm it is never copied.
+        Stoppable& state; // The Stoppable this scope entered (or tried to enter).
+        bool entered;     // Result of enter(): false if the Stoppable was stopped.
+      public:
+        Scope(Stoppable& s) : state(s) { entered = s.enter(); } // Tries to enter; does not block on the stopped state.
+        ~Scope() { if (entered) state.exit(); } // Leaves only if this scope actually entered.
+        operator bool() const { return entered; } // Lets callers write `if (scope)`.
+    };
+
+  friend class Scope; // Scope calls the private enter() and exit().
+
+    /** Mark stopped, then wait for all threads to leave their busy scope. */
+    void stop() {
+        sys::Monitor::ScopedLock l(lock);
+        stopped = true; // From now on enter() returns false.
+        while (busy > 0) lock.wait(); // exit() calls notifyAll() when busy drops to 0.
+    }
+
+    /** Set the state to started.
+     *@pre state is stopped and no threads are busy.
+     */
+    void start() {
+        sys::Monitor::ScopedLock l(lock);
+        assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling.
+        stopped = false;
+    }
+
+  private:
+    uint busy; // Count of threads currently inside a Scope. NOTE(review): `uint` is not a standard C++ type - confirm it is declared on all target platforms.
+    bool stopped; // Set by stop(), cleared by start().
+    sys::Monitor lock; // Held while reading or writing busy and stopped; stop() waits on it and exit() notifies it.
+
+    bool enter() { // Returns true and counts the caller as busy, or returns false if stopped.
+        sys::Monitor::ScopedLock l(lock);
+        if (!stopped) ++busy;
+        return !stopped;
+    }
+
+    void exit() { // Counts the caller out; notifies waiters when the last busy thread leaves.
+        sys::Monitor::ScopedLock l(lock);
+        assert(busy > 0); // exit() must be paired with a successful enter().
+        if (--busy == 0) lock.notifyAll();
+    }
+};
+
+}} // namespace qpid::sys
+
+#endif  /*!QPID_SYS_STOPPABLE_H*/

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/QueueTest.cpp?rev=1165881&r1=1165880&r2=1165881&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/QueueTest.cpp Tue Sep  6 21:46:33 2011
@@ -1,4 +1,4 @@
- /*
+/*
  *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -41,6 +41,7 @@
 
 #include <iostream>
 #include "boost/format.hpp"
+#include <boost/enable_shared_from_this.hpp>
 
 using boost::intrusive_ptr;
 using namespace qpid;
@@ -57,16 +58,22 @@ public:
     typedef boost::shared_ptr<TestConsumer> shared_ptr;
 
     intrusive_ptr<Message> last;
-    bool received;
-    TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+    bool received, notified;
+
+    TestConsumer(bool acquire = true):
+        Consumer(acquire), received(false), notified(false) {};
 
     virtual bool deliver(QueuedMessage& msg){
         last = msg.payload;
         received = true;
         return true;
     };
-    void notify() {}
+    void notify() {
+        notified = true;
+    }
+
     OwnershipToken* getSession() { return 0; }
+    void reset() { last = intrusive_ptr<Message>(); received = false; }
 };
 
 class FailOnDeliver : public Deliverable
@@ -1114,6 +1121,30 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
     BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
 }
 
+QPID_AUTO_TEST_CASE(testStopStart) { // Exercises Queue::stop()/start() against a single TestConsumer.
+    boost::shared_ptr<Queue> q(new Queue("foo"));
+    boost::shared_ptr<TestConsumer> c(new TestConsumer);
+    intrusive_ptr<Message> m = create_message("x","y");
+    q->consume(c);
+    // Initially q is started: dispatch() must succeed and deliver to the consumer.
+    q->deliver(m);
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK(c->received);
+    c->reset(); // Clears last and received (notified is left untouched).
+    // Stop q: dispatch() must return false and the consumer must be neither delivered to nor notified.
+    q->stop();
+    q->deliver(m);
+    BOOST_CHECK(!q->dispatch(c));
+    BOOST_CHECK(!c->received);
+    BOOST_CHECK(!c->notified);
+    // Start q: the consumer should be notified, and dispatch() should deliver again.
+    q->start();
+    q->deliver(m);
+    BOOST_CHECK(c->notified);
+    BOOST_CHECK(q->dispatch(c));
+    BOOST_CHECK(c->received);
+}
+
 
 QPID_AUTO_TEST_SUITE_END()
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org