You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/09 05:50:35 UTC

svn commit: r732925 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h ClusterPlugin.cpp Multicaster.cpp Multicaster.h OutputInterceptor.cpp

Author: aconway
Date: Thu Jan  8 20:50:35 2009
New Revision: 732925

URL: http://svn.apache.org/viewvc?rev=732925&view=rev
Log:
Added --cluster-mcast-max: max number of outstanding mcasts in CPG buffers.
Work around problems with CPG flow control.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Jan  8 20:50:35 2009
@@ -83,7 +83,7 @@
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
 };
 
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) :
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) :
     broker(b),
     poller(b.getPoller()),
     cpg(*this),
@@ -98,7 +98,7 @@
         0,                                         // write
         boost::bind(&Cluster::disconnect, this, _1) // disconnect
     ),
-    mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
+    mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
     mgmtObject(0),
     deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
     state(INIT),
@@ -193,7 +193,9 @@
 void Cluster::deliver(const Event& e, Lock&) {
     if (state == LEFT) return;
     QPID_LOG(trace, *this << " PUSH: " << e);
-    deliverQueue.push(e);       // Otherwise enqueue for processing.
+    if (e.getMemberId() == myId)
+        mcast.delivered(e);     // Note delivery for flow control
+    deliverQueue.push(e);
 }
 
 // Entry point: called when deliverQueue has events to process.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Jan  8 20:50:35 2009
@@ -70,7 +70,7 @@
      * Join a cluster. 
      */
     Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
-            size_t readMax, size_t writeEstimate);
+            size_t readMax, size_t writeEstimate, size_t mcastMax);
 
     virtual ~Cluster();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Jan  8 20:50:35 2009
@@ -41,10 +41,10 @@
     string name;
     string url;
     bool quorum;
-    size_t readMax, writeEstimate;
+    size_t readMax, writeEstimate, mcastMax;
 
     // FIXME aconway 2008-12-09: revisit default.
-    ClusterValues() : quorum(false), readMax(0), writeEstimate(64) {}
+    ClusterValues() : quorum(false), readMax(10), writeEstimate(64), mcastMax(10) {}
   
     Url getUrl(uint16_t port) const {
         if (url.empty()) return Url::getIpAddressesUrl(port);
@@ -69,10 +69,11 @@
             ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
 #endif
             ("cluster-read-max", optValue(values.readMax,"N"),
-             "Experimental: Throttle read rate from client connections.")
+             "Experimental: Max unreplicated reads per connetion connection. 0=no limit.")
+            ("cluster-mcast-max", optValue(values.mcastMax,"N"),
+             "Experimental: Max outstanding multicasts per broker. 0=no limit.")
             ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"),
-             "Experimental: initial estimate for connection write per multicast cycle")
-            ;
+             "Experimental: initial estimate for connection writes rate per multicast cycle");
     }
 };
 
@@ -91,7 +92,13 @@
         if (values.name.empty()) return; // Only if --cluster-name option was specified.
         Broker* broker = dynamic_cast<Broker*>(&target);
         if (!broker) return;
-        cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax, values.writeEstimate*1024);
+        cluster = new Cluster(
+            values.name,
+            values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)),
+            *broker,
+            values.quorum,
+            values.readMax, values.writeEstimate*1024, values.mcastMax
+        );
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(
                 new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Thu Jan  8 20:50:35 2009
@@ -28,10 +28,14 @@
 namespace qpid {
 namespace cluster {
 
-Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) :
-    onError(onError_), cpg(cpg_),
+Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_,
+                         const boost::shared_ptr<sys::Poller>& poller,
+                         boost::function<void()> onError_) :
+    onError(onError_), cpg(cpg_), 
     queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
-    holding(true)
+    holding(true),
+    mcastMax(mcastMax_),
+    pending(0)
 {
     queue.start();
 }
@@ -56,6 +60,7 @@
         }
     }
     queue.push(e);
+
 }
 
 
@@ -64,9 +69,18 @@
         PollableEventQueue::Queue::iterator i = values.begin();
         while( i != values.end()) {
             iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() };
-            if (!cpg.mcast(&iov, 1)) break; // returns false for flow control
+            if (!cpg.mcast(&iov, 1))
+                break; // cpg.mcast returns false for flow control
             QPID_LOG(trace, " MCAST " << *i);
             ++i;
+            if (mcastMax) {
+                sys::Mutex::ScopedLock l(lock);
+                assert(pending < mcastMax);
+                if (++pending == mcastMax) {
+                    queue.stop();
+                    break ;
+                }
+            }
         }
         values.erase(values.begin(), i);
     }
@@ -84,4 +98,14 @@
     holdingQueue.clear();
 }
 
+void Multicaster::delivered(const Event&) {
+    sys::Mutex::ScopedLock l(lock);
+    if (mcastMax) {
+        assert(pending <= mcastMax);
+        if  (pending == mcastMax) 
+            queue.start();
+        --pending;
+    }
+}
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Thu Jan  8 20:50:35 2009
@@ -46,13 +46,19 @@
 {
   public:
     /** Starts in holding mode: connection data events are held, other events are mcast */
-    Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>&, boost::function<void()> onError );
+    Multicaster(Cpg& cpg_,
+                size_t mcastMax, 
+                const boost::shared_ptr<sys::Poller>&,
+                boost::function<void()> onError
+    );
     void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&);
     void mcastBuffer(const char*, size_t, const ConnectionId&);
     void mcast(const Event& e);
     /** End holding mode, held events are mcast */
     void release();
-
+    /** Call when events are self-delivered to manage flow control. */
+    void delivered(const Event& e);
+    
   private:
     typedef sys::PollableQueue<Event> PollableEventQueue;
     typedef std::deque<Event> PlainEventQueue;
@@ -66,6 +72,7 @@
     bool holding;
     PlainEventQueue holdingQueue;
     std::vector<struct ::iovec> ioVector;
+    size_t mcastMax, pending;
 };
 }} // namespace qpid::cluster
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Jan  8 20:50:35 2009
@@ -77,7 +77,7 @@
 // which tranfers frames to the codec for writing.
 // 
 void OutputInterceptor::deliverDoOutput(size_t requested) {
-    size_t buf = next->getBuffered();
+    size_t buf = getBuffered();
     if (parent.isLocal())
         writeEstimate.delivered(requested, sent, buf); // Update the estimate.