You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/28 21:48:23 UTC
svn commit: r738618 - in /qpid/trunk/qpid/cpp/src/qpid/cluster: Cluster.cpp
Cluster.h ClusterPlugin.cpp Multicaster.cpp Multicaster.h
Author: aconway
Date: Wed Jan 28 20:48:23 2009
New Revision: 738618
URL: http://svn.apache.org/viewvc?rev=738618&view=rev
Log: (empty)
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=738618&r1=738617&r2=738618&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 28 20:48:23 2009
@@ -86,7 +86,7 @@
bool invoke(AMQBody& body) { return framing::invoke(*this, body).wasHandled(); }
};
-Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) :
+Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_) :
broker(b),
mgmtObject(0),
poller(b.getPoller()),
@@ -96,7 +96,7 @@
myId(cpg.self()),
readMax(readMax_),
writeEstimate(writeEstimate_),
- mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
+ mcast(cpg, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverEventQueue(ClusterQueueHandler<Event>(this, boost::bind(&Cluster::deliveredEvent, this, _1), "event queue"), poller),
deliverFrameQueue(ClusterQueueHandler<EventFrame>(this, boost::bind(&Cluster::deliveredFrame, this, _1), "frame queue"), poller),
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=738618&r1=738617&r2=738618&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jan 28 20:48:23 2009
@@ -71,7 +71,7 @@
* Join a cluster.
*/
Cluster(const std::string& name, const Url& url, broker::Broker&, bool useQuorum,
- size_t readMax, size_t writeEstimate, size_t mcastMax);
+ size_t readMax, size_t writeEstimate);
virtual ~Cluster();
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=738618&r1=738617&r2=738618&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Jan 28 20:48:23 2009
@@ -52,9 +52,9 @@
string name;
string url;
bool quorum;
- size_t readMax, writeEstimate, mcastMax;
+ size_t readMax, writeEstimate;
- ClusterValues() : quorum(false), readMax(10), writeEstimate(64), mcastMax(0) {}
+ ClusterValues() : quorum(false), readMax(10), writeEstimate(64) {}
Url getUrl(uint16_t port) const {
if (url.empty()) return Url::getIpAddressesUrl(port);
@@ -79,11 +79,9 @@
("cluster-cman", optValue(values.quorum), "Integrate with Cluster Manager (CMAN) cluster.")
#endif
("cluster-read-max", optValue(values.readMax,"N"),
- "Experimental: Max unreplicated reads per connetion connection. 0=no limit.")
- ("cluster-mcast-max", optValue(values.mcastMax,"N"),
- "Experimental: Max outstanding multicasts per broker. 0=no limit.")
+ "Experimental: Limit per-client-connection queue of read buffers. 0=no limit.")
("cluster-write-estimate", optValue(values.writeEstimate, "Kb"),
- "Experimental: initial estimate for connection writes rate per multicast cycle");
+ "Experimental: initial estimate for connection write rate per multicast cycle");
}
};
@@ -147,7 +145,7 @@
values.getUrl(broker->getPort(Broker::TCP_TRANSPORT)),
*broker,
values.quorum,
- values.readMax, values.writeEstimate*1024, values.mcastMax
+ values.readMax, values.writeEstimate*1024
);
broker->setConnectionFactory(
boost::shared_ptr<sys::ConnectionCodec::Factory>(
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp?rev=738618&r1=738617&r2=738618&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.cpp Wed Jan 28 20:48:23 2009
@@ -28,14 +28,12 @@
namespace qpid {
namespace cluster {
-Multicaster::Multicaster(Cpg& cpg_, size_t mcastMax_,
+Multicaster::Multicaster(Cpg& cpg_,
const boost::shared_ptr<sys::Poller>& poller,
boost::function<void()> onError_) :
onError(onError_), cpg(cpg_),
queue(boost::bind(&Multicaster::sendMcast, this, _1), poller),
- holding(true),
- mcastMax(mcastMax_),
- pending(0)
+ holding(true)
{
queue.start();
}
@@ -69,22 +67,10 @@
try {
PollableEventQueue::Queue::iterator i = values.begin();
while( i != values.end()) {
- if (mcastMax) {
- sys::Mutex::ScopedLock l(lock);
- if (pending == mcastMax) {
- queue.stop();
- break ;
- }
- ++pending;
- }
QPID_LATENCY_RECORD("mcast send queue", *i);
iovec iov = i->toIovec();
if (!cpg.mcast(&iov, 1)) {
// cpg didn't send because of CPG flow control.
- if (mcastMax) {
- sys::Mutex::ScopedLock l(lock);
- --pending;
- }
break;
}
++i;
@@ -108,13 +94,6 @@
void Multicaster::selfDeliver(const Event& e) {
sys::Mutex::ScopedLock l(lock);
QPID_LATENCY_RECORD("cpg self deliver", e);
- if (mcastMax) {
- assert(pending > 0);
- assert(pending <= mcastMax);
- if (pending == mcastMax)
- queue.start();
- --pending;
- }
}
}} // namespace qpid::cluster
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h?rev=738618&r1=738617&r2=738618&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Multicaster.h Wed Jan 28 20:48:23 2009
@@ -46,7 +46,6 @@
public:
/** Starts in holding mode: connection data events are held, other events are mcast */
Multicaster(Cpg& cpg_,
- size_t mcastMax,
const boost::shared_ptr<sys::Poller>&,
boost::function<void()> onError
);
@@ -71,7 +70,6 @@
bool holding;
PlainEventQueue holdingQueue;
std::vector<struct ::iovec> ioVector;
- size_t mcastMax, pending;
};
}} // namespace qpid::cluster
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org