You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/21 21:40:38 UTC
svn commit: r736409 - in /qpid/trunk/qpid/cpp/src: cluster.mk
qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/cluster/Connection.cpp
qpid/cluster/Connection.h qpid/cluster/Event.h qpid/cluster/EventFrame.h
Author: aconway
Date: Wed Jan 21 12:40:38 2009
New Revision: 736409
URL: http://svn.apache.org/viewvc?rev=736409&view=rev
Log:
cluster: Pipeline decoding. About 10% improvement in latency and throughput.
Added:
qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (with props)
Modified:
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Wed Jan 21 12:40:38 2009
@@ -36,29 +36,30 @@
dmodule_LTLIBRARIES += cluster.la
-cluster_la_SOURCES = \
- $(CMAN_SOURCES) \
- qpid/cluster/Cluster.cpp \
- qpid/cluster/Cluster.h \
+cluster_la_SOURCES = \
+ $(CMAN_SOURCES) \
+ qpid/cluster/Cluster.cpp \
+ qpid/cluster/Cluster.h \
qpid/cluster/ClusterLeaveException.h \
qpid/cluster/ClusterMap.cpp \
qpid/cluster/ClusterMap.h \
- qpid/cluster/ClusterPlugin.cpp \
- qpid/cluster/Connection.cpp \
+ qpid/cluster/ClusterPlugin.cpp \
+ qpid/cluster/Connection.cpp \
qpid/cluster/Connection.h \
qpid/cluster/ConnectionCodec.cpp \
qpid/cluster/ConnectionCodec.h \
- qpid/cluster/ConnectionMap.h \
+ qpid/cluster/ConnectionMap.h \
qpid/cluster/Cpg.cpp \
qpid/cluster/Cpg.h \
qpid/cluster/Dispatchable.h \
- qpid/cluster/DumpClient.cpp \
+ qpid/cluster/DumpClient.cpp \
qpid/cluster/DumpClient.h \
qpid/cluster/Event.cpp \
qpid/cluster/Event.h \
- qpid/cluster/FailoverExchange.cpp \
+ qpid/cluster/EventFrame.h \
+ qpid/cluster/FailoverExchange.cpp \
qpid/cluster/FailoverExchange.h \
- qpid/cluster/Multicaster.cpp \
+ qpid/cluster/Multicaster.cpp \
qpid/cluster/Multicaster.h \
qpid/cluster/NoOpConnectionOutputHandler.h \
qpid/cluster/OutputInterceptor.cpp \
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Wed Jan 21 12:40:38 2009
@@ -96,7 +96,8 @@
writeEstimate(writeEstimate_),
mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
- deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
+ deliverEventQueue(boost::bind(&Cluster::deliveredEvents, this, _1), poller),
+ deliverFrameQueue(boost::bind(&Cluster::deliveredFrames, this, _1), poller),
state(INIT),
lastSize(0),
lastBroker(false)
@@ -111,7 +112,8 @@
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
failoverExchange.reset(new FailoverExchange(this));
dispatcher.start();
- deliverQueue.start();
+ deliverEventQueue.start();
+ deliverFrameQueue.start();
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
if (quorum_) quorum.init();
cpg.join(name);
@@ -191,14 +193,14 @@
void Cluster::deliver(const Event& e, Lock&) {
if (state == LEFT) return;
QPID_LOG(trace, *this << " PUSH: " << e);
- deliverQueue.push(e);
+ QPID_LATENCY_INIT(e);
+ deliverEventQueue.push(e);
}
-// Entry point: called when deliverQueue has events to process.
-void Cluster::delivered(PollableEventQueue::Queue& events) {
+// Entry point: called when deliverEventQueue has events to process.
+void Cluster::deliveredEvents(PollableEventQueue::Queue& events) {
try {
- for (PollableEventQueue::Queue::iterator i = events.begin(); i != events.end(); ++i)
- deliveredEvent(*i, i->getData());
+ for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
events.clear();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
@@ -206,41 +208,52 @@
}
}
-void Cluster::deliveredEvent(const EventHeader& e, const char* data) {
- QPID_LATENCY_RECORD("deliver queue", e);
- Buffer buf(const_cast<char*>(data), e.getSize());
- AMQFrame frame;
- if (e.isCluster()) {
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
- Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
- ClusterDispatcher dispatch(*this, e.getMemberId(), l);
- if (!framing::invoke(dispatch, *frame.getBody()).wasHandled())
- throw Exception(QPID_MSG("Invalid cluster control"));
- }
+void Cluster::deliveredFrames(PollableFrameQueue::Queue& frames) {
+ try {
+ for_each(frames.begin(), frames.end(), boost::bind(&Cluster::deliveredFrame, this, _1));
+ frames.clear();
+ } catch (const std::exception& e) {
+ QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
+ leave();
}
- else { // e.isConnection()
+}
+
+void Cluster::deliveredEvent(const Event& e) {
+ QPID_LATENCY_RECORD("delivered event queue", e);
+ Buffer buf(const_cast<char*>(e.getData()), e.getSize());
+ boost::intrusive_ptr<Connection> connection;
+ if (e.isConnection()) {
if (state == NEWBIE) {
QPID_LOG(trace, *this << " DROP: " << e);
+ return;
}
- else {
- boost::intrusive_ptr<Connection> connection = getConnection(e.getConnectionId());
- if (!connection) return;
- if (e.getType() == CONTROL) {
- while (frame.decode(buf)) {
- QPID_LOG(trace, *this << " DLVR: " << e << " " << frame);
- connection->delivered(frame);
- }
- }
- else {
- QPID_LOG(trace, *this << " DLVR: " << e);
- connection->deliverBuffer(buf);
- }
+ connection = getConnection(e.getConnectionId());
+ if (!connection) return;
+ }
+ if (e.getType() == CONTROL) {
+ AMQFrame frame;
+ while (frame.decode(buf)) {
+ deliverFrameQueue.push(EventFrame(connection, e.getMemberId(), frame));
}
}
- QPID_LATENCY_RECORD("decode+execute", e);
+ else if (e.getType() == DATA) {
+ connection->deliveredEvent(e, deliverFrameQueue);
+ }
}
+void Cluster::deliveredFrame(const EventFrame& e) {
+ QPID_LOG(trace, *this << " DLVR: " << e.frame);
+ if (e.connection) {
+ e.connection->deliveredFrame(e);
+ }
+ else {
+ Mutex::ScopedLock l(lock); // FIXME aconway 2008-12-11: lock scope too big?
+ ClusterDispatcher dispatch(*this, e.member, l);
+ if (!framing::invoke(dispatch, *e.frame.getBody()).wasHandled())
+ throw Exception(QPID_MSG("Invalid cluster control"));
+ }
+}
+
struct AddrList {
const cpg_address* addrs;
int count;
@@ -379,7 +392,7 @@
setClusterId(uuid);
state = DUMPEE;
QPID_LOG(info, *this << " receiving dump from " << dumper);
- deliverQueue.stop();
+ deliverEventQueue.stop();
checkDumpIn(l);
}
}
@@ -389,7 +402,7 @@
assert(state == OFFER);
state = DUMPER;
QPID_LOG(info, *this << " stall for dump to " << dumpee << " at " << url);
- deliverQueue.stop();
+ deliverEventQueue.stop();
if (dumpThread.id()) dumpThread.join(); // Join the previous dumpthread.
dumpThread = Thread(
new DumpClient(myId, dumpee, url, broker, map, connections.values(),
@@ -411,7 +424,7 @@
mcast.mcastControl(ClusterReadyBody(ProtocolVersion(), myUrl.str()), myId);
state = CATCHUP;
QPID_LOG(info, *this << " received dump, starting catch-up");
- deliverQueue.start();
+ deliverEventQueue.start();
}
}
@@ -425,7 +438,7 @@
state = READY;
mcast.release();
QPID_LOG(info, *this << " sent dump");
- deliverQueue.start();
+ deliverEventQueue.start();
tryMakeOffer(map.firstNewbie(), l); // Try another offer
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Wed Jan 21 12:40:38 2009
@@ -25,6 +25,7 @@
#include "Event.h"
#include "FailoverExchange.h"
#include "Multicaster.h"
+#include "EventFrame.h"
#include "NoOpConnectionOutputHandler.h"
#include "PollerDispatch.h"
#include "Quorum.h"
@@ -102,7 +103,7 @@
typedef sys::Monitor::ScopedLock Lock;
typedef sys::PollableQueue<Event> PollableEventQueue;
- typedef std::deque<Event> PlainEventQueue;
+ typedef sys::PollableQueue<EventFrame> PollableFrameQueue;
// NB: The final Lock& parameter on functions below is used to mark functions
// that should only be called by a function that already holds the lock.
@@ -126,8 +127,10 @@
void ready(const MemberId&, const std::string&, Lock&);
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void shutdown(const MemberId&, Lock&);
- void delivered(PollableEventQueue::Queue&); // deliverQueue callback
- void deliveredEvent(const EventHeader&, const char*);
+ void deliveredEvents(PollableEventQueue::Queue&);
+ void deliveredFrames(PollableFrameQueue::Queue&);
+ void deliveredEvent(const Event&);
+ void deliveredFrame(const EventFrame&);
// Helper, called in deliver thread.
void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
@@ -185,7 +188,8 @@
// Thread safe members
Multicaster mcast;
PollerDispatch dispatcher;
- PollableEventQueue deliverQueue;
+ PollableEventQueue deliverEventQueue;
+ PollableFrameQueue deliverFrameQueue;
ConnectionMap connections;
boost::shared_ptr<FailoverExchange> failoverExchange;
Quorum quorum;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Jan 21 12:40:38 2009
@@ -62,14 +62,14 @@
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, ConnectionId myId)
: cluster(c), self(myId), catchUp(false), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId)
+ connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
{ init(); }
// Local connections
Connection::Connection(Cluster& c, sys::ConnectionOutputHandler& out,
const std::string& wrappedId, MemberId myId, bool isCatchUp)
: cluster(c), self(myId, this), catchUp(isCatchUp), output(*this, out),
- connection(&output, cluster.getBroker(), wrappedId)
+ connection(&output, cluster.getBroker(), wrappedId), readCredit(0)
{ init(); }
void Connection::init() {
@@ -135,17 +135,35 @@
return !message.empty();
}
+// Decode buffer and put frames on frameq.
+void Connection::deliveredEvent(const Event& e, EventFrameQueue& frameq) {
+ assert(!catchUp);
+ Buffer buf(e);
+ // Set read credit on the last frame.
+ ++readCredit; // One credit per buffer.
+ if (!mcastDecoder.decode(buf)) return;
+ AMQFrame frame(mcastDecoder.frame);
+ while (mcastDecoder.decode(buf)) {
+ frameq.push(EventFrame(this, getId().getMember(), frame));
+ frame = mcastDecoder.frame;
+ }
+ frameq.push(EventFrame(this, getId().getMember(), frame, readCredit));
+ readCredit = 0;
+}
+
+
// Delivered from cluster.
-void Connection::delivered(framing::AMQFrame& f) {
- QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f);
- QPID_LATENCY_INIT(f);
+void Connection::deliveredFrame(const EventFrame& f) {
+ QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f.frame);
assert(!catchUp);
- currentChannel = f.getChannel();
- if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection contol.
- && !checkUnsupported(*f.getBody())) // Unsupported operation.
+ currentChannel = f.frame.getChannel();
+ if (!framing::invoke(*this, *f.frame.getBody()).wasHandled() // Connection contol.
+ && !checkUnsupported(*f.frame.getBody())) // Unsupported operation.
{
- connection.received(f); // Pass to broker connection.
+ connection.received(const_cast<AMQFrame&>(f.frame)); // Pass to broker connection.
}
+ if (cluster.getReadMax() && f.readCredit)
+ output.giveReadCredit(f.readCredit);
}
// A local connection is closed by the network layer.
@@ -200,15 +218,6 @@
return size;
}
-void Connection::deliverBuffer(Buffer& buf) {
- assert(!catchUp);
- ++deliverSeq;
- while (mcastDecoder.decode(buf))
- delivered(mcastDecoder.frame);
- if (cluster.getReadMax())
- output.giveReadCredit(1);
-}
-
broker::SessionState& Connection::sessionState() {
return *connection.getChannel(currentChannel).getSession();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Jan 21 12:40:38 2009
@@ -26,6 +26,8 @@
#include "WriteEstimate.h"
#include "OutputInterceptor.h"
#include "NoOpConnectionOutputHandler.h"
+#include "Event.h"
+#include "EventFrame.h"
#include "qpid/broker/Connection.h"
#include "qpid/amqp_0_10/Connection.h"
@@ -49,6 +51,7 @@
namespace cluster {
class Cluster;
+class Event;
/** Intercept broker::Connection calls for shadow and local cluster connections. */
class Connection :
@@ -58,6 +61,8 @@
{
public:
+ typedef sys::PollableQueue<EventFrame> EventFrameQueue;
+
/** Local connection, use this in ConnectionId */
Connection(Cluster&, sys::ConnectionOutputHandler& out, const std::string& id, MemberId, bool catchUp);
/** Shadow connection */
@@ -96,8 +101,8 @@
size_t decode(const char* buffer, size_t size);
// Called for data delivered from the cluster.
- void deliverBuffer(framing::Buffer&);
- void delivered(framing::AMQFrame&);
+ void deliveredEvent(const Event&, EventFrameQueue&);
+ void deliveredFrame(const EventFrame&);
void consumerState(const std::string& name, bool blocked, bool notifyEnabled);
@@ -166,6 +171,7 @@
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
boost::shared_ptr<broker::TxBuffer> txBuffer;
+ int readCredit;
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=736409&r1=736408&r2=736409&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Wed Jan 21 12:40:38 2009
@@ -31,12 +31,15 @@
#include <sys/uio.h> // For iovec
#include <iosfwd>
+#include "types.h"
+
namespace qpid {
-namespace cluster {
-// TODO aconway 2008-09-03: more efficient solution for shared
-// byte-stream data.
-//
+namespace framing {
+class AMQBody;
+}
+
+namespace cluster {
/** Header data for a multicast event */
class EventHeader : public ::qpid::sys::LatencyMetricTimestamp {
Added: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=736409&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Wed Jan 21 12:40:38 2009
@@ -0,0 +1,53 @@
+#ifndef QPID_CLUSTER_EVENTFRAME_H
+#define QPID_CLUSTER_EVENTFRAME_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "types.h"
+#include "qpid/framing/AMQFrame.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace cluster {
+
+class Connection;
+
+/**
+ * A frame decoded from an Event.
+ */
+struct EventFrame
+{
+ // Connection event frame
+ EventFrame(const boost::intrusive_ptr<Connection>& c, const MemberId& m, const framing::AMQFrame& f, int rc=0)
+ : connection(c), member(m), frame(f), readCredit(rc) {}
+
+ bool isCluster() const { return !connection; }
+ bool isConnection() const { return connection; }
+
+ boost::intrusive_ptr<Connection> connection;
+ MemberId member;
+ framing::AMQFrame frame;
+ int readCredit; // restore this much read credit when frame is processed
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_EVENTFRAME_H*/
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org