You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/05/26 23:41:52 UTC
svn commit: r778896 - in /qpid/trunk/qpid/cpp: src/qpid/cluster/ xml/
Author: aconway
Date: Tue May 26 21:41:52 2009
New Revision: 778896
URL: http://svn.apache.org/viewvc?rev=778896&view=rev
Log:
Improved doOutput algorithm.
Simpler & more robust algorithm based on message count rather than byte size.
Self-tuning, removes 2 hard-to-explain cluster options.
Similar or marginally better performance.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Tue May 26 21:41:52 2009
@@ -70,13 +70,8 @@
#if HAVE_LIBCMAN_H
("cluster-cman", optValue(settings.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
- ("cluster-read-max", optValue(settings.readMax,"N"),
- "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.")
- ("cluster-write-estimate", optValue(settings.writeEstimate, "Kb"),
- "Experimental: initial estimate for write rate per multicast cycle")
- ("cluster-write-min", optValue(settings.writeMin, "Kb"),
- "Experimental: minimum estimate for write rate per multicast cycle")
- // FIXME aconway 2009-05-20: temporary
+ ("cluster-read-max", optValue(settings.readMax,"N"), "Experimental: flow-control limit reads per connection. 0=no limit.")
+ // FIXME aconway 2009-05-20: temporary
("cluster-check-errors", optValue(settings.checkErrors, "yes|no"), "Enable/disable cluster error checks. Normally should be enabled.")
;
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterSettings.h Tue May 26 21:41:52 2009
@@ -32,11 +32,11 @@
std::string name;
std::string url;
bool quorum;
- size_t readMax, writeEstimate, writeMin;
+ size_t readMax;
std::string username, password, mechanism;
bool checkErrors;
- ClusterSettings() : quorum(false), readMax(10), writeEstimate(1), writeMin(1),
+ ClusterSettings() : quorum(false), readMax(10),
checkErrors(true) // FIXME aconway 2009-05-20: temporary
{}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue May 26 21:41:52 2009
@@ -113,14 +113,6 @@
return output.doOutput();
}
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which stocks up the write buffers with data.
-//
-void Connection::deliverDoOutput(uint32_t requested) {
- assert(!catchUp);
- output.deliverDoOutput(requested);
-}
-
// Received from a directly connected client.
void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, cluster << " RECV " << *this << ": " << f);
@@ -279,7 +271,7 @@
QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
}
-void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment) {
+void Connection::shadowReady(uint64_t memberId, uint64_t connectionId, const string& username, const string& fragment, uint32_t sendMax) {
ConnectionId shadowId = ConnectionId(memberId, connectionId);
QPID_LOG(debug, cluster << " catch-up connection " << *this << " becomes shadow " << shadowId);
self = shadowId;
@@ -287,6 +279,7 @@
// OK to use decoder here because cluster is stalled for update.
cluster.getDecoder().get(self).setFragment(fragment.data(), fragment.size());
connection.setErrorListener(this);
+ output.setSendMax(sendMax);
}
void Connection::membership(const FieldTable& joiners, const FieldTable& members, uint64_t frameSeq) {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue May 26 21:41:52 2009
@@ -115,7 +115,7 @@
const framing::SequenceNumber& received,
const framing::SequenceSet& unknownCompleted, const SequenceSet& receivedIncomplete);
- void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment);
+ void shadowReady(uint64_t memberId, uint64_t connectionId, const std::string& username, const std::string& fragment, uint32_t sendMax);
void membership(const framing::FieldTable&, const framing::FieldTable&, uint64_t frameSeq);
@@ -150,6 +150,8 @@
void deliverClose();
+ OutputInterceptor& getOutput() { return output; }
+
private:
struct NullFrameHandler : public framing::FrameHandler {
void handle(framing::AMQFrame&) {}
@@ -164,8 +166,7 @@
void init();
bool checkUnsupported(const framing::AMQBody& body);
- void deliverDoOutput(uint32_t requested);
- void sendDoOutput();
+ void deliverDoOutput(uint32_t limit) { output.deliverDoOutput(limit); }
boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
broker::SessionState& sessionState();
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Tue May 26 21:41:52 2009
@@ -32,17 +32,17 @@
namespace cluster {
using namespace framing;
+using namespace std;
NoOpConnectionOutputHandler OutputInterceptor::discardHandler;
OutputInterceptor::OutputInterceptor(Connection& p, sys::ConnectionOutputHandler& h)
- : parent(p), closing(false), next(&h), sent(),
- estimate(p.getCluster().getSettings().writeEstimate*1024),
- minimum(p.getCluster().getSettings().writeMin*1024),
- moreOutput(), doingOutput()
+ : parent(p), closing(false), next(&h), sendMax(1), sent(0), sentDoOutput(false)
{}
-LATENCY_TRACK(extern sys::LatencyTracker<const AMQBody*> doOutputTracker;)
+#if defined QPID_LATENCY_TRACKER
+extern sys::LatencyTracker<const AMQBody*> doOutputTracker;
+#endif
void OutputInterceptor::send(framing::AMQFrame& f) {
LATENCY_TRACK(doOutputTracker.finish(f.getBody()));
@@ -53,8 +53,6 @@
sys::Mutex::ScopedLock l(lock);
next->send(f);
}
- if (!parent.isCatchUp())
- sent += f.encodedSize();
}
void OutputInterceptor::activateOutput() {
@@ -62,11 +60,8 @@
sys::Mutex::ScopedLock l(lock);
next->activateOutput();
}
- else if (!closing) { // Don't send do ouput after output stopped.
- QPID_LOG(trace, parent << " activateOutput - sending doOutput");
- moreOutput = true;
- sendDoOutput(estimate);
- }
+ else
+ sendDoOutput(sendMax);
}
void OutputInterceptor::giveReadCredit(int32_t credit) {
@@ -77,43 +72,33 @@
// Called in write thread when the IO layer has no more data to write.
// We do nothing in the write thread, we run doOutput only on delivery
// of doOutput requests.
-bool OutputInterceptor::doOutput() { return false; }
-
-// Delivery of doOutput allows us to run the real connection doOutput()
-// which tranfers frames to the codec for writing.
-//
-void OutputInterceptor::deliverDoOutput(size_t requested) {
- size_t buf = getBuffered();
- if (parent.isLocal()) { // Adjust estimate for next sendDoOutput
- sent = sent > buf ? sent - buf : 0; // Buffered data was not sent.
- if (buf > 0) // Wrote to capacity, move estimate towards sent.
- estimate = (estimate + sent) /2;
- else if (sent >= estimate) // Last estimate was too small, increase it.
- estimate *= 2;
- if (estimate < minimum) estimate = minimum;
- }
- // Run the real doOutput() till we have added the requested data
- // or there's nothing to output. Record how much we send.
- sent = 0;
- do {
- moreOutput = parent.getBrokerConnection().doOutput();
- } while (sent < requested && moreOutput);
- sent += buf; // Include data previously in the buffer
+bool OutputInterceptor::doOutput() { return false; }
+// Send output up to limit, calculate new limit.
+void OutputInterceptor::deliverDoOutput(uint32_t limit) {
+ sentDoOutput = false;
+ sendMax = limit;
+ size_t newLimit = limit;
if (parent.isLocal()) {
- // Send the next doOutput request
- doingOutput = false;
- sendDoOutput(estimate); // FIXME aconway 2009-04-28: account for data in buffer?
+ size_t buffered = getBuffered();
+ if (buffered == 0 && sent == sendMax) // Could have sent more, increase the limit.
+ newLimit = sendMax*2;
+ else if (buffered > 0 && sent > 1) // Data left unsent, reduce the limit.
+ newLimit = sent-1;
}
+ sent = 0;
+ while (sent < limit && parent.getBrokerConnection().doOutput())
+ ++sent;
+ if (sent == limit) sendDoOutput(newLimit);
}
-// Send a doOutput request if one is not already in flight.
-void OutputInterceptor::sendDoOutput(size_t request) {
- if (!parent.isLocal() || doingOutput || !moreOutput) return;
- doingOutput = true;
- parent.getCluster().getMulticast().mcastControl(
- ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), estimate), parent.getId());
- QPID_LOG(trace, parent << "Send doOutput request for " << request);
+void OutputInterceptor::sendDoOutput(size_t newLimit) {
+ if (parent.isLocal() && !sentDoOutput && !closing) {
+ sentDoOutput = true;
+ parent.getCluster().getMulticast().mcastControl(
+ ClusterConnectionDeliverDoOutputBody(ProtocolVersion(), newLimit),
+ parent.getId());
+ }
}
void OutputInterceptor::closeOutput() {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.h Tue May 26 21:41:52 2009
@@ -49,28 +49,28 @@
size_t getBuffered() const;
// Delivery point for doOutput requests.
- void deliverDoOutput(size_t requested);
+ void deliverDoOutput(uint32_t limit);
// Intercept doOutput requests on Connection.
bool doOutput();
void closeOutput();
+ uint32_t getSendMax() const { return sendMax; }
+ void setSendMax(uint32_t sendMax_) { sendMax=sendMax_; }
+
cluster::Connection& parent;
private:
typedef sys::Mutex::ScopedLock Locker;
- void sendDoOutput(size_t);
+ void sendDoOutput(size_t newLimit);
mutable sys::Mutex lock;
bool closing;
sys::ConnectionOutputHandler* next;
- size_t sent;
- size_t estimate;
- size_t minimum;
- bool moreOutput;
- bool doingOutput;
static NoOpConnectionOutputHandler discardHandler;
+ uint32_t sendMax, sent;
+ bool sentDoOutput;
};
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/PollableQueue.h Tue May 26 21:41:52 2009
@@ -41,7 +41,8 @@
const boost::shared_ptr<sys::Poller>& poller)
: sys::PollableQueue<T>(boost::bind(&PollableQueue<T>::handleBatch, this, _1),
poller),
- callback(f), error(err), message(msg) {}
+ callback(f), error(err), message(msg)
+ {}
typename sys::PollableQueue<T>::Batch::const_iterator
handleBatch(const typename sys::PollableQueue<T>::Batch& values) {
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue May 26 21:41:52 2009
@@ -253,7 +253,8 @@
updateConnection->getId().getMember(),
updateConnection->getId().getNumber(),
bc.getUserId(),
- string(fragment.first, fragment.second)
+ string(fragment.first, fragment.second),
+ updateConnection->getOutput().getSendMax()
);
shadowConnection.close();
QPID_LOG(debug, updaterId << " updated connection " << *updateConnection);
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=778896&r1=778895&r2=778896&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue May 26 21:41:52 2009
@@ -74,7 +74,7 @@
<control name="deliver-close" code="0x2"/>
<control name="deliver-do-output" code="0x3">
- <field name="bytes" type="uint32"/>
+ <field name="limit" type="uint32"/>
</control>
<!-- Update controls. Sent to a new broker in joining mode.
@@ -139,6 +139,7 @@
<field name="connection-id" type="uint64"/>
<field name="user-name" type="str8"/>
<field name="fragment" type="str32"/>
+ <field name="send-max" type="uint32"/>
</control>
<!-- Complete a cluster state update. -->
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org