You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2007/08/22 01:35:23 UTC

svn commit: r568332 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/broker/BrokerQueue.cpp qpid/broker/BrokerQueue.h qpid/sys/Serializer.cpp qpid/sys/Serializer.h tests/Serializer.cpp

Author: aconway
Date: Tue Aug 21 16:35:23 2007
New Revision: 568332

URL: http://svn.apache.org/viewvc?rev=568332&view=rev
Log:

	* src/qpid/sys/Serializer.h, .cpp:
	  Template Serializer on functor for execute().
	  Old Serializer equivalent to Serializer<boost::function<void()> >

	* src/qpid/broker/BrokerQueue.h, .cpp:
	  Use hand-written functor for Serializer instead of boost::function.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp?rev=568332&r1=568331&r2=568332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.cpp Tue Aug 21 16:35:23 2007
@@ -50,7 +50,7 @@
     exclusive(0),
     persistenceId(0),
     serializer(false),
-    dispatchCallback(boost::bind(&Queue::dispatch, this))
+    dispatchCallback(*this)
 {
 }
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h?rev=568332&r1=568331&r2=568332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerQueue.h Tue Aug 21 16:35:23 2007
@@ -60,6 +60,12 @@
             typedef std::vector<Consumer*> Consumers;
             typedef std::deque<Message::shared_ptr> Messages;
             
+            struct DispatchFunctor {
+                Queue& queue;
+                DispatchFunctor(Queue& q) : queue(q) {}
+                void operator()() { queue.dispatch(); }
+            };
+                
             const string name;
             const bool autodelete;
             MessageStore* const store;
@@ -75,8 +81,8 @@
             std::auto_ptr<QueuePolicy> policy;            
             QueueBindings bindings;
             boost::shared_ptr<Exchange> alternateExchange;
-            qpid::sys::Serializer serializer;
-            qpid::sys::Serializer::Task dispatchCallback;
+            qpid::sys::Serializer<DispatchFunctor> serializer;
+            DispatchFunctor dispatchCallback;
 
             void pop();
             void push(Message::shared_ptr& msg);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp?rev=568332&r1=568331&r2=568332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.cpp Tue Aug 21 16:35:23 2007
@@ -29,14 +29,14 @@
 namespace qpid {
 namespace sys {
 
-Serializer::Serializer(bool allowImmediate, Task notifyDispatchFn)
+SerializerBase::SerializerBase(bool allowImmediate, VoidFn0 notifyDispatchFn)
     : state(IDLE), immediate(allowImmediate), notifyDispatch(notifyDispatchFn)
 {
     if (notifyDispatch.empty())
-        notifyDispatch = boost::bind(&Serializer::notifyWorker, this);
+        notifyDispatch = boost::bind(&SerializerBase::notifyWorker, this);
 }
 
-Serializer::~Serializer() {
+SerializerBase::~SerializerBase() {
     {
         Mutex::ScopedLock l(lock);
         state = SHUTDOWN;
@@ -46,75 +46,14 @@
         worker.join();
 }
 
-void Serializer::dispatch(Task& task) {
-    Mutex::ScopedUnlock u(lock);
-    // Preconditions: lock is held, state is EXECUTING or DISPATCHING
-    assert(state != IDLE);
-    assert(state != SHUTDOWN);
-    assert(state == EXECUTING || state == DISPATCHING);
-    try {
-        task();
-    } catch (const std::exception& e) {
-        QPID_LOG(critical, "Unexpected exception in Serializer::dispatch"
-                 << e.what());
-        assert(0);              // Should not happen.
-    } catch (...) {
-        QPID_LOG(critical, "Unexpected exception in Serializer::dispatch.");
-        assert(0);              // Should not happen.
-    }
-}
-
-void Serializer::execute(Task& task) {
-    bool needNotify = false;
-    {
-        Mutex::ScopedLock l(lock);
-        assert(state != SHUTDOWN);
-        if (immediate && state == IDLE) {
-            state = EXECUTING;
-            dispatch(task);
-            if (state != SHUTDOWN) {
-                assert(state == EXECUTING);
-                state = IDLE;
-            }
-        }
-        else 
-            queue.push_back(task);
-
-        if (!queue.empty() && state == IDLE) {
-            state = DISPATCHING;
-            needNotify = true;
-        }
-    }
-    if (needNotify)
-        notifyDispatch();       // Not my function, call outside lock.
-}
-
-void Serializer::dispatch() {
-    Mutex::ScopedLock l(lock);
-    // TODO aconway 2007-07-16: This loop could be unbounded
-    // if other threads add work while we're in dispatch(Task&).
-    // If we need to bound it we could dispatch just the elements
-    // that were enqueued when dispatch() was first called - save
-    // begin() iterator and pop only up to that.
-    while (!queue.empty() && state != SHUTDOWN) {
-        assert(state == DISPATCHING);
-        dispatch(queue.front());
-        queue.pop_front();
-    }
-    if (state != SHUTDOWN) {
-        assert(state == DISPATCHING);
-        state = IDLE;
-    }
-}
-
-void Serializer::notifyWorker() {
+void SerializerBase::notifyWorker() {
     if (!worker.id())
         worker = Thread(*this);
     else
         lock.notify();
 }
 
-void Serializer::run() {
+void SerializerBase::run() {
     Mutex::ScopedLock l(lock);
     while (state != SHUTDOWN) {
         dispatch();

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h?rev=568332&r1=568331&r2=568332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Serializer.h Tue Aug 21 16:35:23 2007
@@ -36,29 +36,69 @@
 namespace qpid {
 namespace sys {
 
+/** Abstract base class for Serializer below. */
+class SerializerBase : private boost::noncopyable, private Runnable
+{
+  public:
+    typedef boost::function<void()> VoidFn0;
+    /** @see Serializer::Serializer */
+    SerializerBase(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0());
+
+    virtual ~SerializerBase();
+
+    virtual void dispatch() = 0;
+  protected:
+    enum State {
+        IDLE, ///< No threads are active.
+        EXECUTING, ///< execute() is executing a single task.
+        DISPATCHING, ///< dispatch() is draining the queue.
+        SHUTDOWN ///< SerailizerBase is being destroyed.
+    };
+
+    void notifyWorker();
+    void run();
+    virtual bool empty() = 0;
+
+    Monitor lock;
+    State state;
+    bool immediate;
+    Thread worker;
+    boost::function<void()> notifyDispatch;
+};
+
+
 /**
  * Execute tasks sequentially, queuing tasks when necessary to
  * ensure only one thread at a time executes a task and tasks
  * are executed in order.
+ *
+ * Task is a void returning 0-arg functor. It must not throw exceptions.
+ * 
+ * Note we deliberately do not use boost::function as the task type
+ * because copying a boost::functor allocates the target object on the
+ * heap.
  */
-class Serializer : private boost::noncopyable, private Runnable
-{
-  public:
-    typedef boost::function<void()> Task;
+template <class Task>
+class Serializer : public SerializerBase {
 
+    std::deque<Task> queue;
+
+    bool empty() { return queue.empty(); }
+    void dispatch(Task& task);
+    
+  public:
     /** Start a serializer.
      *
      * @param notifyDispatch Called when work is pending and there is no
      * active dispatch thread. Must arrange for dispatch() to be called
      * in some thread other than the calling thread and return. 
-     * By default the Serializer supplies its own dispatch thread.
+     * By default the Serailizer supplies its own dispatch thread.
      *
      * @param immediate Allow execute() to execute a task immediatly
      * in the current thread.
      */
-    Serializer(bool immediate=true, Task notifyDispatch=Task());
-
-    ~Serializer();
+    Serializer(bool immediate=true, VoidFn0 notifyDispatch=VoidFn0())
+        : SerializerBase(immediate, notifyDispatch) {}
     
     /** 
      * Task may be executed immediately in the calling thread if there
@@ -68,33 +108,73 @@
      */
     void execute(Task& task);
 
+
     /** Execute pending tasks sequentially in calling thread.
      * Drains the task queue and returns, does not block for more tasks.
      * 
      * @exception ShutdownException if the serializer is being destroyed.
      */
     void dispatch();
-
-  private:
-    enum State {
-        IDLE, ///< No threads are active.
-        EXECUTING, ///< execute() is executing a single task.
-        DISPATCHING, ///< dispatch() is draining the queue.
-        SHUTDOWN ///< Serializer is being destroyed.
     };
 
-    void dispatch(Task&);
-    void notifyWorker();
-    void run();
 
-    Monitor lock;
+template <class Task>
+void Serializer<Task>::execute(Task& task) {
+    bool needNotify = false;
+    {
+        Mutex::ScopedLock l(lock);
+        assert(state != SHUTDOWN);
+        if (immediate && state == IDLE) {
+            state = EXECUTING;
+            dispatch(task);
+            if (state != SHUTDOWN) {
+                assert(state == EXECUTING);
+                state = IDLE;
+            }
+        }
+        else 
+            queue.push_back(task);
+        if (!queue.empty() && state == IDLE) {
+            state = DISPATCHING;
+            needNotify = true;
+        }
+    }
+    if (needNotify)
+        notifyDispatch();       // Not my function, call outside lock.
+}
+
+template <class Task>
+void Serializer<Task>::dispatch() {
+    Mutex::ScopedLock l(lock);
+    // TODO aconway 2007-07-16: This loop could be unbounded
+    // if other threads add work while we're in dispatch(Task&).
+    // If we need to bound it we could dispatch just the elements
+    // that were enqueued when dispatch() was first called - save
+    // begin() iterator and pop only up to that.
+    while (!queue.empty() && state != SHUTDOWN) {
+        assert(state == DISPATCHING);
+        dispatch(queue.front());
+        queue.pop_front();
+    }
+    if (state != SHUTDOWN) {
+        assert(state == DISPATCHING);
+        state = IDLE;
+    }
+}
+
+template <class Task>
+void Serializer<Task>::dispatch(Task& task) {
+    Mutex::ScopedUnlock u(lock);
+    // Preconditions: lock is held, state is EXECUTING or DISPATCHING
+    assert(state != IDLE);
+    assert(state != SHUTDOWN);
+    assert(state == EXECUTING || state == DISPATCHING);
+    // No exceptions allowed in task.
+    try { task(); } catch (...) { assert(0); }
+}
+
+
 
-    State state;
-    bool immediate;
-    std::deque<Task> queue;
-    Thread worker;
-    Task notifyDispatch;
-};
 
 }} // namespace qpid::sys
 

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp?rev=568332&r1=568331&r2=568332&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Serializer.cpp Tue Aug 21 16:35:23 2007
@@ -38,6 +38,7 @@
 using namespace qpid::framing;
 using namespace std;
 
+typedef Serializer<boost::function<void()> > BoostFunctionSerializer;
 
 /** Test for concurrent calls */
 struct Tester {
@@ -61,7 +62,7 @@
     }
 };
 
-void execute(Serializer& s, Serializer::Task t) 
+void execute(BoostFunctionSerializer& s, boost::function<void()> t) 
 {
     s.execute(t);
 }
@@ -69,7 +70,7 @@
 BOOST_AUTO_TEST_CASE(testSingleThread) {
     // Verify that we call in the same thread by default.
     Tester tester;
-    Serializer s;
+    BoostFunctionSerializer s;
     for (int i = 0; i < 100; ++i) 
         execute(s, boost::bind(&Tester::test, &tester));
     // All should be executed in this thread.
@@ -83,7 +84,7 @@
 BOOST_AUTO_TEST_CASE(testSingleThreadNoImmediate) {
     // Verify that we call in different thread if immediate=false.
     Tester tester;
-    Serializer s(false);
+    BoostFunctionSerializer s(false);
     for (int i = 0; i < 100; ++i)
         execute(s, boost::bind(&Tester::test, &tester));
     {
@@ -99,13 +100,13 @@
 }
 
 struct Caller : public Runnable, public Tester {
-    Caller(Serializer& s) : serializer(s) {}
+    Caller(BoostFunctionSerializer& s) : serializer(s) {}
     void run() { execute(serializer, boost::bind(&Tester::test, this)); }
-    Serializer& serializer;
+    BoostFunctionSerializer& serializer;
 };
 
 BOOST_AUTO_TEST_CASE(testDispatchThread) {
-    Serializer s;
+    BoostFunctionSerializer s;
     Caller caller(s);
     Thread threads[100];
     // Concurrent calls.
@@ -121,7 +122,7 @@
 }
 
 
-std::auto_ptr<Serializer> serializer;
+std::auto_ptr<BoostFunctionSerializer> serializer;
     
 struct CallDispatch : public Runnable {
     void run() {
@@ -136,7 +137,7 @@
 
 // Use externally created threads.
 BOOST_AUTO_TEST_CASE(testExternalDispatch) {
-    serializer.reset(new Serializer(false, &notifyDispatch));
+    serializer.reset(new BoostFunctionSerializer(false, &notifyDispatch));
     Tester tester;
     for (int i = 0; i < 100; ++i) 
         execute(*serializer, boost::bind(&Tester::test, &tester));