You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/01/27 00:17:29 UTC
svn commit: r737935 - in /qpid/trunk/qpid: cpp/src/qpid/cluster/
python/commands/
Author: tross
Date: Mon Jan 26 23:17:29 2009
New Revision: 737935
URL: http://svn.apache.org/viewvc?rev=737935&view=rev
Log:
Added qpid-cluster utility plus model changes to support it.
Fixed a segfault during cluster member shutdown.
Added:
qpid/trunk/qpid/python/commands/qpid-cluster (with props)
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
qpid/trunk/qpid/cpp/src/qpid/cluster/management-schema.xml
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Jan 26 23:17:29 2009
@@ -42,6 +42,7 @@
#include "qpid/memory.h"
#include "qpid/shared_ptr.h"
#include "qmf/org/apache/qpid/cluster/Package.h"
+#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
#include <boost/bind.hpp>
#include <boost/cast.hpp>
@@ -61,7 +62,7 @@
using qpid::management::ManagementObject;
using qpid::management::Manageable;
using qpid::management::Args;
-namespace qmf = qmf::org::apache::qpid::cluster;
+namespace _qmf = ::qmf::org::apache::qpid::cluster;
/**@file
Threading notes:
@@ -102,11 +103,11 @@
lastSize(0),
lastBroker(false)
{
- ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
- if (agent != 0){
- qmf::Package packageInit(agent);
- mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str());
- agent->addObject (mgmtObject);
+ mAgent = ManagementAgent::Singleton::getInstance();
+ if (mAgent != 0){
+ _qmf::Package packageInit(mAgent);
+ mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
+ mAgent->addObject (mgmtObject);
mgmtObject->set_status("JOINING");
}
broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
@@ -132,6 +133,15 @@
connections.erase(id);
}
+std::vector<string> Cluster::getIds() const {
+ Lock l(lock);
+ return getIds(l);
+}
+
+std::vector<string> Cluster::getIds(Lock&) const {
+ return map.memberIds();
+}
+
std::vector<Url> Cluster::getUrls() const {
Lock l(lock);
return getUrls(l);
@@ -150,11 +160,11 @@
if (state != LEFT) {
state = LEFT;
QPID_LOG(notice, *this << " leaving cluster " << name);
- if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
try { cpg.leave(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error leaving process group: " << e.what());
}
+ connections.clear();
try { broker.shutdown(); }
catch (const std::exception& e) {
QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -173,7 +183,7 @@
return cp;
}
-void Cluster::deliver(
+void Cluster::deliver(
cpg_handle_t /*handle*/,
cpg_name* /*group*/,
uint32_t nodeid,
@@ -467,16 +477,27 @@
ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; }
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) {
Lock l(lock);
QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
switch (methodId) {
- case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
- case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
- default: return Manageable::STATUS_UNKNOWN_METHOD;
+ case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
+ {
+ _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
+ stringstream stream;
+ stream << myId;
+ if (iargs.i_brokerId == stream.str())
+ stopClusterNode(l);
+ }
+ break;
+ case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
+ stopFullCluster(l);
+ break;
+ default:
+ return Manageable::STATUS_UNKNOWN_METHOD;
}
return Manageable::STATUS_OK;
-}
+}
void Cluster::stopClusterNode(Lock& l) {
QPID_LOG(notice, *this << " stopped by admin");
@@ -491,6 +512,7 @@
void Cluster::memberUpdate(Lock& l) {
QPID_LOG(info, *this << " member update: " << map);
std::vector<Url> urls = getUrls(l);
+ std::vector<string> ids = getIds(l);
size_t size = urls.size();
failoverExchange->setUrls(urls);
@@ -512,10 +534,16 @@
mgmtObject->set_clusterSize(size);
string urlstr;
for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
- if (iter != urls.begin()) urlstr += "\n";
+ if (iter != urls.begin()) urlstr += ";";
urlstr += iter->str();
}
+ string idstr;
+ for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) {
+ if (iter != ids.begin()) idstr += ";";
+ idstr += (*iter);
+ }
mgmtObject->set_members(urlstr);
+ mgmtObject->set_memberIDs(idstr);
}
// Close connections belonging to members that have now been excluded
@@ -545,8 +573,12 @@
void Cluster::setClusterId(const Uuid& uuid) {
clusterId = uuid;
- if (mgmtObject)
+ if (mgmtObject) {
+ stringstream stream;
+ stream << myId;
mgmtObject->set_clusterID(clusterId.str());
+ mgmtObject->set_memberID(stream.str());
+ }
QPID_LOG(debug, *this << " cluster-id = " << clusterId);
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Jan 26 23:17:29 2009
@@ -80,6 +80,7 @@
void erase(ConnectionId);
// URLs of current cluster members - called in connection threads.
+ std::vector<std::string> getIds() const;
std::vector<Url> getUrls() const;
boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
@@ -111,6 +112,7 @@
// a Lock to call the unlocked functions.
void leave(Lock&);
+ std::vector<std::string> getIds(Lock&) const;
std::vector<Url> getUrls(Lock&) const;
// Make an offer if we can - called in deliver thread.
@@ -185,6 +187,7 @@
framing::Uuid clusterId;
NoOpConnectionOutputHandler shadowOut;
ClusterMap::Set myElders;
+ qpid::management::ManagementAgent* mAgent;
// Thread safe members
Multicaster mcast;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Mon Jan 26 23:17:29 2009
@@ -107,6 +107,17 @@
return newbies.empty() ? MemberId() : newbies.begin()->first;
}
+std::vector<string> ClusterMap::memberIds() const {
+ std::vector<string> ids;
+ for (Map::const_iterator iter = members.begin();
+ iter != members.end(); iter++) {
+ std::stringstream stream;
+ stream << iter->first;
+ ids.push_back(stream.str());
+ }
+ return ids;
+}
+
std::vector<Url> ClusterMap::memberUrls() const {
std::vector<Url> urls(members.size());
std::transform(members.begin(), members.end(), urls.begin(),
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Mon Jan 26 23:17:29 2009
@@ -75,6 +75,7 @@
size_t aliveCount() const { return alive.size(); }
size_t memberCount() const { return members.size(); }
+ std::vector<std::string> memberIds() const;
std::vector<Url> memberUrls() const;
Set getAlive() const;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h Mon Jan 26 23:17:29 2009
@@ -75,6 +75,11 @@
}
}
+ void clear() {
+ ScopedLock l(lock);
+ map.clear();
+ }
+
size_t size() const { return map.size(); }
private:
typedef std::map<ConnectionId, ConnectionPtr> Map;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/management-schema.xml?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/management-schema.xml (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/management-schema.xml Mon Jan 26 23:17:29 2009
@@ -40,13 +40,17 @@
<class name="Cluster">
<property name="brokerRef" type="objId" references="Broker" access="RC" index="y" parentRef="y"/>
<property name="clusterName" type="sstr" access="RC" desc="Name of cluster this server is a member of"/>
- <property name="clusterID" type="sstr" access="RO" desc="Globally uniquie ID (UUID) for this cluster instance"/>
+ <property name="clusterID" type="sstr" access="RO" desc="Globally unique ID (UUID) for this cluster instance"/>
+ <property name="memberID" type="sstr" access="RO" desc="ID of this member of the cluster"/>
<property name="publishedURL" type="sstr" access="RC" desc="URL this node advertizes itself as"/>
<property name="clusterSize" type="uint16" access="RO" desc="Number of brokers currently in the cluster"/>
<property name="status" type="sstr" access="RO" desc="Cluster node status (STALLED,ACTIVE,JOINING)"/>
- <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/>
+ <property name="members" type="lstr" access="RO" desc="List of member URLs delimited by ';'"/>
+ <property name="memberIDs" type="lstr" access="RO" desc="List of member IDs delimited by ';'"/>
- <method name="stopClusterNode"/>
+ <method name="stopClusterNode">
+ <arg name="brokerId" type="sstr" dir="I"/>
+ </method>
<method name="stopFullCluster"/>
</class>
Added: qpid/trunk/qpid/python/commands/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=737935&view=auto
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-cluster (added)
+++ qpid/trunk/qpid/python/commands/qpid-cluster Mon Jan 26 23:17:29 2009
@@ -0,0 +1,180 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+from qmf.console import Session
+
+_host = "localhost"
+_stopId = None
+_stopAll = False
+_force = False
+
+def Usage ():
+ print "Usage: qpid-cluster [OPTIONS] [broker-addr]"
+ print
+ print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+ print
+ print "Options:"
+ print " -s [--stop] ID Stop one member of the cluster by its ID"
+ print " -k [--all-stop] Shut down the whole cluster"
+ print " -f [--force] Suppress the 'are-you-sure?' prompt"
+ print
+ sys.exit (1)
+
+class BrokerManager:
+ def __init__(self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+
+ def overview(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ print "Clustering is not installed on the broker."
+ sys.exit(0)
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ sys.exit(0)
+
+ cluster = clusters[0]
+ myUrl = cluster.publishedURL
+ memberList = cluster.members.split(";")
+ idList = cluster.memberIDs.split(";")
+
+ print " Cluster Name: %s" % cluster.clusterName
+ print "Cluster Status: %s" % cluster.status
+ print " Cluster Size: %d" % cluster.clusterSize
+ print " Members: ID=%s URL=%s" % (idList[0], memberList[0])
+ for idx in range(1,len(idList)):
+ print " : ID=%s URL=%s" % (idList[idx], memberList[idx])
+
+ def stopMember(self, id):
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ sys.exit(0)
+
+ cluster = clusters[0]
+ idList = cluster.memberIDs.split(";")
+ if id not in idList:
+ print "No member with matching ID found"
+ sys.exit(1)
+
+ if not _force:
+ prompt = "Warning: "
+ if len(idList) == 1:
+ prompt += "This command will shut down the last running cluster member."
+ else:
+ prompt += "This command will shut down a cluster member."
+ prompt += " Are you sure? [N]: "
+
+ confirm = raw_input(prompt)
+ if len(confirm) == 0 or confirm[0].upper() != 'Y':
+ print "Operation canceled"
+ sys.exit(1)
+
+ cluster.stopClusterNode(id)
+
+ def stopAll(self):
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ sys.exit(0)
+
+ if not _force:
+ prompt = "Warning: This command will shut down the entire cluster."
+ prompt += " Are you sure? [N]: "
+
+ confirm = raw_input(prompt)
+ if len(confirm) == 0 or confirm[0].upper() != 'Y':
+ print "Operation canceled"
+ sys.exit(1)
+
+ cluster = clusters[0]
+ cluster.stopFullCluster()
+
+##
+## Main Program
+##
+
+try:
+ longOpts = ("stop=", "all-stop", "force")
+ (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "s:kf", longOpts)
+except:
+ Usage ()
+
+try:
+ encoding = locale.getpreferredencoding()
+ cargs = [a.decode(encoding) for a in encArgs]
+except:
+ cargs = encArgs
+
+for opt in optlist:
+ if opt[0] == "-s" or opt[0] == "--stop":
+ _stopId = opt[1]
+ if opt[0] == "-k" or opt[0] == "--all-stop":
+ _stopAll = True
+ if opt[0] == "-f" or opt[0] == "--force":
+ _force = True
+
+nargs = len(cargs)
+bm = BrokerManager()
+
+if nargs == 1:
+ _host = cargs[0]
+
+try:
+ bm.SetBroker(_host)
+ if _stopId:
+ bm.stopMember(_stopId)
+ elif _stopAll:
+ bm.stopAll()
+ else:
+ bm.overview()
+except KeyboardInterrupt:
+ print
+except Exception,e:
+ if e.__repr__().find("connection aborted") > 0:
+ # we expect this when asking the connected broker to shut down
+ sys.exit(0)
+ print "Failed:", e.args
+ sys.exit(1)
+
+bm.Disconnect()
Propchange: qpid/trunk/qpid/python/commands/qpid-cluster
------------------------------------------------------------------------------
svn:executable = *
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org