You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/21 21:40:38 UTC

svn commit: r736409 - in /qpid/trunk/qpid/cpp/src: cluster.mk qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/cluster/Connection.cpp qpid/cluster/Connection.h qpid/cluster/Event.h qpid/cluster/EventFrame.h

Author: aconway
Date: Wed Jan 21 12:40:38 2009
New Revision: 736409

URL: http://svn.apache.org/viewvc?rev=736409&view=rev
Log:
cluster: Pipeline decoding. About 10% improvement in latency and throughput.

Added:
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/cluster.mk
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h

Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Wed Jan 21 12:40:38 2009
@@ -36,29 +36,30 @@
 
 dmodule_LTLIBRARIES += cluster.la
 
-cluster_la_SOURCES = \
-  $(CMAN_SOURCES) \
-  qpid/cluster/Cluster.cpp \
-  qpid/cluster/Cluster.h \
+cluster_la_SOURCES =				\
+  $(CMAN_SOURCES)				\
+  qpid/cluster/Cluster.cpp			\
+  qpid/cluster/Cluster.h			\
   qpid/cluster/ClusterLeaveException.h		\
   qpid/cluster/ClusterMap.cpp			\
   qpid/cluster/ClusterMap.h			\
-  qpid/cluster/ClusterPlugin.cpp \
-  qpid/cluster/Connection.cpp \
+  qpid/cluster/ClusterPlugin.cpp		\
+  qpid/cluster/Connection.cpp			\
   qpid/cluster/Connection.h			\
   qpid/cluster/ConnectionCodec.cpp		\
   qpid/cluster/ConnectionCodec.h		\
-  qpid/cluster/ConnectionMap.h \
+  qpid/cluster/ConnectionMap.h			\
   qpid/cluster/Cpg.cpp				\
   qpid/cluster/Cpg.h				\
   qpid/cluster/Dispatchable.h			\
-  qpid/cluster/DumpClient.cpp \
+  qpid/cluster/DumpClient.cpp			\
   qpid/cluster/DumpClient.h			\
   qpid/cluster/Event.cpp			\
   qpid/cluster/Event.h				\
-  qpid/cluster/FailoverExchange.cpp \
+  qpid/cluster/EventFrame.h			\
+  qpid/cluster/FailoverExchange.cpp		\
   qpid/cluster/FailoverExchange.h		\
-  qpid/cluster/Multicaster.cpp \
+  qpid/cluster/Multicaster.cpp			\
   qpid/cluster/Multicaster.h			\
   qpid/cluster/NoOpConnectionOutputHandler.h	\
   qpid/cluster/OutputInterceptor.cpp		\

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 21 12:40:38 2009
@@ -96,7 +96,8 @@
     writeEstimate(writeEstimate_),
     mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
     dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
-    deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
+    deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), poller),
+    deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), poller),
     state(INIT),
     lastSize(0),
     lastBroker(false)
@@ -111,7 +112,8 @@
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
     failoverExchange.reset(new FailoverExchange(this));
     dispatcher.start();
-    deliverQueue.start();
+    deliverEventQueue.start();
+    deliverFrameQueue.start();
     QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
     if (quorum_) quorum.init();
     cpg.join(name);
@@ -191,14 +193,14 @@
 void Cluster::deliver(const Event& e, Lock&) {
     if (state == LEFT) return;
     QPID_LOG(trace, *this << " PUSH: " << e);
-    deliverQueue.push(e);
+    QPID_LATENCY_INIT(e);
+    deliverEventQueue.push(e);
 }
 
-// Entry point: called when deliverQueue has events to process.
-void Cluster::delivered(PollableEventQueue::Queue& events) {
+// Entry point: called when deliverEventQueue has events to process.
+void Cluster::deliveredEvents(PollableEventQueue::Queue& events) {
     try {
-        for (PollableEventQueue::Queue::iterator i = events.begin(); i != events.end(); ++i)
-            deliveredEvent(*i, i->getData());
+        for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
         events.clear();
     } catch (const std::exception& e) {
         QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
@@ -206,41 +208,52 @@
     }
 }
 
-void Cluster::deliveredEvent(const EventHeader& e, const char* data) {
-    QPID_LATENCY_RECORD("deliver queue", e);
-    Buffer buf(const_cast<char*>(data), e.getSize());
-    AMQFrame frame;
-    if (e.isCluster())  {
-        while (frame.decode(buf)) {
-            QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
-            Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
-            ClusterDispatcher dispatch(*this, e.getMemberId(), l);
-            if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
-                throw Exception(QPID_MSG("Invalid cluster control"));
-        }
+void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) {
+    try {
+        for_each(frames.begin(), frames.end(), boost::bind(&Cluster::deliveredFrame, this, _1));
+        frames.clear();
+    } catch (const std::exception& e) {
+        QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
+        leave();
     }
-    else {                      // e.isConnection()
+}
+
+void Cluster::deliveredEvent(const Event& e) {
+    QPID_LATENCY_RECORD("delivered event queue", e);
+    Buffer buf(const_cast<char*>(e.getData()), e.getSize());
+    boost::intrusive_ptr<Connection> connection;
+    if (e.isConnection()) {
         if (state == NEWBIE) {
             QPID_LOG(trace, *this << " DROP: " << e);
+            return;
         }
-        else {
-            boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
-            if (!connection) return;
-            if (e.getType() == CONTROL) {              
-                while (frame.decode(buf)) {
-                    QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
-                    connection->delivered(frame);
-                }
-            }
-            else  {
-                QPID_LOG(trace, *this << " DLVR: " << e);
-                connection->deliverBuffer(buf);
-            }
+        connection = getConnection(e.getConnectionId());
+        if (!connection) return;
+    }
+    if (e.getType() == CONTROL) {
+        AMQFrame frame;
+        while (frame.decode(buf)) {
+            deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), frame));
         }
     }
-    QPID_LATENCY_RECORD("decode+execute", e);
+    else if (e.getType() == DATA) { 
+        connection->deliveredEvent(e, deliverFrameQueue);
+    }
 }
 
+void Cluster::deliveredFrame(const EventFrame& e) {
+    QPID_LOG(trace, *this << " DLVR: " << e.frame);
+    if (e.connection)   {
+        e.connection->deliveredFrame(e);
+    }
+    else {
+        Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
+        ClusterDispatcher dispatch(*this, e.member, l);
+        if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
+            throw Exception(QPID_MSG("Invalid cluster control"));
+    }
+}
+  
 struct AddrList {
     const cpg_address* addrs;
     int count;
@@ -379,7 +392,7 @@
         setClusterId(uuid);
         state = DUMPEE;
         QPID_LOG(info, *this << " receiving dump from " << dumper);
-        deliverQueue.stop();
+        deliverEventQueue.stop();
         checkDumpIn(l);
     }
 }
@@ -389,7 +402,7 @@
     assert(state == OFFER);
     state = DUMPER;
     QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url);
-    deliverQueue.stop();
+    deliverEventQueue.stop();
     if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
     dumpThread = Thread(
         new DumpClient(myId, dumpee, url, broker, map, connections.values(),
@@ -411,7 +424,7 @@
         mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
         state = CATCHUP;
         QPID_LOG(info, *this << " received dump, starting catch-up");
-        deliverQueue.start();
+        deliverEventQueue.start();
     }
 }
 
@@ -425,7 +438,7 @@
     state = READY;
     mcast.release();
     QPID_LOG(info, *this << " sent dump");
-    deliverQueue.start();
+    deliverEventQueue.start();
     tryMakeOffer(map.firstNewbie(), l); // Try another offer
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jan 21 12:40:38 2009
@@ -25,6 +25,7 @@
 #include "Event.h"
 #include "FailoverExchange.h"
 #include "Multicaster.h"
+#include "EventFrame.h"
 #include "NoOpConnectionOutputHandler.h"
 #include "PollerDispatch.h"
 #include "Quorum.h"
@@ -102,7 +103,7 @@
     typedef sys::Monitor::ScopedLock Lock;
 
     typedef sys::PollableQueue<Event> PollableEventQueue;
-    typedef std::deque<Event> PlainEventQueue;
+    typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
 
     // NB: The final Lock& parameter on functions below is used to mark functions
     // that should only be called by a function that already holds the lock.
@@ -126,8 +127,10 @@
     void ready(const MemberId&, const std::string&, Lock&);
     void configChange(const MemberId&, const std::string& addresses, Lock& l);
     void shutdown(const MemberId&, Lock&);
-    void delivered(PollableEventQueue::Queue&); // deliverQueue callback
-    void deliveredEvent(const EventHeader&, const char*); 
+    void deliveredEvents(PollableEventQueue::Queue&);
+    void deliveredFrames(PollableFrameQueue::Queue&);
+    void deliveredEvent(const Event&); 
+    void deliveredFrame(const EventFrame&); 
 
     // Helper, called in deliver thread.
     void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
@@ -185,7 +188,8 @@
     // Thread safe members
     Multicaster mcast;
     PollerDispatch dispatcher;
-    PollableEventQueue deliverQueue;
+    PollableEventQueue deliverEventQueue;
+    PollableFrameQueue deliverFrameQueue;
     ConnectionMap connections;
     boost::shared_ptr<FailoverExchange> failoverExchange;
     Quorum quorum;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jan 21 12:40:38 2009
@@ -62,14 +62,14 @@
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, ConnectionId myId)
     : cluster(c), self(myId), catchUp(false), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId)
+      connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
 { init(); }
 
 // Local connections
 Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
                        const std::string& wrappedId, MemberId myId, bool isCatchUp)
     : cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
-      connection(&output, cluster.getBroker(), wrappedId)
+      connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
 { init(); }
 
 void Connection::init() {
@@ -135,17 +135,35 @@
     return !message.empty();
 }
 
+// Decode buffer and put frames on frameq.
+void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) {
+    assert(!catchUp);
+    Buffer buf(e);
+    // Set read credit on the last frame.
+    ++readCredit;               // One credit per buffer.
+    if (!mcastDecoder.decode(buf)) return;
+    AMQFrame frame(mcastDecoder.frame);
+    while (mcastDecoder.decode(buf)) {
+        frameq.push(EventFrame(this, getId().getMember(), frame));
+        frame = mcastDecoder.frame;
+    }
+    frameq.push(EventFrame(this, getId().getMember(), frame, readCredit));
+    readCredit = 0;
+}
+
+
 // Delivered from cluster.
-void Connection::delivered(framing::AMQFrame& f) {
-    QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f);
-    QPID_LATENCY_INIT(f);
+void Connection::deliveredFrame(const EventFrame& f) {
+    QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame);
     assert(!catchUp);
-    currentChannel = f.getChannel(); 
-    if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol.
-        && !checkUnsupported(*f.getBody())) // Unsupported operation.
+    currentChannel = f.frame.getChannel(); 
+    if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection control.
+        && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
     {
-        connection.received(f); // Pass to broker connection.
+        connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection.
     }
+    if  (cluster.getReadMax() && f.readCredit)
+        output.giveReadCredit(f.readCredit);
 }
 
 // A local connection is closed by the network layer.
@@ -200,15 +218,6 @@
     return size;
 }
 
-void Connection::deliverBuffer(Buffer& buf) {
-    assert(!catchUp);
-    ++deliverSeq;
-    while (mcastDecoder.decode(buf))
-        delivered(mcastDecoder.frame);
-    if  (cluster.getReadMax())
-        output.giveReadCredit(1);
-}
-
 broker::SessionState& Connection::sessionState() {
     return *connection.getChannel(currentChannel).getSession();
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Jan 21 12:40:38 2009
@@ -26,6 +26,8 @@
 #include "WriteEstimate.h"
 #include "OutputInterceptor.h"
 #include "NoOpConnectionOutputHandler.h"
+#include "Event.h"
+#include "EventFrame.h"
 
 #include "qpid/broker/Connection.h"
 #include "qpid/amqp_0_10/Connection.h"
@@ -49,6 +51,7 @@
 
 namespace cluster {
 class Cluster;
+class Event;
 
 /** Intercept broker::Connection calls for shadow and local cluster connections. */
 class Connection :
@@ -58,6 +61,8 @@
         
 {
   public:
+    typedef sys::PollableQueue<EventFrame> EventFrameQueue;
+
     /** Local connection, use this in ConnectionId */
     Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp);
     /** Shadow connection */
@@ -96,8 +101,8 @@
     size_t decode(const char* buffer, size_t size);
 
     // Called for data delivered from the cluster.
-    void deliverBuffer(framing::Buffer&);
-    void delivered(framing::AMQFrame&);
+    void deliveredEvent(const Event&, EventFrameQueue&);
+    void deliveredFrame(const EventFrame&);
 
     void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
     
@@ -166,6 +171,7 @@
     framing::SequenceNumber deliverSeq;
     framing::ChannelId currentChannel;
     boost::shared_ptr<broker::TxBuffer> txBuffer;
+    int readCredit;
     
   friend std::ostream& operator<<(std::ostream&, const Connection&);
 };

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Wed Jan 21 12:40:38 2009
@@ -31,12 +31,15 @@
 #include <sys/uio.h>            // For iovec
 #include <iosfwd>
 
+#include "types.h"
+
 namespace qpid {
-namespace cluster {
 
-// TODO aconway 2008-09-03: more efficient solution for shared
-// byte-stream data.
-// 
+namespace framing {
+class AMQBody;
+}
+
+namespace cluster {
 
 /** Header data for a multicast event */
 class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {

Added: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=736409&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Wed Jan 21 12:40:38 2009
@@ -0,0 +1,53 @@
+#ifndef QPID_CLUSTER_EVENTFRAME_H
+#define QPID_CLUSTER_EVENTFRAME_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/framing/AMQFrame.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace cluster {
+
+class Connection;
+
+/**
+ * A frame decoded from an Event.
+ */
+struct EventFrame
+{
+    // Connection event frame
+    EventFrame(const boost::intrusive_ptr<Connection>& c, const MemberId& m, const framing::AMQFrame& f, int rc=0)
+        : connection(c), member(m), frame(f), readCredit(rc) {}
+
+    bool isCluster() const { return !connection; }
+    bool isConnection() const { return connection; }
+    
+    boost::intrusive_ptr<Connection> connection;
+    MemberId member;
+    framing::AMQFrame frame;   
+    int readCredit;             // restore this much read credit when frame is processed
+};
+}} // namespace qpid::cluster
+
+#endif  /*!QPID_CLUSTER_EVENTFRAME_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org