You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/28 21:48:23 UTC

svn commit: r738618 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp Cluster.h ClusterPlugin.cpp Multicaster.cpp Multicaster.h

Author: aconway
Date: Wed Jan 28 20:48:23 2009
New Revision: 738618

URL: http://svn.apache.org/viewvc?rev=738618&view=rev
Log: (empty)

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=738618&r1=738617&r2=738618&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 28 20:48:23 2009
@@ -86,7 +86,7 @@
     bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
 };
 
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) :
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) :
     broker(b),
     mgmtObject(0),
     poller(b.getPoller()),
@@ -96,7 +96,7 @@
     myId(cpg.self()),
     readMax(readMax_),
     writeEstimate(writeEstimate_),
-    mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
+    mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
     dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
     deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"),  poller),
     deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller),

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=738618&r1=738617&r2=738618&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jan 28 20:48:23 2009
@@ -71,7 +71,7 @@
      * Join a cluster. 
      */
     Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
-            size_t readMax, size_t writeEstimate, size_t mcastMax);
+            size_t readMax, size_t writeEstimate);
 
     virtual ~Cluster();
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=738618&r1=738617&r2=738618&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Jan 28 20:48:23 2009
@@ -52,9 +52,9 @@
     string name;
     string url;
     bool quorum;
-    size_t readMax, writeEstimate, mcastMax;
+    size_t readMax, writeEstimate;
 
-    ClusterValues() : quorum(false), readMax(10), writeEstimate(64), mcastMax(0) {}
+    ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {}
   
     Url getUrl(uint16_t port) const {
         if (url.empty()) return Url::getIpAddressesUrl(port);
@@ -79,11 +79,9 @@
             ("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
 #endif
             ("cluster-read-max", optValue(values.readMax,"N"),
-             "Experimental: Max unreplicated reads per connetion connection. 0=no limit.")
-            ("cluster-mcast-max", optValue(values.mcastMax,"N"),
-             "Experimental: Max outstanding multicasts per broker. 0=no limit.")
+             "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.")
             ("cluster-write-estimate", optValue(values.writeEstimate, "Kb"),
-             "Experimental: initial estimate for connection writes rate per multicast cycle");
+             "Experimental: initial estimate for connection write rate per multicast cycle");
     }
 };
 
@@ -147,7 +145,7 @@
             values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)),
             *broker,
             values.quorum,
-            values.readMax, values.writeEstimate*1024, values.mcastMax
+            values.readMax, values.writeEstimate*1024
         );
         broker->setConnectionFactory(
             boost::shared_ptr<sys::ConnectionCodec::Factory>(

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=738618&r1=738617&r2=738618&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Wed Jan 28 20:48:23 2009
@@ -28,14 +28,12 @@
 namespace qpid {
 namespace cluster {
 
-Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_,
+Multicaster::Multicaster(Cpg& cpg_, 
                          const boost::shared_ptr<sys::Poller>& poller,
                          boost::function<void()> onError_) :
     onError(onError_), cpg(cpg_), 
     queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
-    holding(true),
-    mcastMax(mcastMax_),
-    pending(0)
+    holding(true)
 {
     queue.start();
 }
@@ -69,22 +67,10 @@
     try {
         PollableEventQueue::Queue::iterator i = values.begin();
         while( i != values.end()) {
-            if (mcastMax) {
-                sys::Mutex::ScopedLock l(lock);
-                if (pending == mcastMax) {
-                    queue.stop();
-                    break ;
-                }
-                ++pending;
-            }
             QPID_LATENCY_RECORD("mcast send queue", *i);
             iovec iov = i->toIovec();
             if (!cpg.mcast(&iov, 1)) {
                 // cpg didn't send because of CPG flow control.
-                if (mcastMax) {
-                    sys::Mutex::ScopedLock l(lock);
-                    --pending;
-                }
                 break; 
             }
             ++i;
@@ -108,13 +94,6 @@
 void Multicaster::selfDeliver(const Event& e) {
     sys::Mutex::ScopedLock l(lock);
     QPID_LATENCY_RECORD("cpg self deliver", e);
-    if (mcastMax) {
-        assert(pending > 0);
-        assert(pending <= mcastMax);
-        if  (pending == mcastMax) 
-            queue.start();
-        --pending;
-    }
 }
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=738618&r1=738617&r2=738618&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Wed Jan 28 20:48:23 2009
@@ -46,7 +46,6 @@
   public:
     /** Starts in holding mode: connection data events are held, other events are mcast */
     Multicaster(Cpg& cpg_,
-                size_t mcastMax, 
                 const boost::shared_ptr<sys::Poller>&,
                 boost::function<void()> onError
     );
@@ -71,7 +70,6 @@
     bool holding;
     PlainEventQueue holdingQueue;
     std::vector<struct ::iovec> ioVector;
-    size_t mcastMax, pending;
 };
 }} // namespace qpid::cluster
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org