You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2008/09/25 14:30:15 UTC
svn commit: r698945 - in /incubator/qpid/trunk/qpid/cpp: src/qpid/
src/qpid/amqp_0_10/ src/qpid/broker/ src/qpid/cluster/ xml/
Author: aconway
Date: Thu Sep 25 05:30:14 2008
New Revision: 698945
URL: http://svn.apache.org/viewvc?rev=698945&view=rev
Log:
Enabled management, add cluster shutdown command.
Remove dead Handler methods in Cluster.
Fixed SessionException handling in broker, was throwing some SessionExceptions as "unknown exception"
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/SessionState.cpp Thu Sep 25 05:30:14 2008
@@ -147,7 +147,7 @@
void SessionState::senderConfirmed(const SessionPoint& confirmed) {
if (confirmed > sender.sendPoint)
- throw InvalidArgumentException(QPID_MSG(getId() << "Confirmed commands not yet sent."));
+ throw InvalidArgumentException(QPID_MSG(getId() << ": confirmed commands not yet sent."));
QPID_LOG(debug, getId() << ": sender confirmed point moved to " << confirmed);
ReplayList::iterator i = sender.replayList.begin();
while (i != sender.replayList.end() && sender.replayPoint.command < confirmed.command) {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.cpp Thu Sep 25 05:30:14 2008
@@ -81,6 +81,15 @@
getInHandler()->handle(f);
}
}
+ catch(const SessionException& e) {
+ QPID_LOG(error, "Execution exception: " << e.what());
+ framing::AMQP_AllProxy::Execution execution(channel);
+ AMQMethodBody* m = f.getMethod();
+ SequenceNumber commandId;
+ if (getState()) commandId = getState()->receiverGetCurrent();
+ execution.exception(e.code, commandId, m ? m->amqpClassId() : 0, m ? m->amqpMethodId() : 0, 0, e.what(), FieldTable());
+ sendDetach();
+ }
catch(const ChannelException& e){
QPID_LOG(error, "Channel exception: " << e.what());
peer.detached(name, e.code);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/amqp_0_10/SessionHandler.h Thu Sep 25 05:30:14 2008
@@ -43,8 +43,6 @@
public framing::FrameHandler::InOutHandler
{
public:
- typedef framing::AMQP_AllProxy::Session Peer;
-
SessionHandler(framing::FrameHandler* out=0, uint16_t channel=0);
~SessionHandler();
@@ -103,7 +101,7 @@
void checkName(const std::string& name);
framing::ChannelHandler channel;
- Peer peer;
+ framing::AMQP_AllProxy::Session peer;
bool ignoring;
bool sendReady, receiveReady;
std::string name;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Thu Sep 25 05:30:14 2008
@@ -212,31 +212,15 @@
void SessionState::handleIn(AMQFrame& frame) {
SequenceNumber commandId = receiverGetCurrent();
- try {
- //TODO: make command handling more uniform, regardless of whether
- //commands carry content.
- AMQMethodBody* m = frame.getMethod();
- if (m == 0 || m->isContentBearing()) {
- handleContent(frame, commandId);
- } else if (frame.getBof() && frame.getEof()) {
- handleCommand(frame.getMethod(), commandId);
- } else {
- throw InternalErrorException("Cannot handle multi-frame command segments yet");
- }
- } catch(const SessionException& e) {
- //TODO: better implementation of new exception handling mechanism
-
- //0-10 final changes the types of exceptions, 'model layer'
- //exceptions will all be session exceptions regardless of
- //current channel/connection classification
-
- AMQMethodBody* m = frame.getMethod();
- if (m) {
- getProxy().getExecution().exception(e.code, commandId, m->amqpClassId(), m->amqpMethodId(), 0, e.what(), FieldTable());
- } else {
- getProxy().getExecution().exception(e.code, commandId, 0, 0, 0, e.what(), FieldTable());
- }
- handler->sendDetach();
+ //TODO: make command handling more uniform, regardless of whether
+ //commands carry content.
+ AMQMethodBody* m = frame.getMethod();
+ if (m == 0 || m->isContentBearing()) {
+ handleContent(frame, commandId);
+ } else if (frame.getBof() && frame.getEof()) {
+ handleCommand(frame.getMethod(), commandId);
+ } else {
+ throw InternalErrorException("Cannot handle multi-frame command segments yet");
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu Sep 25 05:30:14 2008
@@ -26,6 +26,7 @@
#include "qpid/framing/ClusterDumpRequestBody.h"
#include "qpid/framing/ClusterUpdateBody.h"
#include "qpid/framing/ClusterReadyBody.h"
+#include "qpid/framing/ClusterShutdownBody.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ClusterConnectionDeliverDoOutputBody.h"
#include "qpid/log/Statement.h"
@@ -51,7 +52,7 @@
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
-namespace _qmf = qmf::org::apache::qpid::cluster;
+namespace qmf = qmf::org::apache::qpid::cluster;
Cluster::Cluster(const std::string& name_, const Url& url_, broker::Broker& b) :
broker(b),
@@ -66,6 +67,7 @@
boost::bind(&Cluster::disconnect, this, _1) // disconnect
),
connectionEventQueue(EventQueue::forEach(boost::bind(&Cluster::connectionEvent, this, _1))),
+ mgmtObject(0),
handler(&joiningHandler),
joiningHandler(*this),
memberHandler(*this),
@@ -73,30 +75,25 @@
{
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
if (agent != 0){
- _qmf::Package packageInit(agent);
- mgmtObject = new _qmf::Cluster (agent, this, &broker,name.str(),url.str());
+ qmf::Package packageInit(agent);
+ mgmtObject = new qmf::Cluster (agent, this, &broker,name.str(),url.str());
agent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
-
+
+ // FIXME aconway 2008-09-24:
// if first cluster up set new UUID to set_clusterID() else set UUID of cluster being joined.
}
QPID_LOG(notice, self << " joining cluster " << name.str());
- broker.addFinalizer(boost::bind(&Cluster::shutdown, this));
+ broker.addFinalizer(boost::bind(&Cluster::brokerShutdown, this));
cpgDispatchHandle.startWatch(poller);
cpg.join(name);
}
Cluster::~Cluster() {}
-void Cluster::insert(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(lock);
- handler->insert(c);
-}
+void Cluster::insert(const boost::intrusive_ptr<Connection>& c) { handler->insert(c); }
-void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
- Mutex::ScopedLock l(lock);
- handler->catchUpClosed(c);
-}
+void Cluster::catchUpClosed(const boost::intrusive_ptr<Connection>& c) { handler->catchUpClosed(c); }
void Cluster::erase(ConnectionId id) {
Mutex::ScopedLock l(lock);
@@ -239,10 +236,8 @@
}
void Cluster::disconnect(sys::DispatchHandle& ) {
- // FIXME aconway 2008-09-11: this should be logged as critical,
- // when we provide admin option to shut down cluster and let
- // members leave cleanly.
- stopClusterNode();
+ QPID_LOG(critical, self << " unexpectedly disconnected from cluster, shutting down");
+ broker.shutdown();
}
void Cluster::configChange(
@@ -265,27 +260,8 @@
map.left(left, nLeft);
handler->configChange(current, nCurrent, left, nLeft, joined, nJoined);
-
- // FIXME aconway 2008-09-17: management update.
- //update mgnt stats
- updateMemberStats();
}
-void Cluster::update(const MemberId& id, const framing::FieldTable& members, uint64_t dumper) {
- Mutex::ScopedLock l(lock);
- handler->update(id, members, dumper);
-}
-
-void Cluster::dumpRequest(const MemberId& dumpee, const string& urlStr) {
- Mutex::ScopedLock l(lock);
- handler->dumpRequest(dumpee, urlStr);
-}
-
-void Cluster::ready(const MemberId& member, const std::string& url) {
- Mutex::ScopedLock l(lock);
- handler->ready(member, url);
- // FIXME aconway 2008-09-17: management update.
-}
broker::Broker& Cluster::getBroker(){ return broker; }
@@ -295,12 +271,11 @@
// Stop processing connection events. We still process config changes
// and cluster controls in deliver()
connectionEventQueue.stop();
+ if (mgmtObject!=0) mgmtObject->set_status("STALLED");
// FIXME aconway 2008-09-11: Flow control, we should slow down or
// stop reading from local connections while stalled to avoid an
// unbounded queue.
- // if (mgmtObject!=0)
- // mgmtObject->set_status("STALLED");
}
void Cluster::ready() {
@@ -314,8 +289,7 @@
QPID_LOG(debug, self << " un-stalling");
handler = &memberHandler; // Member mode.
connectionEventQueue.start(poller);
- // if (mgmtObject!=0)
- // mgmtObject->set_status("ACTIVE");
+ if (mgmtObject!=0) mgmtObject->set_status("ACTIVE");
}
// Called from Broker::~Broker when broker is shut down. At this
@@ -323,61 +297,46 @@
// invoked. We must ensure that CPG has also shut down so no CPG
// callbacks will be invoked.
//
-void Cluster::shutdown() {
+void Cluster::brokerShutdown() {
QPID_LOG(notice, self << " shutting down.");
try { cpg.shutdown(); }
catch (const std::exception& e) { QPID_LOG(error, "During CPG shutdown: " << e.what()); }
delete this;
}
-ManagementObject* Cluster::GetManagementObject(void) const {
- return (ManagementObject*) mgmtObject;
-}
+ManagementObject* Cluster::GetManagementObject(void) const { return mgmtObject; }
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& /*args*/, string&) {
- Manageable::status_t status = Manageable::STATUS_UNKNOWN_METHOD;
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
QPID_LOG (debug, "Queue::ManagementMethod [id=" << methodId << "]");
-
- switch (methodId)
- {
- case _qmf::Cluster::METHOD_STOPCLUSTERNODE:
- stopClusterNode();
- break;
- case _qmf::Cluster::METHOD_STOPFULLCLUSTER:
- stopFullCluster();
- break;
+ switch (methodId) {
+ case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(); break;
+ case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(); break;
+ default: return Manageable::STATUS_UNKNOWN_METHOD;
}
-
- return status;
+ return Manageable::STATUS_OK;
}
-void Cluster::stopClusterNode(void)
-{
- // FIXME aconway 2008-09-18: mgmt
- QPID_LOG(notice, self << " disconnected from cluster " << name.str());
- broker.shutdown();
-}
-
-void Cluster::stopFullCluster(void)
-{
- // FIXME aconway 2008-09-17: TODO
-}
-
-void Cluster::updateMemberStats(void)
-{
- //update mgnt stats
- // FIXME aconway 2008-09-18:
-// if (mgmtObject!=0){
-// mgmtObject->set_clusterSize(size());
-// std::vector<Url> vectUrl = getUrls();
-// string urlstr;
-// for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
-// if (iter != vectUrl.begin()) urlstr += ";";
-// urlstr += iter->str();
-// }
-// mgmtObject->set_members(urlstr);
-// }
-
+void Cluster::stopClusterNode(void) {
+ QPID_LOG(notice, self << " stopped by admin");
+ leave();
+}
+
+void Cluster::stopFullCluster(void) {
+ QPID_LOG(notice, self << " sending shutdown to cluster.");
+ mcastControl(ClusterShutdownBody(), 0);
+}
+
+void Cluster::updateMemberStats(void) {
+ if (mgmtObject) {
+ mgmtObject->set_clusterSize(size());
+ std::vector<Url> vectUrl = getUrls();
+ string urlstr;
+ for(std::vector<Url>::iterator iter = vectUrl.begin(); iter != vectUrl.end(); iter++ ) {
+ if (iter != vectUrl.begin()) urlstr += "\n";
+ urlstr += iter->str();
+ }
+ mgmtObject->set_members(urlstr);
+ }
}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu Sep 25 05:30:14 2008
@@ -83,11 +83,6 @@
/** Leave the cluster */
void leave();
- // Cluster controls.
- void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
- void dumpRequest(const MemberId&, const std::string& url);
- void ready(const MemberId&, const std::string& url);
-
MemberId getSelf() const { return self; }
MemberId getId() const { return self; }
@@ -95,7 +90,7 @@
void stall();
void unstall();
- void shutdown();
+ void brokerShutdown();
broker::Broker& getBroker();
@@ -172,6 +167,7 @@
size_t mcastId;
+ friend class ClusterHandler;
friend class JoiningHandler;
friend class MemberHandler;
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.cpp Thu Sep 25 05:30:14 2008
@@ -19,11 +19,13 @@
*
*/
-#include "qpid/framing/AllInvoker.h"
-
+#include "Cluster.h"
#include "ClusterHandler.h"
+
#include "qpid/framing/AMQFrame.h"
+#include "qpid/framing/AllInvoker.h"
#include "qpid/framing/FieldTable.h"
+#include "qpid/log/Statement.h"
@@ -38,6 +40,7 @@
void update(const framing::FieldTable& members, uint64_t dumping) { handler.update(member, members, dumping); }
void dumpRequest(const std::string& url) { handler.dumpRequest(member, url); }
void ready(const std::string& url) { handler.ready(member, url); }
+ void shutdown() { handler.shutdown(member); }
};
ClusterHandler::~ClusterHandler() {}
@@ -49,5 +52,11 @@
return framing::invoke(ops, *frame.getBody()).wasHandled();
}
+void ClusterHandler::shutdown(const MemberId& id) {
+ QPID_LOG(notice, cluster.self << " received shutdown from " << id);
+ cluster.leave();
+}
+
+
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterHandler.h Thu Sep 25 05:30:14 2008
@@ -51,6 +51,7 @@
virtual void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping) = 0;
virtual void dumpRequest(const MemberId&, const std::string& url) = 0;
virtual void ready(const MemberId&, const std::string& url) = 0;
+ virtual void shutdown(const MemberId&);
virtual void deliver(Event& e) = 0; // Deliver a connection event.
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/JoiningHandler.cpp Thu Sep 25 05:30:14 2008
@@ -37,6 +37,7 @@
cpg_address */*left*/, int nLeft,
cpg_address */*joined*/, int /*nJoined*/)
{
+ // FIXME aconway 2008-09-24: Called with lock held - volatile
if (nLeft == 0 && nCurrent == 1 && *current == cluster.self) { // First in cluster.
QPID_LOG(notice, cluster.self << " first in cluster.");
cluster.map.ready(cluster.self, cluster.url);
@@ -53,9 +54,11 @@
}
void JoiningHandler::update(const MemberId&, const framing::FieldTable& members, uint64_t dumper) {
+ Mutex::ScopedLock l(cluster.lock);
cluster.map.update(members, dumper);
QPID_LOG(debug, "Cluster update: " << cluster.map);
checkDumpRequest();
+ cluster.updateMemberStats();
}
void JoiningHandler::checkDumpRequest() {
@@ -67,6 +70,7 @@
}
void JoiningHandler::dumpRequest(const MemberId& dumpee, const std::string& ) {
+ Mutex::ScopedLock l(cluster.lock);
if (cluster.map.dumper) { // Already a dump in progress.
if (dumpee == cluster.self && state == DUMP_REQUESTED)
state = START; // Need to make another request.
@@ -96,11 +100,13 @@
}
void JoiningHandler::ready(const MemberId& id, const std::string& url) {
+ Mutex::ScopedLock l(cluster.lock);
cluster.map.ready(id, Url(url));
checkDumpRequest();
}
void JoiningHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(cluster.lock);
if (c->isCatchUp()) {
++catchUpConnections;
QPID_LOG(debug, "Catch-up connection " << *c << " started, total " << catchUpConnections);
@@ -109,6 +115,7 @@
}
void JoiningHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(cluster.lock);
QPID_LOG(debug, "Catch-up complete for " << *c << ", remaining catch-ups: " << catchUpConnections-1);
if (c->isShadow())
cluster.connections.insert(Cluster::ConnectionMap::value_type(c->getId(), c));
@@ -118,7 +125,7 @@
void JoiningHandler::dumpComplete() {
// FIXME aconway 2008-09-18: need to detect incomplete dump.
- //
+ // Called with lock - volatile?
if (state == STALLED) {
QPID_LOG(debug, cluster.self << " received dump and stalled at start point, unstalling.");
cluster.ready();
@@ -130,4 +137,5 @@
}
}
+
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.cpp Thu Sep 25 05:30:14 2008
@@ -43,6 +43,7 @@
cpg_address */*left*/, int /*nLeft*/,
cpg_address */*joined*/, int nJoined)
{
+ // FIXME aconway 2008-09-24: Called with lock held - volatile
if (nJoined && cluster.map.sendUpdate(cluster.self)) // New members need update
cluster.mcastControl(cluster.map.toControl(), 0);
}
@@ -51,9 +52,13 @@
cluster.connectionEventQueue.push(e);
}
-void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {}
+void MemberHandler::update(const MemberId&, const framing::FieldTable& , uint64_t) {
+ Mutex::ScopedLock l(cluster.lock);
+ cluster.updateMemberStats();
+}
void MemberHandler::dumpRequest(const MemberId& dumpee, const std::string& urlStr) {
+ Mutex::ScopedLock l(cluster.lock);
if (cluster.map.dumper) return; // dump in progress, ignore request.
cluster.map.dumper = cluster.map.first();
@@ -76,17 +81,18 @@
void MemberHandler::dumpSent() {
- QPID_LOG(debug, "Finished sending state dump.");
Mutex::ScopedLock l(cluster.lock);
+ QPID_LOG(debug, "Finished sending state dump.");
cluster.ready();
}
void MemberHandler::dumpError(const std::exception& e) {
- QPID_LOG(error, "Error sending state dump from " << cluster.self << ": " << e.what());
+ QPID_LOG(error, cluster.self << " error sending state dump: " << e.what());
dumpSent();
}
void MemberHandler::insert(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(cluster.lock);
if (c->isCatchUp()) // Not allowed in member mode
c->getBrokerConnection().close(execution::ERROR_CODE_ILLEGAL_STATE, "Not in catch-up mode.");
else
@@ -94,6 +100,7 @@
}
void MemberHandler::catchUpClosed(const boost::intrusive_ptr<Connection>& c) {
+ Mutex::ScopedLock l(cluster.lock);
QPID_LOG(warning, "Catch-up connection " << c << " closed in member mode");
assert(0);
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/MemberHandler.h Thu Sep 25 05:30:14 2008
@@ -48,7 +48,7 @@
void update(const MemberId&, const framing::FieldTable& members, uint64_t dumping);
void dumpRequest(const MemberId&, const std::string& url);
void ready(const MemberId&, const std::string& url);
-
+
void dumpSent();
void dumpError(const std::exception&);
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=698945&r1=698944&r2=698945&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Thu Sep 25 05:30:14 2008
@@ -39,6 +39,8 @@
<control name="ready" code="0x3" label="New member is ready.">
<field name="url" type="str16" label="Url for brain dump."/>
</control>
+
+ <control name="shutdown" code="0x4" label="Shut down cluster"/>
</class>
<!-- TODO aconway 2008-09-10: support for un-attached connections. -->