You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/08/22 01:35:23 UTC
svn commit: r568332 - in /incubator/qpid/trunk/qpid/cpp/src:
qpid/broker/BrokerQueue.cpp qpid/broker/BrokerQueue.h
qpid/sys/Serializer.cpp qpid/sys/Serializer.h tests/Serializer.cpp
Author: aconway
Date: Tue Aug 21 16:35:23 2007
New Revision: 568332
URL: http://svn.apache.org/viewvc?rev=568332&view=rev
Log:
* src/qpid/sys/Serializer.h, .cpp:
Template Serializer on functor for execute().
Old Serializer equivalent to Serializer<boost::function<void()> >
* src/qpid/broker/BrokerQueue.h, .cpp:
Use hand-written functor for Serializer instead of boost::function.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h
incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=568332&r1=568331&r2=568332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Tue Aug 21 16:35:23 2007
@@ -50,7 +50,7 @@
exclusive(0),
persistenceId(0),
serializer(false),
- dispatchCallback(boost::bind(&Queue::dispatch, this))
+ dispatchCallback(*this)
{
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?rev=568332&r1=568331&r2=568332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Tue Aug 21 16:35:23 2007
@@ -60,6 +60,12 @@
typedef std::vector<Consumer*> Consumers;
typedef std::deque<Message::shared_ptr> Messages;
+ struct DispatchFunctor {
+ Queue& queue;
+ DispatchFunctor(Queue& q) : queue(q) {}
+ void operator()() { queue.dispatch(); }
+ };
+
const string name;
const bool autodelete;
MessageStore* const store;
@@ -75,8 +81,8 @@
std::auto_ptr<QueuePolicy> policy;
QueueBindings bindings;
boost::shared_ptr<Exchange> alternateExchange;
- qpid::sys::Serializer serializer;
- qpid::sys::Serializer::Task dispatchCallback;
+ qpid::sys::Serializer<DispatchFunctor> serializer;
+ DispatchFunctor dispatchCallback;
void pop();
void push(Message::shared_ptr& msg);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp?rev=568332&r1=568331&r2=568332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp Tue Aug 21 16:35:23 2007
@@ -29,14 +29,14 @@
namespace qpid {
namespace sys {
-Serializer::Serializer(bool allowImmediate, Task notifyDispatchFn)
+SerializerBase::SerializerBase(bool allowImmediate, VoidFn0 notifyDispatchFn)
: state(IDLE), immediate(allowImmediate), notifyDispatch(notifyDispatchFn)
{
if (notifyDispatch.empty())
- notifyDispatch = boost::bind(&Serializer::notifyWorker, this);
+ notifyDispatch = boost::bind(&SerializerBase::notifyWorker, this);
}
-Serializer::~Serializer() {
+SerializerBase::~SerializerBase() {
{
Mutex::ScopedLock l(lock);
state = SHUTDOWN;
@@ -46,75 +46,14 @@
worker.join();
}
-void Serializer::dispatch(Task& task) {
- Mutex::ScopedUnlock u(lock);
- // Preconditions: lock is held, state is EXECUTING or DISPATCHING
- assert(state != IDLE);
- assert(state != SHUTDOWN);
- assert(state == EXECUTING || state == DISPATCHING);
- try {
- task();
- } catch (const std::exception& e) {
- QPID_LOG(critical, "Unexpected exception in Serializer::dispatch"
- << e.what());
- assert(0); // Should not happen.
- } catch (...) {
- QPID_LOG(critical, "Unexpected exception in Serializer::dispatch.");
- assert(0); // Should not happen.
- }
-}
-
-void Serializer::execute(Task& task) {
- bool needNotify = false;
- {
- Mutex::ScopedLock l(lock);
- assert(state != SHUTDOWN);
- if (immediate && state == IDLE) {
- state = EXECUTING;
- dispatch(task);
- if (state != SHUTDOWN) {
- assert(state == EXECUTING);
- state = IDLE;
- }
- }
- else
- queue.push_back(task);
-
- if (!queue.empty() && state == IDLE) {
- state = DISPATCHING;
- needNotify = true;
- }
- }
- if (needNotify)
- notifyDispatch(); // Not my function, call outside lock.
-}
-
-void Serializer::dispatch() {
- Mutex::ScopedLock l(lock);
- // TODO aconway 2007-07-16: This loop could be unbounded
- // if other threads add work while we're in dispatch(Task&).
- // If we need to bound it we could dispatch just the elements
- // that were enqueued when dispatch() was first called - save
- // begin() iterator and pop only up to that.
- while (!queue.empty() && state != SHUTDOWN) {
- assert(state == DISPATCHING);
- dispatch(queue.front());
- queue.pop_front();
- }
- if (state != SHUTDOWN) {
- assert(state == DISPATCHING);
- state = IDLE;
- }
-}
-
-void Serializer::notifyWorker() {
+void SerializerBase::notifyWorker() {
if (!worker.id())
worker = Thread(*this);
else
lock.notify();
}
-void Serializer::run() {
+void SerializerBase::run() {
Mutex::ScopedLock l(lock);
while (state != SHUTDOWN) {
dispatch();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h?rev=568332&r1=568331&r2=568332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h Tue Aug 21 16:35:23 2007
@@ -36,29 +36,69 @@
namespace qpid {
namespace sys {
+/** Abstract base class for Serializer below. */
+class SerializerBase : private boost::noncopyable, private Runnable
+{
+ public:
+ typedef boost::function<void()> VoidFn0;
+ /** @see Serializer::Serializer */
+ SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0());
+
+ virtual ~SerializerBase();
+
+ virtual void dispatch() = 0;
+ protected:
+ enum State {
+ IDLE, ///< No threads are active.
+ EXECUTING, ///< execute() is executing a single task.
+ DISPATCHING, ///< dispatch() is draining the queue.
+ SHUTDOWN ///< SerailizerBase is being destroyed.
+ };
+
+ void notifyWorker();
+ void run();
+ virtual bool empty() = 0;
+
+ Monitor lock;
+ State state;
+ bool immediate;
+ Thread worker;
+ boost::function<void()> notifyDispatch;
+};
+
+
/**
* Execute tasks sequentially, queuing tasks when necessary to
* ensure only one thread at a time executes a task and tasks
* are executed in order.
+ *
+ * Task is a void returning 0-arg functor. It must not throw exceptions.
+ *
+ * Note we deliberately do not use boost::function as the task type
+ * because copying a boost::functor allocates the target object on the
+ * heap.
*/
-class Serializer : private boost::noncopyable, private Runnable
-{
- public:
- typedef boost::function<void()> Task;
+template <class Task>
+class Serializer : public SerializerBase {
+ std::deque<Task> queue;
+
+ bool empty() { return queue.empty(); }
+ void dispatch(Task& task);
+
+ public:
/** Start a serializer.
*
* @param notifyDispatch Called when work is pending and there is no
* active dispatch thread. Must arrange for dispatch() to be called
* in some thread other than the calling thread and return.
- * By default the Serializer supplies its own dispatch thread.
+ * By default the Serailizer supplies its own dispatch thread.
*
* @param immediate Allow execute() to execute a task immediatly
* in the current thread.
*/
- Serializer(bool immediate=true, Task notifyDispatch=Task());
-
- ~Serializer();
+ Serializer(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0())
+ : SerializerBase(immediate, notifyDispatch) {}
/**
* Task may be executed immediately in the calling thread if there
@@ -68,33 +108,73 @@
*/
void execute(Task& task);
+
/** Execute pending tasks sequentially in calling thread.
* Drains the task queue and returns, does not block for more tasks.
*
* @exception ShutdownException if the serializer is being destroyed.
*/
void dispatch();
-
- private:
- enum State {
- IDLE, ///< No threads are active.
- EXECUTING, ///< execute() is executing a single task.
- DISPATCHING, ///< dispatch() is draining the queue.
- SHUTDOWN ///< Serializer is being destroyed.
};
- void dispatch(Task&);
- void notifyWorker();
- void run();
- Monitor lock;
+template <class Task>
+void Serializer<Task>::execute(Task& task) {
+ bool needNotify = false;
+ {
+ Mutex::ScopedLock l(lock);
+ assert(state != SHUTDOWN);
+ if (immediate && state == IDLE) {
+ state = EXECUTING;
+ dispatch(task);
+ if (state != SHUTDOWN) {
+ assert(state == EXECUTING);
+ state = IDLE;
+ }
+ }
+ else
+ queue.push_back(task);
+ if (!queue.empty() && state == IDLE) {
+ state = DISPATCHING;
+ needNotify = true;
+ }
+ }
+ if (needNotify)
+ notifyDispatch(); // Not my function, call outside lock.
+}
+
+template <class Task>
+void Serializer<Task>::dispatch() {
+ Mutex::ScopedLock l(lock);
+ // TODO aconway 2007-07-16: This loop could be unbounded
+ // if other threads add work while we're in dispatch(Task&).
+ // If we need to bound it we could dispatch just the elements
+ // that were enqueued when dispatch() was first called - save
+ // begin() iterator and pop only up to that.
+ while (!queue.empty() && state != SHUTDOWN) {
+ assert(state == DISPATCHING);
+ dispatch(queue.front());
+ queue.pop_front();
+ }
+ if (state != SHUTDOWN) {
+ assert(state == DISPATCHING);
+ state = IDLE;
+ }
+}
+
+template <class Task>
+void Serializer<Task>::dispatch(Task& task) {
+ Mutex::ScopedUnlock u(lock);
+ // Preconditions: lock is held, state is EXECUTING or DISPATCHING
+ assert(state != IDLE);
+ assert(state != SHUTDOWN);
+ assert(state == EXECUTING || state == DISPATCHING);
+ // No exceptions allowed in task.
+ try { task(); } catch (...) { assert(0); }
+}
+
+
- State state;
- bool immediate;
- std::deque<Task> queue;
- Thread worker;
- Task notifyDispatch;
-};
}} // namespace qpid::sys
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp?rev=568332&r1=568331&r2=568332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp Tue Aug 21 16:35:23 2007
@@ -38,6 +38,7 @@
using namespace qpid::framing;
using namespace std;
+typedef Serializer<boost::function<void()> > BoostFunctionSerializer;
/** Test for concurrent calls */
struct Tester {
@@ -61,7 +62,7 @@
}
};
-void execute(Serializer& s, Serializer::Task t)
+void execute(BoostFunctionSerializer& s, boost::function<void()> t)
{
s.execute(t);
}
@@ -69,7 +70,7 @@
BOOST_AUTO_TEST_CASE(testSingleThread) {
// Verify that we call in the same thread by default.
Tester tester;
- Serializer s;
+ BoostFunctionSerializer s;
for (int i = 0; i < 100; ++i)
execute(s, boost::bind(&Tester::test, &tester));
// All should be executed in this thread.
@@ -83,7 +84,7 @@
BOOST_AUTO_TEST_CASE(testSingleThreadNoImmediate) {
// Verify that we call in different thread if immediate=false.
Tester tester;
- Serializer s(false);
+ BoostFunctionSerializer s(false);
for (int i = 0; i < 100; ++i)
execute(s, boost::bind(&Tester::test, &tester));
{
@@ -99,13 +100,13 @@
}
struct Caller : public Runnable, public Tester {
- Caller(Serializer& s) : serializer(s) {}
+ Caller(BoostFunctionSerializer& s) : serializer(s) {}
void run() { execute(serializer, boost::bind(&Tester::test, this)); }
- Serializer& serializer;
+ BoostFunctionSerializer& serializer;
};
BOOST_AUTO_TEST_CASE(testDispatchThread) {
- Serializer s;
+ BoostFunctionSerializer s;
Caller caller(s);
Thread threads[100];
// Concurrent calls.
@@ -121,7 +122,7 @@
}
-std::auto_ptr<Serializer> serializer;
+std::auto_ptr<BoostFunctionSerializer> serializer;
struct CallDispatch : public Runnable {
void run() {
@@ -136,7 +137,7 @@
// Use externally created threads.
BOOST_AUTO_TEST_CASE(testExternalDispatch) {
- serializer.reset(new Serializer(false, ¬ifyDispatch));
+ serializer.reset(new BoostFunctionSerializer(false, ¬ifyDispatch));
Tester tester;
for (int i = 0; i < 100; ++i)
execute(*serializer, boost::bind(&Tester::test, &tester));