You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/12 23:03:43 UTC

svn commit: r685317 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h

Author: aconway
Date: Tue Aug 12 14:03:43 2008
New Revision: 685317

URL: http://svn.apache.org/viewvc?rev=685317&view=rev
Log:
Queue cluster send frames, do cpg_mcast in separate thread, batching if possible.
5x throughput improvement :)

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=685317&r1=685316&r2=685317&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Aug 12 14:03:43 2008
@@ -68,7 +68,8 @@
                       0,                                         // write
                       boost::bind(&Cluster::disconnect, this, _1) // disconnect
     ),
-    deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2))
+    deliverQueue(boost::bind(&Cluster::deliverQueueCb, this, _1, _2)),
+    mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2))
 {
     broker->addFinalizer(boost::bind(&Cluster::leave, this));
     QPID_LOG(trace, "Joining cluster: " << name_);
@@ -83,6 +84,7 @@
     // Start dispatching from the poller.
     cpgDispatchHandle.startWatch(poller);
     deliverQueue.start(poller);
+    mcastQueue.start(poller);
 }
 
 Cluster::~Cluster() {}
@@ -124,12 +126,26 @@
 
 void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
     QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
-    char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
-    Buffer buf(buffer);
-    frame.encode(buf);
-    encodePtr(buf, connection);
-    iovec iov = { buffer, buf.getPosition() };
-    cpg.mcast(name, &iov, 1);
+    mcastQueue.push(Message(frame, self, connection));
+}
+
+void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
+                           const MessageQueue::iterator& end)
+{
+    // Static is OK because there is only one cluster allowed per
+    // process and only one thread in mcastQueueCb at a time.
+    static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
+    MessageQueue::iterator i = begin;
+    while (i != end) {
+        Buffer buf(buffer, sizeof(buffer));
+        while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) {
+            i->frame.encode(buf);
+            encodePtr(buf, i->connection);
+            ++i;
+        }
+        iovec iov = { buffer, buf.getPosition() };
+        cpg.mcast(name, &iov, 1);
+    }
 }
 
 void Cluster::notify() {
@@ -181,12 +197,14 @@
     Id from(nodeid, pid);
     try {
         Buffer buf(static_cast<char*>(msg), msg_len);
-        AMQFrame frame;
-        if (!frame.decode(buf))  // Not enough data.
-            throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling.
-        void* connection;
-        decodePtr(buf, connection);
-        deliverQueue.push(DeliveredFrame(frame, from, connection));
+        while (buf.available() > 0) {
+            AMQFrame frame;
+            if (!frame.decode(buf))  // Not enough data.
+                throw Exception("Received incomplete cluster event.");
+            void* connection;
+            decodePtr(buf, connection);
+            deliverQueue.push(Message(frame, from, connection));
+        }
     }
     catch (const std::exception& e) {
         // FIXME aconway 2008-01-30: exception handling.
@@ -196,10 +214,10 @@
     }
 }
 
-void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
-                            const PollableQueue<DeliveredFrame>::iterator& end)
+void Cluster::deliverQueueCb(const MessageQueue::iterator& begin,
+                             const MessageQueue::iterator& end)
 {
-    for (PollableQueue<DeliveredFrame>::iterator i = begin; i != end; ++i) {
+    for (MessageQueue::iterator i = begin; i != end; ++i) {
         AMQFrame& frame(i->frame);
         Id from(i->from);
         ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection);
@@ -220,7 +238,7 @@
         }
         catch (const std::exception& e) {
             // FIXME aconway 2008-01-30: exception handling.
-            QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what());
+            QPID_LOG(critical, "Error in cluster deliverQueueCb: " << e.what());
             assert(0);
             throw;
         }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=685317&r1=685316&r2=685317&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Aug 12 14:03:43 2008
@@ -97,11 +97,13 @@
     typedef std::map<Id, Member>  MemberMap;
     typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
 
-    struct DeliveredFrame {
+    /** Message sent over the cluster. */
+    struct Message {
         framing::AMQFrame frame; Id from; void* connection;
-        DeliveredFrame(const framing::AMQFrame& f, const Id i, void* c)
+        Message(const framing::AMQFrame& f, const Id i, void* c)
             : frame(f), from(i), connection(c) {}
     };
+    typedef PollableQueue<Message> MessageQueue;
 
     boost::function<void()> shutdownNext;
     
@@ -126,10 +128,17 @@
     );
 
     /** Callback to handle delivered frames from the deliverQueue. */
-    void deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
-                       const PollableQueue<DeliveredFrame>::iterator& end);
+    void deliverQueueCb(const MessageQueue::iterator& begin,
+                      const MessageQueue::iterator& end);
 
+    /** Callback to multi-cast frames from mcastQueue */
+    void mcastQueueCb(const MessageQueue::iterator& begin,
+                    const MessageQueue::iterator& end);
+
+
+    /** Callback to dispatch CPG events. */
     void dispatch(sys::DispatchHandle&);
+    /** Callback if CPG fd is disconnected. */
     void disconnect(sys::DispatchHandle&);
 
     void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method);
@@ -147,7 +156,8 @@
     ShadowConnectionMap shadowConnectionMap;
     ShadowConnectionOutputHandler shadowOut;
     sys::DispatchHandle cpgDispatchHandle;
-    PollableQueue<DeliveredFrame> deliverQueue;
+    MessageQueue deliverQueue;
+    MessageQueue mcastQueue;
 
   friend std::ostream& operator <<(std::ostream&, const Cluster&);
   friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);