You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/05/26 23:41:52 UTC

svn commit: r778896 - in /qpid/trunk/qpid/cpp: src/qpid/cluster/ xml/

Author: aconway
Date: Tue May 26 21:41:52 2009
New Revision: 778896

URL: http://svn.apache.org/viewvc?rev=778896&view=rev
Log:
Improved doOutput algorithm.

Simpler & more robust algorithm based on message count rather than byte size.
Self-tuning, removes 2 hard-to-explain cluster options.
Similar or marginally better performance.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue May 26 21:41:52 2009
@@ -70,13 +70,8 @@
 #if HAVE_LIBCMAN_H
             ("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
 #endif
-            ("cluster-read-max", optValue(settings.readMax,"N"),
-             "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.")
-            ("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"),
-             "Experimental: initial estimate for write rate per multicast cycle")
-            ("cluster-write-min", optValue(settings.writeMin, "Kb"),
-             "Experimental: minimum estimate for write rate per multicast cycle")
-            // FIXME aconway 2009-05-20: temporary
+            ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit  reads per connection. 0=no limit.")
+            // FIXME aconway 2009-05-20: temporary 
             ("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.")
             ;
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h Tue May 26 21:41:52 2009
@@ -32,11 +32,11 @@
     std::string name;
     std::string url;
     bool quorum;
-    size_t readMax, writeEstimate, writeMin;
+    size_t readMax;
     std::string username, password, mechanism;
     bool checkErrors;
 
-    ClusterSettings() : quorum(false), readMax(10), writeEstimate(1), writeMin(1),
+    ClusterSettings() : quorum(false), readMax(10),
                         checkErrors(true) // FIXME aconway 2009-05-20: temporary
     {}
   

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue May 26 21:41:52 2009
@@ -113,14 +113,6 @@
     return output.doOutput();
 }
 
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which stocks up the write buffers with data.
-//
-void Connection::deliverDoOutput(uint32_t requested) {
-    assert(!catchUp);
-    output.deliverDoOutput(requested);
-}
-
 // Received from a directly connected client.
 void Connection::received(framing::AMQFrame& f) {
     QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
@@ -279,7 +271,7 @@
     QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
 }
     
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) {
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
     ConnectionId shadowId = ConnectionId(memberId, connectionId);
     QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
     self = shadowId;
@@ -287,6 +279,7 @@
     // OK to use decoder here because cluster is stalled for update.
     cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
     connection.setErrorListener(this);
+    output.setSendMax(sendMax);
 }
 
 void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue May 26 21:41:52 2009
@@ -115,7 +115,7 @@
                       const framing::SequenceNumber& received,
                       const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
     
-    void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
+    void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
 
     void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
 
@@ -150,6 +150,8 @@
 
     void deliverClose();
 
+    OutputInterceptor& getOutput() { return output; }
+
   private:
     struct NullFrameHandler : public framing::FrameHandler {
         void handle(framing::AMQFrame&) {}
@@ -164,8 +166,7 @@
     
     void init();
     bool checkUnsupported(const framing::AMQBody& body);
-    void deliverDoOutput(uint32_t requested);
-    void sendDoOutput();
+    void deliverDoOutput(uint32_t limit) { output.deliverDoOutput(limit); }
 
     boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
     broker::SessionState& sessionState();

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue May 26 21:41:52 2009
@@ -32,17 +32,17 @@
 namespace cluster {
 
 using namespace framing;
+using namespace std;
 
 NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
 
 OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
-    : parent(p), closing(false), next(&h), sent(),
-      estimate(p.getCluster().getSettings().writeEstimate*1024),
-      minimum(p.getCluster().getSettings().writeMin*1024),
-      moreOutput(), doingOutput()
+    : parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false)
 {}
 
-LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;)
+#if defined QPID_LATENCY_TRACKER
+extern sys::LatencyTracker<const AMQBody*> doOutputTracker;
+#endif
 
 void OutputInterceptor::send(framing::AMQFrame& f) {
     LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
@@ -53,8 +53,6 @@
         sys::Mutex::ScopedLock l(lock);
         next->send(f);
     }
-    if (!parent.isCatchUp())
-        sent += f.encodedSize();
 }
 
 void OutputInterceptor::activateOutput() {
@@ -62,11 +60,8 @@
         sys::Mutex::ScopedLock l(lock);
         next->activateOutput();
     }
-    else if (!closing) {        // Don't send do ouput after output stopped.
-        QPID_LOG(trace,  parent << " activateOutput - sending doOutput");
-        moreOutput = true;
-        sendDoOutput(estimate);
-    }
+    else
+        sendDoOutput(sendMax);
 }
 
 void OutputInterceptor::giveReadCredit(int32_t credit) {
@@ -77,43 +72,33 @@
 // Called in write thread when the IO layer has no more data to write.
 // We do nothing in the write thread, we run doOutput only on delivery
 // of doOutput requests.
-bool  OutputInterceptor::doOutput() { return false; }
-
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which tranfers frames to the codec for writing.
-// 
-void OutputInterceptor::deliverDoOutput(size_t requested) {
-    size_t buf = getBuffered();
-    if (parent.isLocal()) {  // Adjust estimate for next sendDoOutput 
-        sent = sent > buf ? sent - buf : 0; // Buffered data was not sent.
-        if (buf > 0)          // Wrote to capacity, move estimate towards sent.
-            estimate = (estimate + sent) /2;  
-        else if (sent >= estimate) // Last estimate was too small, increase it.
-            estimate *= 2;  
-        if (estimate < minimum) estimate = minimum;
-    }
-    // Run the real doOutput() till we have added the requested data
-    // or there's nothing to output. Record how much we send.
-    sent = 0;
-    do {
-        moreOutput = parent.getBrokerConnection().doOutput();
-    } while (sent < requested && moreOutput);
-    sent += buf;                // Include data previously in the buffer
+bool OutputInterceptor::doOutput() { return false; }
 
+// Send output up to limit, calculate new limit. 
+void OutputInterceptor::deliverDoOutput(uint32_t limit) {
+    sentDoOutput = false;
+    sendMax = limit;
+    size_t newLimit = limit;
     if (parent.isLocal()) {
-        // Send the next doOutput request
-        doingOutput = false;
-        sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer?
+        size_t buffered = getBuffered();
+        if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit.
+            newLimit = sendMax*2; 
+        else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit.
+            newLimit = sent-1;
     }
+    sent = 0;
+    while (sent < limit && parent.getBrokerConnection().doOutput())
+        ++sent;
+    if (sent == limit) sendDoOutput(newLimit);
 }
 
-// Send a doOutput request if one is not already in flight.
-void OutputInterceptor::sendDoOutput(size_t request) {
-    if (!parent.isLocal() || doingOutput || !moreOutput) return;
-    doingOutput = true;
-    parent.getCluster().getMulticast().mcastControl(
-        ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId());
-    QPID_LOG(trace, parent << "Send doOutput request for " << request);
+void OutputInterceptor::sendDoOutput(size_t newLimit) {
+    if (parent.isLocal() && !sentDoOutput && !closing) {
+        sentDoOutput = true;
+        parent.getCluster().getMulticast().mcastControl(
+            ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),
+            parent.getId());
+    }
 }
 
 void OutputInterceptor::closeOutput() {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Tue May 26 21:41:52 2009
@@ -49,28 +49,28 @@
     size_t getBuffered() const;
 
     // Delivery point for doOutput requests.
-    void deliverDoOutput(size_t requested);
+    void deliverDoOutput(uint32_t limit);
     // Intercept doOutput requests on Connection.
     bool doOutput();
 
     void closeOutput();
 
+    uint32_t getSendMax() const { return sendMax; }
+    void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; }
+    
     cluster::Connection& parent;
     
   private:
     typedef sys::Mutex::ScopedLock Locker;
 
-    void sendDoOutput(size_t);
+    void sendDoOutput(size_t newLimit);
 
     mutable sys::Mutex lock;
     bool closing;
     sys::ConnectionOutputHandler* next;
-    size_t sent;
-    size_t estimate;
-    size_t minimum;
-    bool moreOutput;
-    bool doingOutput;
     static NoOpConnectionOutputHandler discardHandler;
+    uint32_t sendMax, sent;
+    bool sentDoOutput;
 };
 
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Tue May 26 21:41:52 2009
@@ -41,7 +41,8 @@
                   const boost::shared_ptr<sys::Poller>& poller)
         : sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
                                 poller),
-          callback(f), error(err), message(msg) {}
+          callback(f), error(err), message(msg)
+    {}
 
     typename sys::PollableQueue<T>::Batch::const_iterator
     handleBatch(const typename sys::PollableQueue<T>::Batch& values) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue May 26 21:41:52 2009
@@ -253,7 +253,8 @@
         updateConnection->getId().getMember(),
         updateConnection->getId().getNumber(),
         bc.getUserId(),
-        string(fragment.first, fragment.second)
+        string(fragment.first, fragment.second),
+        updateConnection->getOutput().getSendMax()
     );
     shadowConnection.close();
     QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue May 26 21:41:52 2009
@@ -74,7 +74,7 @@
     <control name="deliver-close" code="0x2"/>
 
     <control name="deliver-do-output" code="0x3">
-      <field name="bytes" type="uint32"/>
+      <field name="limit" type="uint32"/>
     </control>
 
     <!-- Update controls. Sent to a new broker in joining mode.
@@ -139,6 +139,7 @@
       <field name="connection-id" type="uint64"/>
       <field name="user-name" type="str8"/>
       <field name="fragment" type="str32"/>
+      <field name="send-max" type="uint32"/>
     </control>
 
     <!-- Complete a cluster state update. -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org