You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/09 05:50:35 UTC
svn commit: r732925 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp
Cluster.h ClusterPlugin.cpp Multicaster.cpp Multicaster.h
OutputInterceptor.cpp
Author: aconway
Date: Thu Jan 8 20:50:35 2009
New Revision: 732925
URL: http://svn.apache.org/viewvc?rev=732925&view=rev
Log:
Added --cluster-mcast-max: max number of outstanding mcasts in CPG buffers.
Work around problems with CPG flow control.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Jan 8 20:50:35 2009
@@ -83,7 +83,7 @@
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) :
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) :
broker(b),
poller(b.getPoller()),
cpg(*this),
@@ -98,7 +98,7 @@
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
+ mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
mgmtObject(0),
deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
state(INIT),
@@ -193,7 +193,9 @@
void Cluster::deliver(const Event& e, Lock&) {
if (state == LEFT) return;
QPID_LOG(trace, *this << " PUSH: " << e);
- deliverQueue.push(e); // Otherwise enqueue for processing.
+ if (e.getMemberId() == myId)
+ mcast.delivered(e); // Note delivery for flow control
+ deliverQueue.push(e);
}
// Entry point: called when deliverQueue has events to process.
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Jan 8 20:50:35 2009
@@ -70,7 +70,7 @@
* Join a cluster.
*/
Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
- size_t readMax, size_t writeEstimate);
+ size_t readMax, size_t writeEstimate, size_t mcastMax);
virtual ~Cluster();
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Thu Jan 8 20:50:35 2009
@@ -41,10 +41,10 @@
string name;
string url;
bool quorum;
- size_t readMax, writeEstimate;
+ size_t readMax, writeEstimate, mcastMax;
// FIXME aconway 2008-12-09: revisit default.
- ClusterValues() : quorum(false), readMax(0), writeEstimate(64) {}
+ ClusterValues() : quorum(false), readMax(10), writeEstimate(64), mcastMax(10) {}
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
@@ -69,10 +69,11 @@
("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
("cluster-read-max", optValue(values.readMax,"N"),
- "Experimental: Throttle read rate from client connections.")
+ "Experimental: Max unreplicated reads per connetion connection. 0=no limit.")
+ ("cluster-mcast-max", optValue(values.mcastMax,"N"),
+ "Experimental: Max outstanding multicasts per broker. 0=no limit.")
("cluster-write-estimate", optValue(values.writeEstimate, "Kb"),
- "Experimental: initial estimate for connection write per multicast cycle")
- ;
+ "Experimental: initial estimate for connection writes rate per multicast cycle");
}
};
@@ -91,7 +92,13 @@
if (values.name.empty()) return; // Only if --cluster-name option was specified.
Broker* broker = dynamic_cast<Broker*>(&target);
if (!broker) return;
- cluster = new Cluster(values.name, values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)), *broker, values.quorum, values.readMax, values.writeEstimate*1024);
+ cluster = new Cluster(
+ values.name,
+ values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)),
+ *broker,
+ values.quorum,
+ values.readMax, values.writeEstimate*1024, values.mcastMax
+ );
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
new ConnectionCodec::Factory(broker->getConnectionFactory(), *cluster)));
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Thu Jan 8 20:50:35 2009
@@ -28,10 +28,14 @@
namespace qpid {
namespace cluster {
-Multicaster::Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>& poller, boost::function<void()> onError_) :
- onError(onError_), cpg(cpg_),
+Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_,
+ const boost::shared_ptr<sys::Poller>& poller,
+ boost::function<void()> onError_) :
+ onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- holding(true)
+ holding(true),
+ mcastMax(mcastMax_),
+ pending(0)
{
queue.start();
}
@@ -56,6 +60,7 @@
}
}
queue.push(e);
+
}
@@ -64,9 +69,18 @@
PollableEventQueue::Queue::iterator i = values.begin();
while( i != values.end()) {
iovec iov = { const_cast<char*>(i->getStore()), i->getStoreSize() };
- if (!cpg.mcast(&iov, 1)) break; // returns false for flow control
+ if (!cpg.mcast(&iov, 1))
+ break; // cpg.mcast returns false for flow control
QPID_LOG(trace, " MCAST " << *i);
++i;
+ if (mcastMax) {
+ sys::Mutex::ScopedLock l(lock);
+ assert(pending < mcastMax);
+ if (++pending == mcastMax) {
+ queue.stop();
+ break ;
+ }
+ }
}
values.erase(values.begin(), i);
}
@@ -84,4 +98,14 @@
holdingQueue.clear();
}
+void Multicaster::delivered(const Event&) {
+ sys::Mutex::ScopedLock l(lock);
+ if (mcastMax) {
+ assert(pending <= mcastMax);
+ if (pending == mcastMax)
+ queue.start();
+ --pending;
+ }
+}
+
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Thu Jan 8 20:50:35 2009
@@ -46,13 +46,19 @@
{
public:
/** Starts in holding mode: connection data events are held, other events are mcast */
- Multicaster(Cpg& cpg_, const boost::shared_ptr<sys::Poller>&, boost::function<void()> onError );
+ Multicaster(Cpg& cpg_,
+ size_t mcastMax,
+ const boost::shared_ptr<sys::Poller>&,
+ boost::function<void()> onError
+ );
void mcastControl(const framing::AMQBody& controlBody, const ConnectionId&);
void mcastBuffer(const char*, size_t, const ConnectionId&);
void mcast(const Event& e);
/** End holding mode, held events are mcast */
void release();
-
+ /** Call when events are self-delivered to manage flow control. */
+ void delivered(const Event& e);
+
private:
typedef sys::PollableQueue<Event> PollableEventQueue;
typedef std::deque<Event> PlainEventQueue;
@@ -66,6 +72,7 @@
bool holding;
PlainEventQueue holdingQueue;
std::vector<struct ::iovec> ioVector;
+ size_t mcastMax, pending;
};
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=732925&r1=732924&r2=732925&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Jan 8 20:50:35 2009
@@ -77,7 +77,7 @@
// which tranfers frames to the codec for writing.
//
void OutputInterceptor::deliverDoOutput(size_t requested) {
- size_t buf = next->getBuffered();
+ size_t buf = getBuffered();
if (parent.isLocal())
writeEstimate.delivered(requested, sent, buf); // Update the estimate.