You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/18 15:55:31 UTC
svn commit: r696657 - in /incubator/qpid/trunk/qpid/cpp/src: ./
qpid/cluster/ qpid/sys/ tests/
Author: aconway
Date: Thu Sep 18 06:55:30 2008
New Revision: 696657
URL: http://svn.apache.org/viewvc?rev=696657&view=rev
Log:
Refactor Cluster logic into separate handlers for Joining & Member modes.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/cluster.mk
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
incubator/qpid/trunk/qpid/cpp/src/tests/DumpClientTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/cluster.mk
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/cluster.mk?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/cluster.mk (original)
+++ incubator/qpid/trunk/qpid/cpp/src/cluster.mk Thu Sep 18 06:55:30 2008
@@ -28,7 +28,13 @@
qpid/cluster/DumpClient.h \
qpid/cluster/DumpClient.cpp \
qpid/cluster/ClusterMap.h \
- qpid/cluster/ClusterMap.cpp
+ qpid/cluster/ClusterMap.cpp \
+ qpid/cluster/ClusterHandler.h \
+ qpid/cluster/ClusterHandler.cpp \
+ qpid/cluster/JoiningHandler.h \
+ qpid/cluster/JoiningHandler.cpp \
+ qpid/cluster/MemberHandler.h \
+ qpid/cluster/MemberHandler.cpp
cluster_la_LIBADD= -lcpg libqpidbroker.la libqpidclient.la
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Sep 18 06:55:30 2008
@@ -23,8 +23,6 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/Connection.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/AMQP_AllOperations.h"
-#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/framing/ClusterReadyBody.h"
@@ -55,17 +53,6 @@
using qpid::management::Args;
namespace _qmf = qmf::org::apache::qpid::cluster;
-struct ClusterOperations : public AMQP_AllOperations::ClusterHandler {
- Cluster& cluster;
- MemberId member;
- ClusterOperations(Cluster& c, const MemberId& id) : cluster(c), member(id) {}
- bool invoke(AMQFrame& f) { return framing::invoke(*this, *f.getBody()).wasHandled(); }
-
- void update(const FieldTable& members, uint64_t dumping) { cluster.update(members, dumping); }
- void dumpRequest(const std::string& url) { cluster.dumpRequest(member, url); }
- void ready(const std::string& url) { cluster.ready(member, url); }
-};
-
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(b),
poller(b.getPoller()),
@@ -79,16 +66,18 @@
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
- state(START)
+ handler(&joiningHandler),
+ joiningHandler(*this),
+ memberHandler(*this)
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
_qmf::Package packageInit(agent);
mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str());
agent->addObject (mgmtObject);
- mgmtObject->set_status("JOINING");
+ mgmtObject->set_status("JOINING");
- // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
+ // if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
QPID_LOG(notice, self << " joining cluster " << name.str());
broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
@@ -108,9 +97,6 @@
connections.erase(id);
}
-// FIXME aconway 2008-09-10: call leave from cluster admin command.
-// Any other type of exit is caught in disconnect().
-//
void Cluster::leave() {
QPID_LOG(notice, self << " leaving cluster " << name.str());
cpg.leave(name);
@@ -147,6 +133,7 @@
}
// FIXME aconway 2008-09-15: volatile for locked/unlocked functions.
+// Check locking from Handler functions.
boost::intrusive_ptr<Connection> Cluster::getConnection(const ConnectionId& id) {
Mutex::ScopedLock l(lock);
if (id.getMember() == self)
@@ -179,24 +166,16 @@
AMQFrame frame;
while (frame.decode(buf)) {
QPID_LOG(trace, "DLVR [" << e.getConnectionId().getMember() << "]: " << *frame.getBody());
- if (!ClusterOperations(*this, e.getConnectionId().getMember()).invoke(frame))
+ if (!handler->invoke(e.getConnectionId().getMember(), frame))
throw Exception(QPID_MSG("Invalid cluster control"));
}
}
- else {
- // Process connection controls & data via the connectionEventQueue
- // unless we are in the DISCARD state, in which case ignore.
- if (state != DISCARD) {
- e.setConnection(getConnection(e.getConnectionId()));
- connectionEventQueue.push(e);
- }
- }
+ else
+ handler->deliver(e);
}
catch (const std::exception& e) {
- // FIXME aconway 2008-01-30: exception handling.
QPID_LOG(critical, "Error in cluster deliver: " << e.what());
- assert(0);
- throw;
+ leave();
}
}
@@ -208,17 +187,19 @@
else { // control
AMQFrame frame;
while (frame.decode(buf))
- e.getConnection()->deliver(frame);
+ e.getConnection()->received(frame);
}
}
struct AddrList {
const cpg_address* addrs;
int count;
- AddrList(const cpg_address* a, int n) : addrs(a), count(n) {}
+ const char* prefix;
+ AddrList(const cpg_address* a, int n, const char* p=0) : addrs(a), count(n), prefix(p) {}
};
ostream& operator<<(ostream& o, const AddrList& a) {
+ if (a.count && a.prefix) o << a.prefix;
for (const cpg_address* p = a.addrs; p < a.addrs+a.count; ++p) {
const char* reasonString;
switch (p->reason) {
@@ -252,82 +233,41 @@
cpg_name */*group*/,
cpg_address *current, int nCurrent,
cpg_address *left, int nLeft,
- cpg_address */*joined*/, int nJoined)
+ cpg_address *joined, int nJoined)
{
Mutex::ScopedLock l(lock);
- // FIXME aconway 2008-09-15: use group terminology not cluster. Member not node.
- QPID_LOG(info, "Current cluster: " << AddrList(current, nCurrent));
- QPID_LOG_IF(info, nLeft, "Left the cluster: " << AddrList(left, nLeft));
+ QPID_LOG(debug, "Cluster: " << AddrList(current, nCurrent) << ". "
+ << AddrList(left, nLeft, "Left: "));
- map.left(left, nLeft);
if (find(left, left+nLeft, self) != left+nLeft) {
// I have left the group, this is the final config change.
QPID_LOG(notice, self << " left cluster " << name.str());
broker.shutdown();
return;
}
+
+ map.left(left, nLeft);
+ handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
- if (state == START) {
- if (nCurrent == 1 && *current == self) { // First in cluster.
- // First in cluster
- QPID_LOG(notice, self << " first in cluster.");
- map.add(self, url);
- ready();
- }
- updateMemberStats();
- return;
- }
-
- if (state == DISCARD && !map.dumper) // try another dump request.
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
-
- if (nJoined && map.sendUpdate(self)) // New members need update
- mcastControl(map.toControl(), 0);
-
+ // FIXME aconway 2008-09-17: management update.
//update mgnt stats
- updateMemberStats();
+ updateMemberStats();
}
-void Cluster::update(const FieldTable& members, uint64_t dumper) {
+void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) {
Mutex::ScopedLock l(lock);
- map.update(members, dumper);
- QPID_LOG(debug, "Cluster update: " << map);
- if (state == START) state = DISCARD; // Got first update.
- if (state == DISCARD && !map.dumper)
- mcastControl(ClusterDumpRequestBody(ProtocolVersion(), url.str()), 0);
+ handler->update(id, members, dumper);
}
void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
Mutex::ScopedLock l(lock);
- if (map.dumper) return; // Dump already in progress, ignore.
- map.dumper = map.first();
- if (dumpee == self && state == DISCARD) { // My turn to receive a dump.
- QPID_LOG(info, self << " receiving state dump from " << map.dumper);
- // FIXME aconway 2008-09-15: RECEIVE DUMP
- // state = CATCHUP;
- // stall();
- // When received
- mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
- ready();
- }
- else if (map.dumper == self && state == READY) { // My turn to send the dump
- QPID_LOG(info, self << " sending state dump to " << dumpee);
- // FIXME aconway 2008-09-15: stall & send brain dump - finish DumpClient.
- // state = DUMPING;
- // stall();
- (void)urlStr;
- // When dump complete:
- assert(map.dumper == self);
- ClusterUpdateBody b = map.toControl();
- b.setDumper(0);
- mcastControl(b, 0);
- // NB: Don't modify my own map till self-delivery.
- }
+ handler->dumpRequest(dumpee, urlStr);
}
void Cluster::ready(const MemberId& member, const std::string& url) {
Mutex::ScopedLock l(lock);
- map.add(member, Url(url));
+ handler->ready(member, url);
+ // FIXME aconway 2008-09-17: management update.
}
broker::Broker& Cluster::getBroker(){ return broker; }
@@ -341,18 +281,18 @@
// FIXME aconway 2008-09-11: Flow control, we should slow down or
// stop reading from local connections while stalled to avoid an
// unbounded queue.
- if (mgmtObject!=0)
- mgmtObject->set_status("STALLED");
+ // if (mgmtObject!=0)
+ // mgmtObject->set_status("STALLED");
}
void Cluster::ready() {
// Called with lock held
- QPID_LOG(info, self << " ready with URL " << url);
- state = READY;
+ QPID_LOG(info, self << " ready at URL " << url);
+ mcastControl(ClusterReadyBody(ProtocolVersion(), url.str()), 0);
+ handler = &memberHandler; // Member mode.
connectionEventQueue.start(poller);
- // FIXME aconway 2008-09-15: stall/unstall map?
- if (mgmtObject!=0)
- mgmtObject->set_status("ACTIVE");
+ // if (mgmtObject!=0)
+ // mgmtObject->set_status("ACTIVE");
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -367,52 +307,53 @@
delete this;
}
-ManagementObject* Cluster::GetManagementObject(void) const
-{
- return (ManagementObject*) mgmtObject;
+ManagementObject* Cluster::GetManagementObject(void) const {
+ return (ManagementObject*) mgmtObject;
}
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&)
-{
- Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
- QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
-
- switch (methodId)
- {
- case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
- stopClusterNode();
- break;
- case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
- stopFullCluster();
- break;
- }
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) {
+ Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+ QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
+
+ switch (methodId)
+ {
+ case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
+ stopClusterNode();
+ break;
+ case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
+ stopFullCluster();
+ break;
+ }
- return status;
+ return status;
}
void Cluster::stopClusterNode(void)
{
+ // FIXME aconway 2008-09-18:
QPID_LOG(notice, self << " disconnected from cluster " << name.str());
broker.shutdown();
}
void Cluster::stopFullCluster(void)
{
+ // FIXME aconway 2008-09-17: TODO
}
void Cluster::updateMemberStats(void)
{
//update mgnt stats
- if (mgmtObject!=0){
- mgmtObject->set_clusterSize(size());
- std::vector<Url> vectUrl = getUrls();
- string urlstr;
- for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
- if (iter != vectUrl.begin()) urlstr += ";";
- urlstr += iter->str();
- }
- mgmtObject->set_members(urlstr);
- }
+ // FIXME aconway 2008-09-18:
+// if (mgmtObject!=0){
+// mgmtObject->set_clusterSize(size());
+// std::vector<Url> vectUrl = getUrls();
+// string urlstr;
+// for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
+// if (iter != vectUrl.begin()) urlstr += ";";
+// urlstr += iter->str();
+// }
+// mgmtObject->set_members(urlstr);
+// }
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Sep 18 06:55:30 2008
@@ -23,11 +23,12 @@
#include "Event.h"
#include "NoOpConnectionOutputHandler.h"
#include "ClusterMap.h"
+#include "JoiningHandler.h"
+#include "MemberHandler.h"
#include "qpid/broker/Broker.h"
#include "qpid/sys/PollableQueue.h"
#include "qpid/sys/Monitor.h"
-#include "qpid/framing/AMQP_AllOperations.h"
#include "qpid/Url.h"
#include "qpid/management/Manageable.h"
#include "qmf/org/apache/qpid/cluster/Cluster.h"
@@ -78,7 +79,7 @@
void leave();
// Cluster controls.
- void update(const framing::FieldTable& members, uint64_t dumping);
+ void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
void dumpRequest(const MemberId&, const std::string& url);
void ready(const MemberId&, const std::string& url);
@@ -127,8 +128,6 @@
/** Callback if CPG fd is disconnected. */
void disconnect(sys::DispatchHandle&);
- void handleMethod(MemberId from, cluster::Connection* connection, framing::AMQMethodBody& method);
-
boost::intrusive_ptr<cluster::Connection> getConnection(const ConnectionId&);
virtual qpid::management::ManagementObject* GetManagementObject(void) const;
@@ -151,6 +150,14 @@
EventQueue connectionEventQueue;
State state;
qmf::org::apache::qpid::cluster::Cluster* mgmtObject; // mgnt owns lifecycle
+
+ // Handlers for different states.
+ ClusterHandler* handler;
+ JoiningHandler joiningHandler;
+ MemberHandler memberHandler;
+
+ friend class JoiningHandler;
+ friend class MemberHandler;
};
}} // namespace qpid::cluster
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp Thu Sep 18 06:55:30 2008
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "qpid/framing/AllInvoker.h"
+
+#include "ClusterHandler.h"
+#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/FieldTable.h"
+
+
+
+namespace qpid {
+namespace cluster {
+
+struct Operations : public framing::AMQP_AllOperations::ClusterHandler {
+ qpid::cluster::ClusterHandler& handler;
+ MemberId member;
+ Operations(qpid::cluster::ClusterHandler& c, const MemberId& id) : handler(c), member(id) {}
+
+ void update(const framing::FieldTable& members, uint64_t dumping) { handler.update(member, members, dumping); }
+ void dumpRequest(const std::string& url) { handler.dumpRequest(member, url); }
+ void ready(const std::string& url) { handler.ready(member, url); }
+};
+
+ClusterHandler::~ClusterHandler() {}
+
+ClusterHandler::ClusterHandler(Cluster& c) : cluster (c) {}
+
+bool ClusterHandler::invoke(const MemberId& id, framing::AMQFrame& frame) {
+ Operations ops(*this, id);
+ return framing::invoke(ops, *frame.getBody()).wasHandled();
+}
+
+}} // namespace qpid::cluster
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h Thu Sep 18 06:55:30 2008
@@ -0,0 +1,65 @@
+#ifndef QPID_CLUSTER_CLUSTERHANDLER_H
+#define QPID_CLUSTER_CLUSTERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "Cpg.h"
+#include "types.h"
+
+namespace qpid {
+
+namespace framing { class AMQFrame; }
+
+namespace cluster {
+
+class Cluster;
+class Event;
+
+/**
+ * Interface for handing cluster events.
+ * Implementations provide different behavior for different states of a member..
+ */
+class ClusterHandler
+{
+ public:
+ ClusterHandler(Cluster& c);
+ virtual ~ClusterHandler();
+
+ virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0;
+ virtual void dumpRequest(const MemberId&, const std::string& url) = 0;
+ virtual void ready(const MemberId&, const std::string& url) = 0;
+
+ virtual void deliver(Event& e) = 0; // Deliver a connection event.
+
+ virtual void configChange(cpg_address *current, int nCurrent,
+ cpg_address *left, int nLeft,
+ cpg_address *joined, int nJoined) = 0;
+
+ bool invoke(const MemberId&, framing::AMQFrame& f);
+
+ protected:
+ Cluster& cluster;
+};
+
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_CLUSTERHANDLER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Thu Sep 18 06:55:30 2008
@@ -53,8 +53,8 @@
return b;
}
-void ClusterMap::update(const FieldTable& ftMembers, uint64_t dumper_) {
- FieldTable::ValueMap::const_iterator i;
+void ClusterMap::update(const framing::FieldTable& ftMembers, uint64_t dumper_) {
+ framing:: FieldTable::ValueMap::const_iterator i;
for (i = ftMembers.begin(); i != ftMembers.end(); ++i)
members[i->first] = Url(i->second->get<std::string>());
dumper = MemberId(dumper_);
@@ -82,8 +82,10 @@
return dumper==id || (!dumper && first() == id);
}
-void ClusterMap::add(const MemberId& id, const Url& url) {
+void ClusterMap::ready(const MemberId& id, const Url& url) {
members[id] = url;
+ if (id == dumper)
+ dumper = MemberId();
}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Thu Sep 18 06:55:30 2008
@@ -34,8 +34,6 @@
namespace qpid {
namespace cluster {
-// FIXME aconway 2008-09-15: rename cluster status?
-
/**
* Map of established cluster members and brain-dumps in progress.
* A dumper is an established member that is sending catch-up data.
@@ -58,8 +56,8 @@
/** Convert map contents to a cluster update body. */
framing::ClusterUpdateBody toControl() const;
- /** Add a new member. */
- void add(const MemberId& id, const Url& url);
+ /** Add a new member or dump complete if id == dumper. */
+ void ready(const MemberId& id, const Url& url);
/** Apply update delivered from clsuter. */
void update(const framing::FieldTable& members, uint64_t dumper);
@@ -70,6 +68,7 @@
std::vector<Url> memberUrls() const;
size_t size() const { return members.size(); }
+ bool empty() const { return members.empty(); }
private:
friend std::ostream& operator<<(std::ostream&, const ClusterMap&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Thu Sep 18 06:55:30 2008
@@ -21,11 +21,9 @@
#include "Connection.h"
#include "Cluster.h"
#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/Invoker.h"
-#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/log/Statement.h"
-
+#include "qpid/framing/AllInvoker.h"
#include <boost/current_function.hpp>
namespace qpid {
@@ -47,11 +45,6 @@
Connection::~Connection() {}
-void Connection::received(framing::AMQFrame& ) {
- // FIXME aconway 2008-09-02: not called, codec sends straight to deliver
- assert(0);
-}
-
bool Connection::doOutput() { return output.doOutput(); }
// Delivery of doOutput allows us to run the real connection doOutput()
@@ -62,7 +55,7 @@
}
// Handle frames delivered from cluster.
-void Connection::deliver(framing::AMQFrame& f) {
+void Connection::received(framing::AMQFrame& f) {
QPID_LOG(trace, "DLVR [" << self << "]: " << f);
// Handle connection controls, deliver other frames to connection.
if (!framing::invoke(*this, *f.getBody()).wasHandled())
@@ -95,14 +88,13 @@
size_t Connection::decode(const char* buffer, size_t size) {
++mcastSeq;
cluster.mcastBuffer(buffer, size, self);
- // FIXME aconway 2008-09-01: deserialize?
return size;
}
void Connection::deliverBuffer(Buffer& buf) {
++deliverSeq;
while (decoder.decode(buf))
- deliver(decoder.frame); // FIXME aconway 2008-09-01: Queue frames for delivery in separate thread.
+ received(decoder.frame);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Thu Sep 18 06:55:30 2008
@@ -63,7 +63,6 @@
Cluster& getCluster() { return cluster; }
// self-delivery of multicast data.
- void deliver(framing::AMQFrame& f);
void deliverClose();
void deliverDoOutput(uint32_t requested);
void deliverBuffer(framing::Buffer&);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionCodec.cpp Thu Sep 18 06:55:30 2008
@@ -36,9 +36,7 @@
return 0;
}
-// FIXME aconway 2008-08-27: outbound connections need to be made
-// with proper qpid::client code for failover, get rid of this
-// broker-side hack.
+// Used for outgoing Link connections, we don't care.
sys::ConnectionCodec*
ConnectionCodec::Factory::create(sys::OutputControl& out, const std::string& id) {
return next->create(out, id);
@@ -60,7 +58,6 @@
return interceptor->decode(buffer, size);
}
-// FIXME aconway 2008-09-02: delegate to interceptor?
size_t ConnectionCodec::encode(const char* buffer, size_t size) { return codec.encode(buffer, size); }
bool ConnectionCodec::canEncode() { return codec.canEncode(); }
void ConnectionCodec::closed() { codec.closed(); }
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Thu Sep 18 06:55:30 2008
@@ -101,9 +101,7 @@
return flowState == CPG_FLOW_CONTROL_ENABLED;
}
-// TODO aconway 2008-08-07: better handling of flow control.
-// Wait for flow control to be disabled.
-// FIXME aconway 2008-08-08: does flow control check involve a round-trip? If so maybe remove...
+// FIXME aconway 2008-08-07: better handling of cpg flow control, no sleeping.
void Cpg::waitForFlowControl() {
int delayNs=1000; // one millisecond
int tries=8; // double the delay on each try.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Thu Sep 18 06:55:30 2008
@@ -44,36 +44,39 @@
using namespace client;
-DumpClient::DumpClient(const Url& url, Broker& b, const boost::function<void(const char*)>& f)
- : donor(b), failed(f)
+DumpClient::DumpClient(const Url& url, Broker& b,
+ const boost::function<void()>& ok,
+ const boost::function<void(const std::exception&)>& fail)
+ : donor(b), done(ok), failed(fail)
{
+ // FIXME aconway 2008-09-16: Identify as DumpClient connection.
connection.open(url);
session = connection.newSession();
}
-DumpClient::~DumpClient() {
- session.close();
- connection.close();
-}
+DumpClient::~DumpClient() {}
// Catch-up exchange name: an illegal AMQP exchange name to avoid clashes.
static const char CATCH_UP_CHARS[] = "\000qpid-dump-exchange";
static const std::string CATCH_UP(CATCH_UP_CHARS, sizeof(CATCH_UP_CHARS));
void DumpClient::dump() {
- // FIXME aconway 2008-09-08: send cluster map frame first.
donor.getExchanges().eachExchange(boost::bind(&DumpClient::dumpExchange, this, _1));
// Catch-up exchange is used to route messages to the proper queue without modifying routing key.
session.exchangeDeclare(arg::exchange=CATCH_UP, arg::type="fanout", arg::autoDelete=true);
donor.getQueues().eachQueue(boost::bind(&DumpClient::dumpQueue, this, _1));
session.sync();
+ session.close();
+ // FIXME aconway 2008-09-17: send dump complete indication.
+ connection.close();
}
void DumpClient::run() {
try {
dump();
- } catch (const Exception& e) {
- failed(e.what());
+ done();
+ } catch (const std::exception& e) {
+ failed(e);
}
delete this;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Thu Sep 18 06:55:30 2008
@@ -54,7 +54,10 @@
*/
class DumpClient : public sys::Runnable {
public:
- DumpClient(const Url& url, broker::Broker& donor, const boost::function<void(const char*)>& onFail);
+ DumpClient(const Url& url, broker::Broker& donor,
+ const boost::function<void()>& done,
+ const boost::function<void(const std::exception&)>& fail);
+
~DumpClient();
void dump();
void run(); // Will delete this when finished.
@@ -69,7 +72,8 @@
client::Connection connection;
client::AsyncSession session;
broker::Broker& donor;
- boost::function<void(const char*)> failed;
+ boost::function<void()> done;
+ boost::function<void(const std::exception& e)> failed;
};
}} // namespace qpid::cluster
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Thu Sep 18 06:55:30 2008
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "JoiningHandler.h"
+#include "Cluster.h"
+#include "qpid/framing/ClusterDumpRequestBody.h"
+#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/log/Statement.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace sys;
+using namespace framing;
+
+JoiningHandler::JoiningHandler(Cluster& c) : ClusterHandler(c), state(START) {}
+
+void JoiningHandler::configChange(
+ cpg_address *current, int nCurrent,
+ cpg_address */*left*/, int nLeft,
+ cpg_address */*joined*/, int /*nJoined*/)
+{
+ if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
+ QPID_LOG(notice, cluster.self << " first in cluster.");
+ cluster.map.ready(cluster.self, cluster.url);
+ cluster.ready();
+ }
+}
+
+void JoiningHandler::deliver(Event& e) {
+ // Discard connection events unless we are stalled and getting a dump.
+ if (state == STALLED) {
+ e.setConnection(cluster.getConnection(e.getConnectionId()));
+ cluster.connectionEventQueue.push(e);
+ }
+}
+
+void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
+ cluster.map.update(members, dumper);
+ QPID_LOG(debug, "Cluster update: " << cluster.map);
+ checkDumpRequest();
+}
+
+void JoiningHandler::checkDumpRequest() {
+ if (state == START && !cluster.map.dumper) {
+ cluster.broker.getPort(); // ensure the broker is listening.
+ state = DUMP_REQUESTED;
+ cluster.mcastControl(ClusterDumpRequestBody(framing::ProtocolVersion(), cluster.url.str()), 0);
+ }
+}
+
+void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
+ if (cluster.map.dumper) { // Already a dump in progress.
+ if (dumpee == cluster.self && state == DUMP_REQUESTED)
+ state = START; // Need to make another request.
+ }
+ else { // Start a new dump
+ cluster.map.dumper = cluster.map.first();
+ if (dumpee == cluster.self) { // My turn
+
+ state = DUMP_COMPLETE; // FIXME aconway 2008-09-18: bypass dump
+
+ QPID_LOG(info, cluster.self << " receiving state dump from " << cluster.map.dumper);
+ switch (state) {
+ case START:
+ case STALLED:
+ assert(0); break;
+
+ case DUMP_REQUESTED:
+ state = STALLED;
+ cluster.stall();
+ break;
+
+ // FIXME aconway 2008-09-17: no transition to DUMP_COMPLETE state.
+ case DUMP_COMPLETE:
+ cluster.ready();
+ break;
+ }
+ }
+ }
+}
+
+void JoiningHandler::ready(const MemberId& id, const std::string& url) {
+ cluster.map.ready(id, Url(url));
+ checkDumpRequest();
+}
+
+
+}} // namespace qpid::cluster
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h Thu Sep 18 06:55:30 2008
@@ -0,0 +1,56 @@
+#ifndef QPID_CLUSTER_JOININGHANDLER_H
+#define QPID_CLUSTER_JOININGHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClusterHandler.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Cluster handler for the "joining" phase, before the process is a
+ * full cluster member.
+ */
+class JoiningHandler : public ClusterHandler
+{
+ public:
+ JoiningHandler(Cluster& c);
+
+ void configChange(struct cpg_address */*members*/, int /*nMembers*/,
+ struct cpg_address */*left*/, int /*nLeft*/,
+ struct cpg_address */*joined*/, int /*nJoined*/
+ );
+
+ void deliver(Event& e);
+
+ void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
+ void dumpRequest(const MemberId&, const std::string& url);
+ void ready(const MemberId&, const std::string& url);
+
+ private:
+ enum { START, DUMP_REQUESTED, STALLED, DUMP_COMPLETE } state;
+ void checkDumpRequest();
+
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_JOININGHANDLER_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Thu Sep 18 06:55:30 2008
@@ -0,0 +1,83 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "MemberHandler.h"
+#include "Cluster.h"
+#include "DumpClient.h"
+#include "qpid/log/Statement.h"
+#include "qpid/framing/ClusterUpdateBody.h"
+
+namespace qpid {
+namespace cluster {
+
+using namespace sys;
+using namespace framing;
+
+MemberHandler::MemberHandler(Cluster& c) : ClusterHandler(c) {}
+
+void MemberHandler::configChange(
+ cpg_address */*current*/, int /*nCurrent*/,
+ cpg_address */*left*/, int /*nLeft*/,
+ cpg_address */*joined*/, int nJoined)
+{
+ if (nJoined && cluster.map.sendUpdate(cluster.self)) // New members need update
+ cluster.mcastControl(cluster.map.toControl(), 0);
+}
+
+void MemberHandler::deliver(Event& e) {
+ e.setConnection(cluster.getConnection(e.getConnectionId()));
+ cluster.connectionEventQueue.push(e);
+}
+
+void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {}
+
+void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) {
+ if (cluster.map.dumper) return; // dump in progress, ignore request.
+
+ cluster.map.dumper = cluster.map.first();
+ if (cluster.map.dumper != cluster.self) return;
+
+ QPID_LOG(info, cluster.self << " sending state dump to " << dumpee);
+ assert(!cluster.connectionEventQueue.isStopped()); // Not currently stalled.
+ cluster.stall();
+
+ cluster.ready(); // FIXME aconway 2008-09-18: Bypass dump
+ (void)urlStr;
+// dumpThread = Thread(new DumpClient(Url(urlStr), cluster.broker,
+// boost::bind(&MemberHandler::dumpDone, this),
+// boost::bind(&MemberHandler::dumpError, this, _1)));
+}
+
+void MemberHandler::ready(const MemberId& id, const std::string& url) {
+ cluster.map.ready(id, Url(url));
+}
+
+
+void MemberHandler::dumpDone() {
+ dumpThread.join(); // Clean up.
+ cluster.ready();
+}
+
+void MemberHandler::dumpError(const std::exception& e) {
+ QPID_LOG(error, "Error in state dump from " << cluster.self << ": " << e.what());
+ dumpDone();
+}
+
+}} // namespace qpid::cluster
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h?rev=696657&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h Thu Sep 18 06:55:30 2008
@@ -0,0 +1,60 @@
+#ifndef QPID_CLUSTER_MEMBERHANDLER_H
+#define QPID_CLUSTER_MEMBERHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ClusterHandler.h"
+#include "qpid/sys/Thread.h"
+
+namespace qpid {
+namespace cluster {
+
+/**
+ * Cluster handler for the "member" phase, before the process is a
+ * full cluster member.
+ */
+class MemberHandler : public ClusterHandler
+{
+ public:
+ MemberHandler(Cluster& c);
+
+ void configChange(
+ struct cpg_address */*members*/, int /*nMembers*/,
+ struct cpg_address */*left*/, int /*nLeft*/,
+ struct cpg_address */*joined*/, int /*nJoined*/
+ );
+
+ void deliver(Event& e);
+
+ void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
+ void dumpRequest(const MemberId&, const std::string& url);
+ void ready(const MemberId&, const std::string& url);
+
+ void dumpDone();
+ void dumpError(const std::exception&);
+
+ public:
+ sys::Thread dumpThread;
+};
+}} // namespace qpid::cluster
+
+#endif /*!QPID_CLUSTER_MEMBERHANDLER_H*/
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/OutputInterceptor.cpp Thu Sep 18 06:55:30 2008
@@ -88,7 +88,6 @@
// Send a doOutput request if one is not already in flight.
void OutputInterceptor::sendDoOutput() {
// Call with lock held.
- // FIXME aconway 2008-08-28: used to have || parent.getClosed())
if (!parent.isLocal()) return;
doingOutput = true;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/PollableQueue.h Thu Sep 18 06:55:30 2008
@@ -71,6 +71,9 @@
/** Stop polling and wait for the current callback, if any, to complete. */
void stop();
+
+ /** Are we currently stopped?*/
+ bool isStopped() const;
private:
typedef sys::Monitor::ScopedLock ScopedLock;
@@ -78,7 +81,7 @@
void dispatch(sys::DispatchHandle&);
- sys::Monitor lock;
+ mutable sys::Monitor lock;
Callback callback;
PollableCondition condition;
sys::DispatchHandle handle;
@@ -130,6 +133,11 @@
while (dispatching) lock.wait();
}
+template <class T> bool PollableQueue<T>::isStopped() const {
+ ScopedLock l(lock);
+ return stopped;
+}
+
}} // namespace qpid::sys
#endif /*!QPID_SYS_POLLABLEQUEUE_H*/
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/DumpClientTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/DumpClientTest.cpp?rev=696657&r1=696656&r2=696657&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/DumpClientTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/DumpClientTest.cpp Thu Sep 18 06:55:30 2008
@@ -61,7 +61,7 @@
c.connection.close();
}
Url url(Url::getIpAddressesUrl(receiver.getPort()));
- qpid::cluster::DumpClient dump(url, *donor.broker, 0);
+ qpid::cluster::DumpClient dump(url, *donor.broker, 0, 0);
dump.dump();
{
Client r(receiver.getPort());