You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2009/01/16 22:34:47 UTC
svn commit: r735151 - in /qpid/trunk/qpid/cpp/src: cluster.mk
qpid/cluster/Cluster.cpp qpid/cluster/Cluster.h qpid/cluster/Event.cpp
qpid/cluster/Event.h
Author: aconway
Date: Fri Jan 16 13:34:46 2009
New Revision: 735151
URL: http://svn.apache.org/viewvc?rev=735151&view=rev
Log:
cluster refactor: separate out dispatch strategy, implement poller and thread dispatch.
Modified:
qpid/trunk/qpid/cpp/src/cluster.mk
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
Modified: qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/cluster.mk?rev=735151&r1=735150&r2=735151&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ qpid/trunk/qpid/cpp/src/cluster.mk Fri Jan 16 13:34:46 2009
@@ -38,36 +38,40 @@
cluster_la_SOURCES = \
$(CMAN_SOURCES) \
- qpid/cluster/types.h \
qpid/cluster/Cluster.cpp \
qpid/cluster/Cluster.h \
- qpid/cluster/Cpg.cpp \
- qpid/cluster/Cpg.h \
- qpid/cluster/Dispatchable.h \
+ qpid/cluster/ClusterLeaveException.h \
+ qpid/cluster/ClusterMap.cpp \
+ qpid/cluster/ClusterMap.h \
qpid/cluster/ClusterPlugin.cpp \
- qpid/cluster/ConnectionCodec.h \
- qpid/cluster/ConnectionCodec.cpp \
- qpid/cluster/Connection.h \
qpid/cluster/Connection.cpp \
+ qpid/cluster/Connection.h \
+ qpid/cluster/ConnectionCodec.cpp \
+ qpid/cluster/ConnectionCodec.h \
qpid/cluster/ConnectionMap.h \
- qpid/cluster/NoOpConnectionOutputHandler.h \
- qpid/cluster/WriteEstimate.h \
- qpid/cluster/WriteEstimate.cpp \
- qpid/cluster/OutputInterceptor.h \
- qpid/cluster/OutputInterceptor.cpp \
- qpid/cluster/ProxyInputHandler.h \
- qpid/cluster/Event.h \
- qpid/cluster/Event.cpp \
- qpid/cluster/DumpClient.h \
+ qpid/cluster/Cpg.cpp \
+ qpid/cluster/Cpg.h \
+ qpid/cluster/Dispatchable.h \
qpid/cluster/DumpClient.cpp \
- qpid/cluster/ClusterMap.h \
- qpid/cluster/ClusterMap.cpp \
- qpid/cluster/FailoverExchange.h \
+ qpid/cluster/DumpClient.h \
+ qpid/cluster/Event.cpp \
+ qpid/cluster/Event.h \
qpid/cluster/FailoverExchange.cpp \
- qpid/cluster/Multicaster.h \
+ qpid/cluster/FailoverExchange.h \
qpid/cluster/Multicaster.cpp \
- qpid/cluster/ClusterLeaveException.h \
- qpid/cluster/Quorum.h
+ qpid/cluster/Multicaster.h \
+ qpid/cluster/NoOpConnectionOutputHandler.h \
+ qpid/cluster/OutputInterceptor.cpp \
+ qpid/cluster/OutputInterceptor.h \
+ qpid/cluster/PollerDispatch.cpp \
+ qpid/cluster/PollerDispatch.h \
+ qpid/cluster/ThreadDispatch.cpp \
+ qpid/cluster/ThreadDispatch.h \
+ qpid/cluster/ProxyInputHandler.h \
+ qpid/cluster/Quorum.h \
+ qpid/cluster/WriteEstimate.cpp \
+ qpid/cluster/WriteEstimate.h \
+ qpid/cluster/types.h
cluster_la_LIBADD= -lcpg $(libcman) libqpidbroker.la libqpidclient.la
cluster_la_LDFLAGS = $(PLUGINLDFLAGS)
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=735151&r1=735150&r2=735151&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Fri Jan 16 13:34:46 2009
@@ -85,6 +85,7 @@
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b, bool quorum_, size_t readMax_, size_t writeEstimate_, size_t mcastMax) :
broker(b),
+ mgmtObject(0),
poller(b.getPoller()),
cpg(*this),
name(name_),
@@ -92,14 +93,8 @@
myId(cpg.self()),
readMax(readMax_),
writeEstimate(writeEstimate_),
- cpgDispatchHandle(
- cpg,
- boost::bind(&Cluster::dispatch, this, _1), // read
- 0, // write
- boost::bind(&Cluster::disconnect, this, _1) // disconnect
- ),
mcast(cpg, mcastMax, poller, boost::bind(&Cluster::leave, this)),
- mgmtObject(0),
+ dispatcher(cpg, poller, boost::bind(&Cluster::leave, this)),
deliverQueue(boost::bind(&Cluster::delivered, this, _1), poller),
state(INIT),
lastSize(0),
@@ -114,7 +109,7 @@
}
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
failoverExchange.reset(new FailoverExchange(this));
- cpgDispatchHandle.startWatch(poller);
+ dispatcher.start();
deliverQueue.start();
QPID_LOG(notice, *this << " joining cluster " << name << " with url=" << myUrl);
if (quorum_) quorum.init();
@@ -153,14 +148,13 @@
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
- if (!deliverQueue.isStopped()) deliverQueue.stop();
try { cpg.leave(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error leaving process group: " << e.what());
}
try { broker.shutdown(); }
catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error during shutdown: " << e.what());
+ QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
}
}
}
@@ -202,7 +196,8 @@
// Entry point: called when deliverQueue has events to process.
void Cluster::delivered(PollableEventQueue::Queue& events) {
try {
- for_each(events.begin(), events.end(), boost::bind(&Cluster::deliveredEvent, this, _1));
+ for (PollableEventQueue::Queue::iterator i = events.begin(); i != events.end(); ++i)
+ deliveredEvent(*i, i->getData());
events.clear();
} catch (const std::exception& e) {
QPID_LOG(critical, *this << " error in cluster delivery: " << e.what());
@@ -210,8 +205,8 @@
}
}
-void Cluster::deliveredEvent(const Event& e) {
- Buffer buf(e);
+void Cluster::deliveredEvent(const EventHeader& e, const char* data) {
+ Buffer buf(const_cast<char*>(data), e.getSize());
AMQFrame frame;
if (e.isCluster()) {
while (frame.decode(buf)) {
@@ -270,27 +265,6 @@
return o << a.suffix;
}
-// Entry point: called by IO to dispatch CPG events.
-void Cluster::dispatch(sys::DispatchHandle& h) {
- try {
- cpg.dispatchAll();
- h.rewatch();
- } catch (const std::exception& e) {
- QPID_LOG(critical, *this << " error in cluster dispatch: " << e.what());
- leave();
- }
-}
-
-// Entry point: called if disconnected from CPG.
-void Cluster::disconnect(sys::DispatchHandle& ) {
- QPID_LOG(critical, *this << " error disconnected from cluster");
- try {
- broker.shutdown();
- } catch (const std::exception& e) {
- QPID_LOG(error, *this << " error in shutdown: " << e.what());
- }
-}
-
void Cluster::configChange (
cpg_handle_t /*handle*/,
cpg_name */*group*/,
@@ -358,7 +332,7 @@
if (state != LEFT) {
try { cpg.shutdown(); }
catch (const std::exception& e) {
- QPID_LOG(error, *this << " during shutdown: " << e.what());
+ QPID_LOG(error, *this << " shutting down CPG: " << e.what());
}
}
delete this;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=735151&r1=735150&r2=735151&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Fri Jan 16 13:34:46 2009
@@ -19,19 +19,19 @@
*
*/
-#include "Cpg.h"
-#include "Event.h"
-#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
#include "ConnectionMap.h"
+#include "Cpg.h"
+#include "Event.h"
#include "FailoverExchange.h"
-#include "Quorum.h"
#include "Multicaster.h"
+#include "NoOpConnectionOutputHandler.h"
+#include "PollerDispatch.h"
+#include "Quorum.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/sys/LockPtr.h"
#include "qpid/management/Manageable.h"
#include "qpid/Url.h"
#include "qmf/org/apache/qpid/cluster/Cluster.h"
@@ -99,8 +99,6 @@
size_t getWriteEstimate() { return writeEstimate; }
private:
- typedef sys::LockPtr<Cluster,sys::Monitor> LockPtr;
- typedef sys::LockPtr<const Cluster,sys::Monitor> ConstLockPtr;
typedef sys::Monitor::ScopedLock Lock;
typedef sys::PollableQueue<Event> PollableEventQueue;
@@ -129,15 +127,11 @@
void configChange(const MemberId&, const std::string& addresses, Lock& l);
void shutdown(const MemberId&, Lock&);
void delivered(PollableEventQueue::Queue&); // deliverQueue callback
- void deliveredEvent(const Event&);
+ void deliveredEvent(const EventHeader&, const char*);
// Helper, called in deliver thread.
void dumpStart(const MemberId& dumpee, const Url& url, Lock&);
- // CPG callbacks, called in CPG IO thread.
- void dispatch(sys::DispatchHandle&); // Dispatch CPG events.
- void disconnect(sys::DispatchHandle&); // PG was disconnected
-
void deliver( // CPG deliver callback.
cpg_handle_t /*handle*/,
struct cpg_name *group,
@@ -177,6 +171,7 @@
// Immutable members set on construction, never changed.
broker::Broker& broker;
+ qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
boost::shared_ptr<sys::Poller> poller;
Cpg cpg;
const std::string name;
@@ -186,12 +181,10 @@
const size_t writeEstimate;
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
- sys::DispatchHandle cpgDispatchHandle;
-
// Thread safe members
Multicaster mcast;
- qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
+ PollerDispatch dispatcher;
PollableEventQueue deliverQueue;
ConnectionMap connections;
boost::shared_ptr<FailoverExchange> failoverExchange;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp?rev=735151&r1=735150&r2=735151&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.cpp Fri Jan 16 13:34:46 2009
@@ -93,7 +93,7 @@
static const char* EVENT_TYPE_NAMES[] = { "data", "control" };
-std::ostream& operator << (std::ostream& o, const Event& e) {
+std::ostream& operator << (std::ostream& o, const EventHeader& e) {
o << "[event " << e.getConnectionId()
<< " " << EVENT_TYPE_NAMES[e.getType()]
<< " " << e.getSize() << " bytes]";
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h?rev=735151&r1=735150&r2=735151&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Event.h Fri Jan 16 13:34:46 2009
@@ -91,7 +91,7 @@
RefCountedBuffer::pointer store;
};
-std::ostream& operator << (std::ostream&, const Event&);
+std::ostream& operator << (std::ostream&, const EventHeader&);
}} // namespace qpid::cluster
#endif /*!QPID_CLUSTER_EVENT_H*/