You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/01/27 00:17:29 UTC

svn commit: r737935 - in /qpid/trunk/qpid: cpp/src/qpid/cluster/ python/commands/

Author: tross
Date: Mon Jan 26 23:17:29 2009
New Revision: 737935

URL: http://svn.apache.org/viewvc?rev=737935&view=rev
Log:
Added qpid-cluster utility plus model changes to support it.
Fixed a segfault during cluster member shutdown.

Added:
    qpid/trunk/qpid/python/commands/qpid-cluster   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/management-schema.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Mon Jan 26 23:17:29 2009
@@ -42,6 +42,7 @@
 #include "qpid/memory.h"
 #include "qpid/shared_ptr.h"
 #include "qmf/org/apache/qpid/cluster/Package.h"
+#include "qmf/org/apache/qpid/cluster/ArgsClusterStopClusterNode.h"
 
 #include <boost/bind.hpp>
 #include <boost/cast.hpp>
@@ -61,7 +62,7 @@
 using qpid::management::ManagementObject;
 using qpid::management::Manageable;
 using qpid::management::Args;
-namespace qmf = qmf::org::apache::qpid::cluster;
+namespace _qmf = ::qmf::org::apache::qpid::cluster;
 
 /**@file
    Threading notes:
@@ -102,11 +103,11 @@
     lastSize(0),
     lastBroker(false)
 {
-    ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
-    if (agent != 0){
-        qmf::Package  packageInit(agent);
-        mgmtObject = new qmf::Cluster (agent, this, &broker,name,myUrl.str());
-        agent->addObject (mgmtObject);
+    mAgent = ManagementAgent::Singleton::getInstance();
+    if (mAgent != 0){
+        _qmf::Package  packageInit(mAgent);
+        mgmtObject = new _qmf::Cluster (mAgent, this, &broker,name,myUrl.str());
+        mAgent->addObject (mgmtObject);
         mgmtObject->set_status("JOINING");
     }
     broker.getKnownBrokers = boost::bind(&Cluster::getUrls, this);
@@ -132,6 +133,15 @@
     connections.erase(id);
 }
 
+std::vector<string> Cluster::getIds() const {
+    Lock l(lock);
+    return getIds(l);
+}
+
+std::vector<string> Cluster::getIds(Lock&) const {
+    return map.memberIds();
+}
+
 std::vector<Url> Cluster::getUrls() const {
     Lock l(lock);
     return getUrls(l);
@@ -150,11 +160,11 @@
     if (state != LEFT) {
         state = LEFT;
         QPID_LOG(notice, *this << " leaving cluster " << name);
-        if (mgmtObject!=0) mgmtObject->set_status("SHUTDOWN");
         try { cpg.leave(); }
         catch (const std::exception& e) {
             QPID_LOG(critical, *this << " error leaving process group: " << e.what());
         }
+        connections.clear();
         try { broker.shutdown(); }
         catch (const std::exception& e) {
             QPID_LOG(critical, *this << " error during broker shutdown: " << e.what());
@@ -173,7 +183,7 @@
     return cp;
 }
 
-void Cluster::deliver(          
+void Cluster::deliver(
     cpg_handle_t /*handle*/,
     cpg_name* /*group*/,
     uint32_t nodeid,
@@ -467,16 +477,27 @@
 
 ManagementObject* Cluster::GetManagementObject() const { return mgmtObject; }
 
-Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args&, string&) {
+Manageable::status_t Cluster::ManagementMethod (uint32_t methodId, Args& args, string&) {
     Lock l(lock);
     QPID_LOG(debug, *this << " managementMethod [id=" << methodId << "]");
     switch (methodId) {
-      case qmf::Cluster::METHOD_STOPCLUSTERNODE: stopClusterNode(l); break;
-      case qmf::Cluster::METHOD_STOPFULLCLUSTER: stopFullCluster(l); break;
-      default: return Manageable::STATUS_UNKNOWN_METHOD;
+    case _qmf::Cluster::METHOD_STOPCLUSTERNODE :
+        {
+            _qmf::ArgsClusterStopClusterNode& iargs = (_qmf::ArgsClusterStopClusterNode&) args;
+            stringstream stream;
+            stream << myId;
+            if (iargs.i_brokerId == stream.str())
+                stopClusterNode(l);
+        }
+        break;
+    case _qmf::Cluster::METHOD_STOPFULLCLUSTER :
+        stopFullCluster(l);
+        break;
+    default:
+        return Manageable::STATUS_UNKNOWN_METHOD;
     }
     return Manageable::STATUS_OK;
-}    
+}
 
 void Cluster::stopClusterNode(Lock& l) {
     QPID_LOG(notice, *this << " stopped by admin");
@@ -491,6 +512,7 @@
 void Cluster::memberUpdate(Lock& l) {
     QPID_LOG(info, *this << " member update: " << map);
     std::vector<Url> urls = getUrls(l);
+    std::vector<string> ids = getIds(l);
     size_t size = urls.size();
     failoverExchange->setUrls(urls);
 
@@ -512,10 +534,16 @@
         mgmtObject->set_clusterSize(size); 
         string urlstr;
         for(std::vector<Url>::iterator iter = urls.begin(); iter != urls.end(); iter++ ) {
-            if (iter != urls.begin()) urlstr += "\n";
+            if (iter != urls.begin()) urlstr += ";";
             urlstr += iter->str();
         }
+        string idstr;
+        for(std::vector<string>::iterator iter = ids.begin(); iter != ids.end(); iter++ ) {
+            if (iter != ids.begin()) idstr += ";";
+            idstr += (*iter);
+        }
         mgmtObject->set_members(urlstr);
+        mgmtObject->set_memberIDs(idstr);
     }
 
     // Close connections belonging to members that have now been excluded
@@ -545,8 +573,12 @@
 
 void Cluster::setClusterId(const Uuid& uuid) {
     clusterId = uuid;
-    if (mgmtObject)
+    if (mgmtObject) {
+        stringstream stream;
+        stream << myId;
         mgmtObject->set_clusterID(clusterId.str());
+        mgmtObject->set_memberID(stream.str());
+    }
     QPID_LOG(debug, *this << " cluster-id = " << clusterId);
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Mon Jan 26 23:17:29 2009
@@ -80,6 +80,7 @@
     void erase(ConnectionId);       
     
     // URLs of current cluster members - called in connection threads.
+    std::vector<std::string> getIds() const;
     std::vector<Url> getUrls() const;
     boost::shared_ptr<FailoverExchange> getFailoverExchange() const { return failoverExchange; }
 
@@ -111,6 +112,7 @@
     // a Lock to call the unlocked functions.
 
     void leave(Lock&);
+    std::vector<std::string> getIds(Lock&) const;
     std::vector<Url> getUrls(Lock&) const;
 
     // Make an offer if we can - called in deliver thread.
@@ -185,6 +187,7 @@
     framing::Uuid clusterId;
     NoOpConnectionOutputHandler shadowOut;
     ClusterMap::Set myElders;
+    qpid::management::ManagementAgent* mAgent;
 
     // Thread safe members
     Multicaster mcast;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.cpp Mon Jan 26 23:17:29 2009
@@ -107,6 +107,17 @@
     return newbies.empty() ? MemberId() : newbies.begin()->first;
 }
 
+std::vector<string> ClusterMap::memberIds() const {
+    std::vector<string> ids;
+    for (Map::const_iterator iter = members.begin();
+         iter != members.end(); iter++) {
+        std::stringstream stream;
+        stream << iter->first;
+        ids.push_back(stream.str());
+    }
+    return ids;
+}
+
 std::vector<Url> ClusterMap::memberUrls() const {
     std::vector<Url> urls(members.size());
     std::transform(members.begin(), members.end(), urls.begin(),

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterMap.h Mon Jan 26 23:17:29 2009
@@ -75,6 +75,7 @@
 
     size_t aliveCount() const { return alive.size(); }
     size_t memberCount() const { return members.size(); }
+    std::vector<std::string> memberIds() const;
     std::vector<Url> memberUrls() const;
     Set getAlive() const;
 

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/ConnectionMap.h Mon Jan 26 23:17:29 2009
@@ -75,6 +75,11 @@
         }
     }
 
+    void clear() {
+        ScopedLock l(lock);
+        map.clear();
+    }
+
     size_t size() const { return map.size(); }
   private:
     typedef std::map<ConnectionId, ConnectionPtr> Map;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/management-schema.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/management-schema.xml?rev=737935&r1=737934&r2=737935&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/management-schema.xml (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/management-schema.xml Mon Jan 26 23:17:29 2009
@@ -40,13 +40,17 @@
   <class name="Cluster">
     <property name="brokerRef"        type="objId"  references="Broker" access="RC" index="y" parentRef="y"/>
     <property name="clusterName"      type="sstr"   access="RC" desc="Name of cluster this server is a member of"/>
-    <property name="clusterID"        type="sstr"   access="RO" desc="Globally uniquie ID (UUID) for this cluster instance"/>
+    <property name="clusterID"        type="sstr"   access="RO" desc="Globally unique ID (UUID) for this cluster instance"/>
+    <property name="memberID"         type="sstr"   access="RO" desc="ID of this member of the cluster"/>
     <property name="publishedURL"     type="sstr"   access="RC" desc="URL this node advertizes itself as"/>
     <property name="clusterSize"      type="uint16" access="RO" desc="Number of brokers currently in the cluster"/>
     <property name="status"           type="sstr"   access="RO" desc="Cluster node status (STALLED,ACTIVE,JOINING)"/>
-    <property name="members"          type="lstr"    access="RO" desc="List of member URLs delimited by ';'"/> 
+    <property name="members"          type="lstr"   access="RO" desc="List of member URLs delimited by ';'"/> 
+    <property name="memberIDs"        type="lstr"   access="RO" desc="List of member IDs delimited by ';'"/> 
 
-    <method name="stopClusterNode"/>
+    <method name="stopClusterNode">
+      <arg name="brokerId" type="sstr" dir="I"/>
+    </method>
     <method name="stopFullCluster"/>
 
   </class>

Added: qpid/trunk/qpid/python/commands/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=737935&view=auto
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-cluster (added)
+++ qpid/trunk/qpid/python/commands/qpid-cluster Mon Jan 26 23:17:29 2009
@@ -0,0 +1,180 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+from qmf.console import Session
+
+_host = "localhost"
+_stopId = None
+_stopAll = False
+_force = False
+
+def Usage ():
+    print "Usage:  qpid-cluster [OPTIONS] [broker-addr]"
+    print
+    print "             broker-addr is in the form:   [username/password@] hostname | ip-address [:<port>]"
+    print "             ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+    print
+    print "Options:"
+    print "          -s [--stop] ID   Stop one member of the cluster by its ID"
+    print "          -k [--all-stop]  Shut down the whole cluster"
+    print "          -f [--force]     Suppress the 'are-you-sure?' prompt"
+    print
+    sys.exit (1)
+
+class BrokerManager:
+    def __init__(self):
+        self.brokerName = None
+        self.qmf        = None
+        self.broker     = None
+
+    def SetBroker(self, brokerUrl):
+        self.url = brokerUrl
+        self.qmf = Session()
+        self.broker = self.qmf.addBroker(brokerUrl)
+        agents = self.qmf.getAgents()
+        for a in agents:
+            if a.getAgentBank() == 0:
+                self.brokerAgent = a
+
+    def Disconnect(self):
+        if self.broker:
+            self.qmf.delBroker(self.broker)
+
+    def overview(self):
+        packages = self.qmf.getPackages()
+        if "org.apache.qpid.cluster" not in packages:
+            print "Clustering is not installed on the broker."
+            sys.exit(0)
+
+        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+        if len(clusters) == 0:
+            print "Clustering is installed but not enabled on the broker."
+            sys.exit(0)
+
+        cluster = clusters[0]
+        myUrl = cluster.publishedURL
+        memberList = cluster.members.split(";")
+        idList = cluster.memberIDs.split(";")
+
+        print "  Cluster Name: %s" % cluster.clusterName
+        print "Cluster Status: %s" % cluster.status
+        print "  Cluster Size: %d" % cluster.clusterSize
+        print "       Members: ID=%s URL=%s" % (idList[0], memberList[0])
+        for idx in range(1,len(idList)):
+            print "              : ID=%s URL=%s" % (idList[idx], memberList[idx])
+
+    def stopMember(self, id):
+        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+        if len(clusters) == 0:
+            print "Clustering is installed but not enabled on the broker."
+            sys.exit(0)
+
+        cluster = clusters[0]
+        idList = cluster.memberIDs.split(";")
+        if id not in idList:
+            print "No member with matching ID found"
+            sys.exit(1)
+
+        if not _force:
+            prompt = "Warning: "
+            if len(idList) == 1:
+                prompt += "This command will shut down the last running cluster member."
+            else:
+                prompt += "This command will shut down a cluster member."
+            prompt += " Are you sure? [N]: "
+
+            confirm = raw_input(prompt)
+            if len(confirm) == 0 or confirm[0].upper() != 'Y':
+                print "Operation canceled"
+                sys.exit(1)
+
+        cluster.stopClusterNode(id)
+
+    def stopAll(self):
+        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+        if len(clusters) == 0:
+            print "Clustering is installed but not enabled on the broker."
+            sys.exit(0)
+
+        if not _force:
+            prompt = "Warning: This command will shut down the entire cluster."
+            prompt += " Are you sure? [N]: "
+
+            confirm = raw_input(prompt)
+            if len(confirm) == 0 or confirm[0].upper() != 'Y':
+                print "Operation canceled"
+                sys.exit(1)
+
+        cluster = clusters[0]
+        cluster.stopFullCluster()
+
+##
+## Main Program
+##
+
+try:
+    longOpts = ("stop=", "all-stop", "force")
+    (optlist, encArgs) = getopt.gnu_getopt (sys.argv[1:], "s:kf", longOpts)
+except:
+    Usage ()
+
+try:
+    encoding = locale.getpreferredencoding()
+    cargs = [a.decode(encoding) for a in encArgs]
+except:
+    cargs = encArgs
+
+for opt in optlist:
+    if opt[0] == "-s" or opt[0] == "--stop":
+        _stopId = opt[1]
+    if opt[0] == "-k" or opt[0] == "--all-stop":
+        _stopAll = True
+    if opt[0] == "-f" or opt[0] == "--force":
+        _force = True
+
+nargs = len(cargs)
+bm    = BrokerManager()
+
+if nargs == 1:
+    _host = cargs[0]
+
+try:
+    bm.SetBroker(_host)
+    if _stopId:
+        bm.stopMember(_stopId)
+    elif _stopAll:
+        bm.stopAll()
+    else:
+        bm.overview()
+except KeyboardInterrupt:
+    print
+except Exception,e:
+    if e.__repr__().find("connection aborted") > 0:
+        # we expect this when asking the connected broker to shut down
+        sys.exit(0)
+    print "Failed:", e.args
+    sys.exit(1)
+
+bm.Disconnect()

Propchange: qpid/trunk/qpid/python/commands/qpid-cluster
------------------------------------------------------------------------------
    svn:executable = *



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org