You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/08/12 23:03:43 UTC
svn commit: r685317 - in /incubator/qpid/trunk/qpid/cpp/src/qpid/cluster:
Cluster.cpp Cluster.h
Author: aconway
Date: Tue Aug 12 14:03:43 2008
New Revision: 685317
URL: http://svn.apache.org/viewvc?rev=685317&view=rev
Log:
Queue cluster send frames, do cpg_mcast in separate thread, batching if possible.
5x thruput improvement :)
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=685317&r1=685316&r2=685317&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Aug 12 14:03:43 2008
@@ -68,7 +68,8 @@
0, // write
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
- deliverQueue(boost::bind(&Cluster::deliverFrames, this, _1, _2))
+ deliverQueue(boost::bind(&Cluster::deliverQueueCb, this, _1, _2)),
+ mcastQueue(boost::bind(&Cluster::mcastQueueCb, this, _1, _2))
{
broker->addFinalizer(boost::bind(&Cluster::leave, this));
QPID_LOG(trace, "Joining cluster: " << name_);
@@ -83,6 +84,7 @@
// Start dispatching from the poller.
cpgDispatchHandle.startWatch(poller);
deliverQueue.start(poller);
+ mcastQueue.start(poller);
}
Cluster::~Cluster() {}
@@ -124,12 +126,26 @@
void Cluster::send(const AMQFrame& frame, ConnectionInterceptor* connection) {
QPID_LOG(trace, "MCAST [" << connection << "] " << frame);
- char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
- Buffer buf(buffer);
- frame.encode(buf);
- encodePtr(buf, connection);
- iovec iov = { buffer, buf.getPosition() };
- cpg.mcast(name, &iov, 1);
+ mcastQueue.push(Message(frame, self, connection));
+}
+
+void Cluster::mcastQueueCb(const MessageQueue::iterator& begin,
+ const MessageQueue::iterator& end)
+{
+ // Static is OK because there is only one cluster allowed per
+ // process and only one thread in mcastQueueCb at a time.
+ static char buffer[64*1024]; // FIXME aconway 2008-07-04: buffer management.
+ MessageQueue::iterator i = begin;
+ while (i != end) {
+ Buffer buf(buffer, sizeof(buffer));
+ while (i != end && buf.available() > i->frame.size() + sizeof(uint64_t)) {
+ i->frame.encode(buf);
+ encodePtr(buf, i->connection);
+ ++i;
+ }
+ iovec iov = { buffer, buf.getPosition() };
+ cpg.mcast(name, &iov, 1);
+ }
}
void Cluster::notify() {
@@ -181,12 +197,14 @@
Id from(nodeid, pid);
try {
Buffer buf(static_cast<char*>(msg), msg_len);
- AMQFrame frame;
- if (!frame.decode(buf)) // Not enough data.
- throw Exception("Received incomplete cluster event."); // FIXME aconway 2008-08-05: error handling.
- void* connection;
- decodePtr(buf, connection);
- deliverQueue.push(DeliveredFrame(frame, from, connection));
+ while (buf.available() > 0) {
+ AMQFrame frame;
+ if (!frame.decode(buf)) // Not enough data.
+ throw Exception("Received incomplete cluster event.");
+ void* connection;
+ decodePtr(buf, connection);
+ deliverQueue.push(Message(frame, from, connection));
+ }
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
@@ -196,10 +214,10 @@
}
}
-void Cluster::deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
- const PollableQueue<DeliveredFrame>::iterator& end)
+void Cluster::deliverQueueCb(const MessageQueue::iterator& begin,
+ const MessageQueue::iterator& end)
{
- for (PollableQueue<DeliveredFrame>::iterator i = begin; i != end; ++i) {
+ for (MessageQueue::iterator i = begin; i != end; ++i) {
AMQFrame& frame(i->frame);
Id from(i->from);
ConnectionInterceptor* connection = reinterpret_cast<ConnectionInterceptor*>(i->connection);
@@ -220,7 +238,7 @@
}
catch (const std::exception& e) {
// FIXME aconway 2008-01-30: exception handling.
- QPID_LOG(critical, "Error in cluster deliverFrame: " << e.what());
+ QPID_LOG(critical, "Error in cluster deliverQueueCb: " << e.what());
assert(0);
throw;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=685317&r1=685316&r2=685317&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Aug 12 14:03:43 2008
@@ -97,11 +97,13 @@
typedef std::map<Id, Member> MemberMap;
typedef std::map<ShadowConnectionId, ConnectionInterceptor*> ShadowConnectionMap;
- struct DeliveredFrame {
+ /** Message sent over the cluster. */
+ struct Message {
framing::AMQFrame frame; Id from; void* connection;
- DeliveredFrame(const framing::AMQFrame& f, const Id i, void* c)
+ Message(const framing::AMQFrame& f, const Id i, void* c)
: frame(f), from(i), connection(c) {}
};
+ typedef PollableQueue<Message> MessageQueue;
boost::function<void()> shutdownNext;
@@ -126,10 +128,17 @@
);
/** Callback to handle delivered frames from the deliverQueue. */
- void deliverFrames(const PollableQueue<DeliveredFrame>::iterator& begin,
- const PollableQueue<DeliveredFrame>::iterator& end);
+ void deliverQueueCb(const MessageQueue::iterator& begin,
+ const MessageQueue::iterator& end);
+ /** Callback to multi-cast frames from mcastQueue */
+ void mcastQueueCb(const MessageQueue::iterator& begin,
+ const MessageQueue::iterator& end);
+
+
+ /** Callback to dispatch CPG events. */
void dispatch(sys::DispatchHandle&);
+ /** Callback if CPG fd is disconnected. */
void disconnect(sys::DispatchHandle&);
void handleMethod(Id from, ConnectionInterceptor* connection, framing::AMQMethodBody& method);
@@ -147,7 +156,8 @@
ShadowConnectionMap shadowConnectionMap;
ShadowConnectionOutputHandler shadowOut;
sys::DispatchHandle cpgDispatchHandle;
- PollableQueue<DeliveredFrame> deliverQueue;
+ MessageQueue deliverQueue;
+ MessageQueue mcastQueue;
friend std::ostream& operator <<(std::ostream&, const Cluster&);
friend std::ostream& operator <<(std::ostream&, const MemberMap::value_type&);