You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2011/09/06 23:46:34 UTC
svn commit: r1165881 - in /qpid/branches/qpid-2920-1/qpid/cpp/src:
qpid/broker/Queue.cpp qpid/broker/Queue.h qpid/sys/Stoppable.h
tests/QueueTest.cpp
Author: aconway
Date: Tue Sep 6 21:46:33 2011
New Revision: 1165881
URL: http://svn.apache.org/viewvc?rev=1165881&view=rev
Log:
QPID-2920: Allow stopping consumers on queues.
Stop consumers from dispatching and wait for already dispatching consumers to exit.
Added:
qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h (with props)
Modified:
qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp
qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h
qpid/branches/qpid-2920-1/qpid/cpp/src/tests/QueueTest.cpp
Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp?rev=1165881&r1=1165880&r2=1165881&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.cpp Tue Sep 6 21:46:33 2011
@@ -373,11 +373,18 @@ void Queue::removeListener(Consumer::sha
bool Queue::dispatch(Consumer::shared_ptr c)
{
- QueuedMessage msg(this);
- if (getNextMessage(msg, c)) {
- c->deliver(msg);
- return true;
- } else {
+ Stoppable::Scope doDispatch(dispatching); // Counts this thread as busy until the scope ends, unless dispatching is stopped.
+ if (doDispatch) { // True only when the scope was entered, i.e. dispatching is not stopped.
+ QueuedMessage msg(this);
+ if (getNextMessage(msg, c)) {
+ c->deliver(msg);
+ return true; // A message was handed to the consumer.
+ } else {
+ return false; // getNextMessage() produced nothing for this consumer.
+ }
+ } else { // Dispatching is stopped
+ Mutex::ScopedLock locker(messageLock);
+ listeners.addListener(c); // FIXME aconway 2011-05-05: (no detail recorded) NOTE(review): presumably parks c so start()'s notifyListener() can wake it -- confirm.
return false;
}
}
@@ -1265,3 +1272,13 @@ void Queue::UsageBarrier::destroy()
parent.deleted = true;
while (count) parent.messageLock.wait();
}
+
+// FIXME aconway 2011-05-06: naming - only affects consumers (dispatch), not the whole queue. stopDispatch()?
+void Queue::stop() {
+ dispatching.stop(); // Marks dispatching stopped, then blocks until no thread is left inside a dispatch scope.
+}
+
+void Queue::start() {
+ dispatching.start(); // Asserts that dispatching is stopped and idle, then re-enables it.
+ notifyListener(); // NOTE(review): presumably wakes a consumer that dispatch() parked while stopped -- confirm.
+}
Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h?rev=1165881&r1=1165880&r2=1165881&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/broker/Queue.h Tue Sep 6 21:46:33 2011
@@ -36,6 +36,7 @@
#include "qpid/framing/FieldTable.h"
#include "qpid/sys/AtomicValue.h"
#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Stoppable.h"
#include "qpid/sys/Timer.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/broker/Queue.h"
@@ -70,6 +71,7 @@ class Exchange;
class Queue : public boost::enable_shared_from_this<Queue>,
public PersistableQueue, public management::Manageable {
+ // Used to prevent destruction of the queue while it is in use.
struct UsageBarrier
{
Queue& parent;
@@ -129,6 +131,8 @@ class Queue : public boost::enable_share
UsageBarrier barrier;
int autoDeleteTimeout;
boost::intrusive_ptr<qpid::sys::TimerTask> autoDeleteTask;
+ // Allow dispatching consumer threads to be stopped.
+ sys::Stoppable dispatching;
void push(boost::intrusive_ptr<Message>& msg, bool isRecovery=false);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
@@ -385,9 +389,21 @@ class Queue : public boost::enable_share
uint32_t getDequeueSincePurge() { return dequeueSincePurge.get(); }
void setDequeueSincePurge(uint32_t value);
+
+ /** Stop consumers. Return when all consumer threads are stopped.
+ *@pre Queue is active and not already stopping.
+ */
+ void stop();
+
+ /** Start consumers.
+ *@pre Queue is stopped and idle: no thread in dispatch.
+ */
+ void start();
+
+ /** Context data attached and used by cluster code. */
+ boost::intrusive_ptr<qpid::RefCounted> clusterContext;
};
-}
-}
+}} // qpid::broker
#endif /*!_broker_Queue_h*/
Added: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h?rev=1165881&view=auto
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h (added)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h Tue Sep 6 21:46:33 2011
@@ -0,0 +1,91 @@
+#ifndef QPID_SYS_STOPPABLE_H
+#define QPID_SYS_STOPPABLE_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+namespace qpid {
+namespace sys {
+
+/**
+ * An activity that may be executed by multiple threads, and can be stopped.
+ * Stopping prevents new threads from entering and waits until the threads that are already busy have left.
+ */
+class Stoppable { // NOTE(review): uses sys::Monitor and assert, but this header includes neither -- it relies on the includer.
+ public:
+ Stoppable() : busy(0), stopped(false) {} // Starts in the started state with no busy threads.
+ ~Stoppable() { stop(); } // Destruction blocks until no thread is busy.
+
+ /** Mark the scope of a busy thread like this:
+ * <pre>
+ * {
+ * Stoppable::Scope working(stoppable);
+ * if (working) { do stuff }
+ * }
+ * </pre>
+ */
+ class Scope {
+ Stoppable& state; // The Stoppable this scope tried to enter.
+ bool entered; // True if enter() succeeded; only then does the destructor call exit().
+ public:
+ Scope(Stoppable& s) : state(s) { entered = s.enter(); } // NOTE(review): not explicit, and copying is not disabled -- a copy would call exit() twice.
+ ~Scope() { if (entered) state.exit(); }
+ operator bool() const { return entered; } // False when the Stoppable was already stopped.
+ };
+
+ friend class Scope;
+
+ /** Mark stopped, then block until all threads have left their busy scope. */
+ void stop() {
+ sys::Monitor::ScopedLock l(lock);
+ stopped = true; // From here on enter() refuses new threads.
+ while (busy > 0) lock.wait(); // exit() calls notifyAll() when busy drops to 0.
+ }
+
+ /** Set the state to started.
+ *@pre state is stopped and no threads are busy.
+ */
+ void start() {
+ sys::Monitor::ScopedLock l(lock);
+ assert(stopped && busy == 0); // FIXME aconway 2011-05-06: error handling. (assert is a no-op when NDEBUG is defined.)
+ stopped = false;
+ }
+
+ private:
+ uint busy; // Number of threads currently inside a Scope. NOTE(review): uint is not a standard C++ type -- confirm portability.
+ bool stopped; // While true, enter() admits no new threads.
+ sys::Monitor lock; // Guards busy and stopped; stop() waits on it.
+
+ bool enter() { // Returns true and counts the caller as busy, unless stopped.
+ sys::Monitor::ScopedLock l(lock);
+ if (!stopped) ++busy;
+ return !stopped;
+ }
+
+ void exit() { // Un-counts the caller; notifies waiters when the last busy thread leaves.
+ sys::Monitor::ScopedLock l(lock);
+ assert(busy > 0);
+ if (--busy == 0) lock.notifyAll();
+ }
+};
+
+}} // namespace qpid::sys
+
+#endif /*!QPID_SYS_STOPPABLE_H*/
Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/branches/qpid-2920-1/qpid/cpp/src/qpid/sys/Stoppable.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: qpid/branches/qpid-2920-1/qpid/cpp/src/tests/QueueTest.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-2920-1/qpid/cpp/src/tests/QueueTest.cpp?rev=1165881&r1=1165880&r2=1165881&view=diff
==============================================================================
--- qpid/branches/qpid-2920-1/qpid/cpp/src/tests/QueueTest.cpp (original)
+++ qpid/branches/qpid-2920-1/qpid/cpp/src/tests/QueueTest.cpp Tue Sep 6 21:46:33 2011
@@ -1,4 +1,4 @@
- /*
+/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -41,6 +41,7 @@
#include <iostream>
#include "boost/format.hpp"
+#include <boost/enable_shared_from_this.hpp>
using boost::intrusive_ptr;
using namespace qpid;
@@ -57,16 +58,22 @@ public:
typedef boost::shared_ptr<TestConsumer> shared_ptr;
intrusive_ptr<Message> last;
- bool received;
- TestConsumer(bool acquire = true):Consumer(acquire), received(false) {};
+ bool received, notified;
+
+ TestConsumer(bool acquire = true):
+ Consumer(acquire), received(false), notified(false) {};
virtual bool deliver(QueuedMessage& msg){
last = msg.payload;
received = true;
return true;
};
- void notify() {}
+ void notify() {
+ notified = true;
+ }
+
OwnershipToken* getSession() { return 0; }
+ void reset() { last = intrusive_ptr<Message>(); received = false; }
};
class FailOnDeliver : public Deliverable
@@ -1114,6 +1121,30 @@ QPID_AUTO_TEST_CASE(testFlowToDiskBlocki
BOOST_CHECK_EQUAL(5u, tq9->getMessageCount());
}
+QPID_AUTO_TEST_CASE(testStopStart) {
+ boost::shared_ptr<Queue> q(new Queue("foo"));
+ boost::shared_ptr<TestConsumer> c(new TestConsumer);
+ intrusive_ptr<Message> m = create_message("x","y");
+ q->consume(c);
+ // Initially q is started: dispatch should deliver the message to the consumer.
+ q->deliver(m);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK(c->received);
+ c->reset(); // Clears last/received only; notified is left untouched.
+ // Stop q: dispatch must fail, and the consumer must be neither delivered to nor notified.
+ q->stop();
+ q->deliver(m);
+ BOOST_CHECK(!q->dispatch(c));
+ BOOST_CHECK(!c->received);
+ BOOST_CHECK(!c->notified);
+ // Start q: the consumer should be notified, and dispatch should deliver again.
+ q->start();
+ q->deliver(m);
+ BOOST_CHECK(c->notified);
+ BOOST_CHECK(q->dispatch(c));
+ BOOST_CHECK(c->received);
+}
+
QPID_AUTO_TEST_SUITE_END()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org