You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/05/25 20:20:51 UTC

svn commit: r778464 - in /qpid/trunk/qpid/cpp/src/qpid: broker/QueueEvents.cpp broker/QueueEvents.h cluster/Connection.h cluster/Event.cpp cluster/Event.h cluster/Multicaster.cpp cluster/Multicaster.h cluster/PollableQueue.h sys/PollableQueue.h

Author: aconway
Date: Mon May 25 18:20:50 2009
New Revision: 778464

URL: http://svn.apache.org/viewvc?rev=778464&view=rev
Log:
PollableQueue optimization - replace deque with vector.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
    qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.cpp Mon May 25 18:20:50 2009
@@ -66,15 +66,14 @@
     }
 }
 
-void QueueEvents::handle(EventQueue::Queue& events)
-{
+QueueEvents::EventQueue::Batch::const_iterator
+QueueEvents::handle(const EventQueue::Batch& events) {
     qpid::sys::Mutex::ScopedLock l(lock);
-    while (!events.empty()) {
-        for (Listeners::iterator i = listeners.begin(); i != listeners.end(); i++) {
-            i->second(events.front());
-        }
-        events.pop_front();
+    for (EventQueue::Batch::const_iterator i = events.begin(); i != events.end(); ++i) {
+        for (Listeners::iterator j = listeners.begin(); j != listeners.end(); j++) 
+            j->second(*i);
     }
+    return events.end();
 }
 
 void QueueEvents::shutdown()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueEvents.h Mon May 25 18:20:50 2009
@@ -74,7 +74,7 @@
     volatile bool enabled;
     qpid::sys::Mutex lock;//protect listeners from concurrent access
     
-    void handle(EventQueue::Queue& e);
+    EventQueue::Batch::const_iterator handle(const EventQueue::Batch& e);
 
 };
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Mon May 25 18:20:50 2009
@@ -61,8 +61,6 @@
         
 {
   public:
-    typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
-
     /** Local connection. */
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& logId, MemberId, bool catchUp, bool isLink);
     /** Shadow connection. */

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Mon May 25 18:20:50 2009
@@ -87,7 +87,7 @@
     return control(framing::AMQFrame(body), cid);
 }
 
-iovec Event::toIovec() {
+iovec Event::toIovec() const {
     encodeHeader();
     iovec iov = { const_cast<char*>(getStore()), getStoreSize() };
     return iov;
@@ -103,8 +103,8 @@
 }
 
 // Encode my header in my buffer.
-void Event::encodeHeader () {
-    Buffer b(getStore(), HEADER_SIZE);
+void Event::encodeHeader () const {
+    Buffer b(const_cast<char*>(getStore()), HEADER_SIZE);
     encode(b);
     assert(b.getPosition() == HEADER_SIZE);
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Mon May 25 18:20:50 2009
@@ -54,7 +54,7 @@
     /** Size of payload data, excluding header. */
     size_t getSize() const { return size; }
     /** Size of header + payload. */ 
-    size_t getStoreSize() { return size + HEADER_SIZE; }
+    size_t getStoreSize() const { return size + HEADER_SIZE; }
 
     bool isCluster() const { return connectionId.getNumber() == 0; }
     bool isConnection() const { return connectionId.getNumber() != 0; }
@@ -99,10 +99,10 @@
     
     operator framing::Buffer() const;
 
-    iovec toIovec();
+    iovec toIovec() const;
     
   private:
-    void encodeHeader();
+    void encodeHeader() const;
 
     RefCountedBuffer::pointer store;
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Mon May 25 18:20:50 2009
@@ -71,9 +71,9 @@
 }
 
 
-void Multicaster::sendMcast(PollableEventQueue::Queue& values) {
+Multicaster::PollableEventQueue::Batch::const_iterator Multicaster::sendMcast(const PollableEventQueue::Batch& values) {
     try {
-        PollableEventQueue::Queue::iterator i = values.begin();
+        PollableEventQueue::Batch::const_iterator i = values.begin();
         while( i != values.end()) {
             iovec iov = i->toIovec();
             if (!cpg.mcast(&iov, 1)) {
@@ -82,12 +82,13 @@
             }
             ++i;
         }
-        values.erase(values.begin(), i); // Erase sent events.
+        return i;
     }
     catch (const std::exception& e) {
         QPID_LOG(critical, "Multicast error: " << e.what());
         queue.stop();
         onError();
+        return values.end();
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Mon May 25 18:20:50 2009
@@ -28,6 +28,7 @@
 #include "qpid/sys/Mutex.h"
 #include "qpid/sys/LatencyTracker.h"
 #include <boost/shared_ptr.hpp>
+#include <deque>
 
 namespace qpid {
 
@@ -63,7 +64,7 @@
     typedef sys::PollableQueue<Event> PollableEventQueue;
     typedef std::deque<Event> PlainEventQueue;
 
-    void sendMcast(PollableEventQueue::Queue& );
+    PollableEventQueue::Batch::const_iterator sendMcast(const PollableEventQueue::Batch& );
 
     sys::Mutex lock;
     boost::function<void()> onError;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Mon May 25 18:20:50 2009
@@ -37,24 +37,27 @@
     typedef boost::function<void (const T&)> Callback;
     typedef boost::function<void()> ErrorCallback;
 
-    PollableQueue(Callback f, ErrorCallback err, const std::string& msg, const boost::shared_ptr<sys::Poller>& poller)
-        : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1), poller),
+    PollableQueue(Callback f, ErrorCallback err, const std::string& msg,
+                  const boost::shared_ptr<sys::Poller>& poller)
+        : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
+                                poller),
           callback(f), error(err), message(msg) {}
 
-    void handleBatch(typename sys::PollableQueue<T>::Queue& values) {
+    typename sys::PollableQueue<T>::Batch::const_iterator
+    handleBatch(const typename sys::PollableQueue<T>::Batch& values) {
         try {
-            typename sys::PollableQueue<T>::Queue::iterator i = values.begin();
+            typename sys::PollableQueue<T>::Batch::const_iterator i = values.begin();
             while (i != values.end() && !this->isStopped()) {
                 callback(*i);
                 ++i;
             }
-            values.erase(values.begin(), i);
+            return i;
         }
         catch (const std::exception& e) {
             QPID_LOG(error, message << ": " << e.what());
-            values.clear();
             this->stop();
             error();
+            return values.end();
         }
     }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=778464&r1=778463&r2=778464&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Mon May 25 18:20:50 2009
@@ -28,7 +28,7 @@
 #include <boost/function.hpp>
 #include <boost/bind.hpp>
 #include <algorithm>
-#include <deque>
+#include <vector>
 
 namespace qpid {
 namespace sys {
@@ -44,16 +44,18 @@
 template <class T>
 class PollableQueue {
   public:
-    typedef std::deque<T> Queue;
+    typedef std::vector<T> Batch;
     typedef T value_type;
 
     /**
      * Callback to process a batch of items from the queue.
      *
-     * @param values  Queue of values to process. Any items remaining
+     * @param batch  Queue of values to process. Any items remaining
      *                on return from Callback are put back on the queue.
+     * @return iterator pointing to the first un-processed item in batch.
+     * Items from this point up to batch.end() are put back on the queue.
      */
-    typedef boost::function<void (Queue&)> Callback;
+    typedef boost::function<typename Batch::const_iterator (const Batch& batch)> Callback;
 
     /**
      * Constructor; sets necessary parameters.
@@ -99,7 +101,7 @@
     mutable sys::Monitor lock;
     Callback callback;
     PollableCondition condition;
-    Queue queue, batch;
+    Batch queue, batch;
     Thread dispatcher;
     bool stopped;
 };
@@ -141,17 +143,18 @@
 }
 
 template <class T> void PollableQueue<T>::process() {
+    // Called with lock held
     while (!stopped && !queue.empty()) {
         assert(batch.empty());
         batch.swap(queue);
+        typename Batch::const_iterator putBack;
         {
             ScopedUnlock u(lock);   // Allow concurrent push to queue.
-            callback(batch);
-        }
-        if (!batch.empty()) {
-            queue.insert(queue.begin(), batch.begin(), batch.end()); // put back unprocessed items.
-            batch.clear();
+            putBack = callback(batch);
         }
+        // put back unprocessed items.
+        queue.insert(queue.begin(), putBack, typename Batch::const_iterator(batch.end())); 
+        batch.clear();
     }
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org