You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/06/22 15:29:54 UTC

svn commit: r956882 - in /qpid/trunk/qpid/cpp: src/qpid/broker/ src/qpid/cluster/ src/qpid/management/ src/qpid/sys/ src/tests/ xml/

Author: aconway
Date: Tue Jun 22 13:29:52 2010
New Revision: 956882

URL: http://svn.apache.org/viewvc?rev=956882&view=rev
Log:
Fix cluster broker crashes when management is active.

Cluster brokers were exiting with errors "modified cluster state
outside cluster context" and "confirmed < (50+0) but only sent < (49+0)".

The fix was to:
 - delay completion of an incoming update until the update connection closes.
 - delay adding new connections to management until the connection is announced.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests
    qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jun 22 13:29:52 2010
@@ -76,8 +76,14 @@ struct ConnectionTimeoutTask : public sy
     }
 };
 
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_,
-                       const qpid::sys::SecuritySettings& external, bool isLink_, uint64_t objectId, bool shadow_) :
+Connection::Connection(ConnectionOutputHandler* out_,
+                       Broker& broker_, const
+                       std::string& mgmtId_,
+                       const qpid::sys::SecuritySettings& external,
+                       bool isLink_,
+                       uint64_t objectId_,
+                       bool shadow_,
+                       bool delayManagement) :
     ConnectionState(out_, broker_),
     securitySettings(external),
     adapter(*this, isLink_, shadow_),
@@ -89,26 +95,30 @@ Connection::Connection(ConnectionOutputH
     agent(0),
     timer(broker_.getTimer()),
     errorListener(0),
+    objectId(objectId_),
     shadow(shadow_)
 {
-    Manageable* parent = broker.GetVhostObject();
-
     if (isLink)
         links.notifyConnection(mgmtId, this);
+    // In a cluster, allow adding the management object to be delayed.
+    if (!delayManagement) addManagementObject();
+    if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
+}
 
-    if (parent != 0)
-    {
-        agent = broker_.getManagementAgent();
-
-        // TODO set last bool true if system connection
+void Connection::addManagementObject() {
+    assert(agent == 0);
+    assert(mgmtObject == 0);
+    Manageable* parent = broker.GetVhostObject();
+    if (parent != 0) {
+        agent = broker.getManagementAgent();
         if (agent != 0) {
+            // TODO set last bool true if system connection
             mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
             mgmtObject->set_shadow(shadow);
             agent->addObject(mgmtObject, objectId);
         }
         ConnectionState::setUrl(mgmtId);
     }
-    if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
 }
 
 void Connection::requestIOProcessing(boost::function0<void> callback)

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jun 22 13:29:52 2010
@@ -79,9 +79,15 @@ class Connection : public sys::Connectio
         virtual void connectionError(const std::string&) = 0;
     };
 
-    Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId,
+    Connection(sys::ConnectionOutputHandler* out,
+               Broker& broker,
+               const std::string& mgmtId,
                const qpid::sys::SecuritySettings&,
-               bool isLink = false, uint64_t objectId = 0, bool shadow=false);
+               bool isLink = false,
+               uint64_t objectId = 0,
+               bool shadow=false,
+               bool delayManagement = false);
+
     ~Connection ();
 
     /** Get the SessionHandler for channel. Create if it does not already exist */
@@ -139,6 +145,9 @@ class Connection : public sys::Connectio
     // Used by cluster to update connection status
     sys::AggregateOutput& getOutputTasks() { return outputTasks; }
 
+    /** Cluster delays adding management object in the constructor then calls this. */
+    void addManagementObject();
+
     const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
     { 
         return securitySettings;
@@ -166,6 +175,7 @@ class Connection : public sys::Connectio
     boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
     boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
     ErrorListener* errorListener;
+    uint64_t objectId;
     bool shadow;
 
   public:

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jun 22 13:29:52 2010
@@ -194,7 +194,7 @@ namespace _qmf = ::qmf::org::apache::qpi
  * Currently use SVN revision to avoid clashes with versions from
  * different branches.
  */
-const uint32_t Cluster::CLUSTER_VERSION = 904565;
+const uint32_t Cluster::CLUSTER_VERSION = 956001;
 
 struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
     qpid::cluster::Cluster& cluster;
@@ -269,6 +269,7 @@ Cluster::Cluster(const ClusterSettings& 
     lastAliveCount(0),
     lastBroker(false),
     updateRetracted(false),
+    updateClosed(false),
     error(*this)
 {
     // We give ownership of the timer to the broker and keep a plain pointer.
@@ -863,6 +864,14 @@ void Cluster::updateStart(const MemberId
                          connectionSettings(settings)));
 }
 
+// Called in network thread
+void Cluster::updateInClosed() {
+    Lock l(lock);
+    assert(!updateClosed);
+    updateClosed = true;
+    checkUpdateIn(l);
+}
+
 // Called in update thread.
 void Cluster::updateInDone(const ClusterMap& m) {
     Lock l(lock);
@@ -879,6 +888,7 @@ void Cluster::updateInRetracted() {
 
 void Cluster::checkUpdateIn(Lock& l) {
     if (state != UPDATEE) return; // Wait till we reach the stall point.
+    if (!updateClosed) return;  // Wait till update connection closes.
     if (updatedMap) { // We're up to date
         map = *updatedMap;
         failoverExchange->setUrls(getUrls(l));
@@ -895,6 +905,7 @@ void Cluster::checkUpdateIn(Lock& l) {
     }
     else if (updateRetracted) { // Update was retracted, request another update
         updateRetracted = false;
+        updateClosed = false;
         state = JOINER;
         QPID_LOG(notice, *this << " update retracted, sending new update request.");
         mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jun 22 13:29:52 2010
@@ -97,6 +97,7 @@ class Cluster : private Cpg::Handler, pu
     void leave();
 
     // Update completed - called in update thread
+    void updateInClosed();
     void updateInDone(const ClusterMap&);
     void updateInRetracted();
 
@@ -277,7 +278,7 @@ class Cluster : private Cpg::Handler, pu
     bool lastBroker;
     sys::Thread updateThread;
     boost::optional<ClusterMap> updatedMap;
-    bool updateRetracted;
+    bool updateRetracted, updateClosed;
     ErrorCheck error;
     UpdateReceiver updateReceiver;
     ClusterTimer* timer;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jun 22 13:29:52 2010
@@ -22,7 +22,6 @@
 #include "UpdateClient.h"
 #include "Cluster.h"
 #include "UpdateReceiver.h"
-
 #include "qpid/assert.h"
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SemanticState.h"
@@ -43,7 +42,6 @@
 #include "qpid/framing/ConnectionCloseOkBody.h"
 #include "qpid/log/Statement.h"
 #include "qpid/management/ManagementAgent.h"
-
 #include <boost/current_function.hpp>
 
 
@@ -99,10 +97,9 @@ Connection::Connection(Cluster& c, sys::
 {
     cluster.addLocalConnection(this);
     if (isLocalClient()) {
-        // Local clients are announced to the cluster
-        // and initialized when the announce is received.
         giveReadCredit(cluster.getSettings().readMax); // Flow control
-        init();
+        // Delay adding the connection to the management map until announce()
+        connectionCtor.delayManagement = true;
     }
     else {
         // Catch-up shadow connections initialized using nextShadow id.
@@ -110,9 +107,9 @@ Connection::Connection(Cluster& c, sys::
         if (!updateIn.nextShadowMgmtId.empty())
             connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
         updateIn.nextShadowMgmtId.clear();
-        init();
-    }
-    QPID_LOG(info, "incoming connection " << *this);
+     }
+    init();
+    QPID_LOG(debug, cluster << " local connection " << *this);
 }
 
 void Connection::setSecureConnection(broker::SecureConnection* sc) {
@@ -152,8 +149,11 @@ void Connection::announce(
     QPID_ASSERT(ssf == connectionCtor.external.ssf);
     QPID_ASSERT(authid == connectionCtor.external.authid);
     QPID_ASSERT(nodict == connectionCtor.external.nodict);
-    // Local connections are already initialized.
-    if (isShadow()) {
+    // Local connections are already initialized but with management delayed.
+    if (isLocalClient()) {
+        connection->addManagementObject();
+    }
+    else if (isShadow()) {
         init();
         // Play initial frames into the connection.
         Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
@@ -162,8 +162,9 @@ void Connection::announce(
             connection->received(frame);
          connection->setUserId(username);
     }
-    // Raise the connection management event now that the connection is replicated.
+    // Do managment actions now that the connection is replicated.
     connection->raiseConnectEvent();
+    QPID_LOG(debug, cluster << " replicated connection " << *this);
 }
 
 Connection::~Connection() {
@@ -249,6 +250,7 @@ void Connection::closed() {
         if (isUpdated()) {
             QPID_LOG(debug, cluster << " update connection closed " << *this);
             close();
+            cluster.updateInClosed();
         }
         else if (catchUp) {
             QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
@@ -259,7 +261,8 @@ void Connection::closed() {
             // closed and process any outstanding frames from the cluster
             // until self-delivery of deliver-close.
             output.closeOutput();
-            cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
+            cluster.getMulticast().mcastControl(
+                ClusterConnectionDeliverCloseBody(ProtocolVersion(), false), self);
         }
     }
     catch (const std::exception& e) {
@@ -268,17 +271,21 @@ void Connection::closed() {
 }
 
 // Self-delivery of close message, close the connection.
-void Connection::deliverClose () {
-    assert(!catchUp);
-    close();
+void Connection::deliverClose (bool aborted) {
+    QPID_LOG(debug, cluster << " replicated close of " << *this);
+    if (connection.get()) {
+        if (aborted) connection->abort();
+        else connection->closed();
+        connection.reset();
+    }
     cluster.erase(self);
 }
 
 // Close the connection 
 void Connection::close() {
+    QPID_LOG(debug, cluster << " local close of " << *this);
     if (connection.get()) {
         connection->closed();
-        // Ensure we delete the broker::Connection in the deliver thread.
         connection.reset();
     }
 }
@@ -286,11 +293,9 @@ void Connection::close() {
 // The connection has been killed for misbehaving, called in connection thread.
 void Connection::abort() {
     if (connection.get()) {
-        connection->abort();
-        // Ensure we delete the broker::Connection in the deliver thread.
-        connection.reset();
+        cluster.getMulticast().mcastControl(
+            ClusterConnectionDeliverCloseBody(ProtocolVersion(), true), self);
     }
-    cluster.erase(self);
 }
 
 // ConnectionCodec::decode receives read buffers from  directly-connected clients.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Jun 22 13:29:52 2010
@@ -170,7 +170,7 @@ class Connection :
                   const std::string& initFrames);
     void close();
     void abort();
-    void deliverClose();
+    void deliverClose(bool);
 
     OutputInterceptor& getOutput() { return output; }
 
@@ -194,6 +194,7 @@ class Connection :
         bool isLink;
         uint64_t objectId;
         bool shadow;
+        bool delayManagement;
 
         ConnectionCtor(
             sys::ConnectionOutputHandler* out_,
@@ -202,14 +203,19 @@ class Connection :
             const qpid::sys::SecuritySettings& external_,
             bool isLink_=false,
             uint64_t objectId_=0,
-            bool shadow_=false
+            bool shadow_=false,
+            bool delayManagement_=false
         ) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_),
-            isLink(isLink_), objectId(objectId_), shadow(shadow_)
+            isLink(isLink_), objectId(objectId_), shadow(shadow_),
+            delayManagement(delayManagement_)
         {}
 
         std::auto_ptr<broker::Connection> construct() {
             return std::auto_ptr<broker::Connection>(
-                new broker::Connection(out, broker, mgmtId, external, isLink, objectId, shadow));
+                new broker::Connection(
+                    out, broker, mgmtId, external, isLink, objectId,
+                    shadow, delayManagement)
+            );
         }
     };
 

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Jun 22 13:29:52 2010
@@ -2321,6 +2321,23 @@ void ManagementAgent::importAgents(qpid:
     }
 }
 
+namespace {
+bool isNotDeleted(const ManagementObjectMap::value_type& value) {
+    return !value.second->isDeleted();
+}
+
+size_t countNotDeleted(const ManagementObjectMap& map) {
+    return std::count_if(map.begin(), map.end(), isNotDeleted);
+}
+
+void dumpMap(std::ostream& o, const ManagementObjectMap& map) {
+    for (ManagementObjectMap::const_iterator i = map.begin(); i != map.end(); ++i) {
+        if (!i->second->isDeleted())
+            o << endl << "   " << i->second->getObjectId().getV2Key();
+    }
+}
+} // namespace
+
 string ManagementAgent::debugSnapshot() {
     ostringstream msg;
     msg << " management snapshot:";
@@ -2328,8 +2345,8 @@ string ManagementAgent::debugSnapshot() 
          i != remoteAgents.end(); ++i)
         msg << " " << i->second->routingKey;
     msg << " packages: " << packages.size();
-    msg << " objects: " << managementObjects.size();
-    msg << " new objects: " << newManagementObjects.size();
+    msg << " objects: " << countNotDeleted(managementObjects);
+    msg << " new objects: " << countNotDeleted(newManagementObjects);
     return msg.str();
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp Tue Jun 22 13:29:52 2010
@@ -43,8 +43,15 @@ void assertClusterSafe()  {
     }
 }
 
-ClusterSafeScope::ClusterSafeScope() { inContext = true; }
-ClusterSafeScope::~ClusterSafeScope() { inContext = false; }
+ClusterSafeScope::ClusterSafeScope() {
+    assert(!inContext);
+    inContext = true;
+}
+
+ClusterSafeScope::~ClusterSafeScope() {
+    assert(inContext);
+    inContext = false;
+}
 
 void enableClusterSafe() { inCluster = true; }
 

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Jun 22 13:29:52 2010
@@ -255,7 +255,7 @@ class LongTests(BrokerTest):
                 StoppableThread.stop(self)
 
         # def test_management
-        args=["--mgmt-pub-interval", 1] # Publish management information every second.
+        args = ["--mgmt-pub-interval", 1] # Publish management information every second.
         # Use store if present.
         if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
         cluster = self.cluster(3, args)

Modified: qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests Tue Jun 22 13:29:52 2010
@@ -20,5 +20,5 @@
 #
 
 srcdir=`dirname $0`
-$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=2
+$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=4
 

Modified: qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects (original)
+++ qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects Tue Jun 22 13:29:52 2010
@@ -1,6 +1,5 @@
 #!/usr/bin/env python
 
-#
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements.  See the NOTICE file
 # distributed with this work for additional information
@@ -19,390 +18,83 @@
 # under the License.
 #
 
-import os
-import getopt
-import sys
-import locale
-import socket
-import re
-from qmf.console import Session, SchemaClass
-
-_host = "localhost"
-_connTimeout = 10
-_verbose = 0
-_del_test = False;
-pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
-_debug_recursion = 0
-
-def Usage ():
-    print "Usage:  verify_cluster_objects [OPTIONS] [broker-addr]"
-    print
-    print "             broker-addr is in the form:   [username/password@] hostname | ip-address [:<port>]"
-    print "             ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
-    print
-    print "    This program contacts every node of a cluster, loads all manageable objects from"
-    print "    those nodes and verifies that the management data is identical across the clusters."
-    print
-    print "Options:"
-    print "    --timeout seconds (10)  Maximum time to wait for broker connection"
-    print "    --verbose level (0)     Show details of objects and their IDs"
-    print "    --delete                Delete some objects after creation, to test synchup"
-    print
-    sys.exit (1)
-
-class IpAddr:
-    def __init__(self, text):
-        if text.find("@") != -1:
-            tokens = text.split("@")
-            text = tokens[1]
-        if text.find(":") != -1:
-            tokens = text.split(":")
-            text = tokens[0]
-            self.port = int(tokens[1])
-        else:
-            self.port = 5672
-        self.dottedQuad = socket.gethostbyname(text)
-        nums = self.dottedQuad.split(".")
-        self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
-
-    def bestAddr(self, addrPortList):
-        bestDiff = 0xFFFFFFFFL
-        bestAddr = None
-        for addrPort in addrPortList:
-            diff = IpAddr(addrPort[0]).addr ^ self.addr
-            if diff < bestDiff:
-                bestDiff = diff
-                bestAddr = addrPort
-        return bestAddr
-
-class ObjectId:
-    """Object identity, use for dictionaries by object id"""
-    def __init__(self, object): self.object = object
-    def __eq__(self, other): return self.object is other.object
-    def __hash__(self): return hash(id(self.object))
-
-class Broker(object):
-    def __init__(self, qmf, broker):
-        self.broker = broker
-        self.qmf = qmf
+# Verify managment objects are consistent in a cluster.
+# Arguments: url of one broker in the cluster.
 
-        agents = qmf.getAgents()
-        for a in agents:
-            if a.getAgentBank() == '0':
-                self.brokerAgent = a
-
-        bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker",
-                              _agent=self.brokerAgent)[0]
-        self.currentTime = bobj.getTimestamps()[0]
-        try:
-            self.uptime = bobj.uptime
-        except:
-            self.uptime = 0
-        self.tablesByName = {}
-        self.package = "org.apache.qpid.broker"
-        self.id_cache = {}              # Cache for getAbstractId
-
-    def getUrl(self):
-        return self.broker.getUrl()
-
-    def getData(self):
-        if _verbose > 1:
-            print "Broker:", self.broker
-
-        classList = self.qmf.getClasses(self.package)
-        for cls in classList:
-            if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE:
-                self.loadTable(cls)
-
-
-    #
-    # this should be a method on an object, but is kept here for now, until
-    # we finish sorting out the treatment of names in qmfv2
-    #
-    def getAbstractId(self, object):
-      """ return a string the of the hierarchical name """
-      if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)]
-      global _debug_recursion
-      result = u""
-      valstr = u""
-      _debug_recursion += 1
-      debug_prefix = _debug_recursion
-      if (_verbose > 9):
-          print debug_prefix, "  enter gai: props ", object._properties
-      for property, value in object._properties:
-
-          # we want to recurse on things which are refs.  we tell by
-          # asking each property if it's an index.  I think...
-          if (_verbose > 9):
-              print debug_prefix, "  prop ", property, " val " , value, " idx ", 
-              property.index, " type ", property.type
-
-          # property is an instance, you can ask its type, name, etc.
-
-          # special case system refs, as they will never be the same on
-          # distinct cluster nodes.  later we probably want a different
-          # way of representing these objects, like for instance don't
-          # include the system ref in the hierarchy.
-
-          if property.name == "systemRef":
-              _debug_recursion -= 1
-              self.id_cache[ObjectId(object)] = ""
-              return ""
-
-          if property.index:
-              if result != u"":
-                  result += u":"
-              if property.type == 10:
-                  try:
-                      recursive_objects = object._session.getObjects(_objectId = value, _broker=object._broker)
-                      if (_verbose > 9):
-                          print debug_prefix, "   r ", recursive_objects[0]
-                          for rp, rv in recursive_objects[0]._properties:
-                              print debug_prefix, "   rrr ", rp, " idx-p ", rp.index, " v ", rv
-                          print debug_prefix, "    recursing on ", recursive_objects[0]
-                      valstr = self.getAbstractId(recursive_objects[0])
-                      if (_verbose > 9):
-                          print debug_prefix,  "    recursing on ", recursive_objects[0],
-                          " -> ", valstr
-                  except Exception, e:
-                      if (_verbose > 9):
-                          print debug_prefix, "          except ", e
-                      valstr = u"<undecodable>"
-              else:
-                  # this yields UUID-blah.  not good.  try something else
-                  # valstr = value.__repr__()
-                  # print debug_prefix, " val ", value
-          
-                  # yetch.  this needs to be abstracted someplace?  I don't
-                  # think we have the infrastructure we need to make these id
-                  # strings be sensible in the general case
-                  if property.name == "systemId":
-                      # special case.  try to do something sensible about systemref objects
-                      valstr = object.nodeName
-                  else:
-                      valstr = value.__repr__() # I think...
-          result += valstr
-          if (_verbose > 9):
-              print debug_prefix, "    id ", self, " -> ", result
-      _debug_recursion -= 1
-      self.id_cache[ObjectId(object)] = result
-      return result
-
-    def loadTable(self, cls):
-        if _verbose > 1:
-            print "  Class:", cls.getClassName()
-        list = self.qmf.getObjects(_class=cls.getClassName(),
-                                   _package=cls.getPackageName(),
-                                   _agent=self.brokerAgent)
-
-        # tables-by-name maps class name to a table by object-name of
-        # objects.  ie use the class name ("broker", "queue", etc) to
-        # index tables-by-name, returning a second table, use the
-        # object name to index that to get an object.
-
-        self.tablesByName[cls.getClassName()] = {}
-        for obj in list:
-            # make sure we aren't colliding on name.  it's an internal
-            # error (ie, the name-generation code is busted) if we do
-            key = self.getAbstractId(obj)
-            if key in self.tablesByName[cls.getClassName()]:
-                raise Exception("internal error: collision for %s on key %s\n"
-                                % (obj, key))
-                
-            self.tablesByName[cls.getClassName()][key] = obj
-            if _verbose > 1:
-                print "   ", obj.getObjectId(), " ", obj.getIndex(), " ", key
+import qmf.console, sys, re
 
+class Session(qmf.console.Session):
+    """A qmf.console.Session that caches useful values"""
 
-class BrokerManager:
     def __init__(self):
-        self.brokerName = None
-        self.qmf        = None
-        self.broker     = None
-        self.brokers    = []
-        self.cluster    = None
-
-    def SetBroker(self, brokerUrl):
-        self.url = brokerUrl
-        self.qmf = Session()
-        self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
-        agents = self.qmf.getAgents()
-        for a in agents:
-            if a.getAgentBank() == '0':
-                self.brokerAgent = a
-
-    def Disconnect(self):
-        if self.broker:
-            self.qmf.delBroker(self.broker)
-
-    def _getCluster(self):
-        packages = self.qmf.getPackages()
-        if "org.apache.qpid.cluster" not in packages:
-            return None
-
-        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
-        if len(clusters) == 0:
-            print "Clustering is installed but not enabled on the broker."
-            return None
-
-        self.cluster = clusters[0]
-
-    def _getHostList(self, urlList):
-        hosts = []
-        hostAddr = IpAddr(_host)
-        for url in urlList:
-            if url.find("amqp:") != 0:
-                raise Exception("Invalid URL 1")
-            url = url[5:]
-            addrs = str(url).split(",")
-            addrList = []
-            for addr in addrs:
-                tokens = addr.split(":")
-                if len(tokens) != 3:
-                    raise Exception("Invalid URL 2")
-                addrList.append((tokens[1], tokens[2]))
-
-            # Find the address in the list that is most likely to be
-            # in the same subnet as the address with which we made the
-            # original QMF connection.  This increases the probability
-            # that we will be able to reach the cluster member.
-
-            best = hostAddr.bestAddr(addrList)
-            bestUrl = best[0] + ":" + best[1]
-            hosts.append(bestUrl)
-        return hosts
-
-
-    # the main fun which tests for broker state "identity".  now that
-    # we're using qmf2 style object names across the board, that test
-    # means that we are ensuring that for all objects of a given
-    # class, an object of that class with the same object name exists
-    # on the peer broker.
-
-    def verify(self):
-        if _verbose > 0:
-            print "Connecting to the cluster..."
-        self._getCluster()
-        if self.cluster:
-            memberList = self.cluster.members.split(";")
-            hostList = self._getHostList(memberList)
-            self.qmf.delBroker(self.broker)
-            self.broker = None
-            for host in hostList:
-                b = self.qmf.addBroker(host, _connTimeout)
-                self.brokers.append(Broker(self.qmf, b))
-                if _verbose > 0:
-                    print "   ", b
-        else:
-            raise Exception("Failed - Not a cluster")
-
-        failures = []
-
-        # Wait until connections to all nodes are established before
-        # loading the management data.  This will ensure that the
-        # objects are all stable and the same.
-        if _verbose > 0:
-            print "Loading management data from nodes..."
-        for broker in self.brokers:
-            broker.getData()
-
-        # If we're testing delete-some-objects functionality, create a
-        # few widgets here and then delete them.
-        if _del_test:
-            if _verbose > 0:
-                print "Running delete test"
-            # just stick 'em in the first broker
-            b = self.brokers[0]
-            session = b.qmf.brokers[0].getAmqpSession()
-            session.queue_declare(queue="foo", exclusive=True, auto_delete=True)
-            session.exchange_bind(exchange="amq.direct",
-                                                 queue="foo", binding_key="foo")
-            session.queue_declare(queue="bar", exclusive=True, auto_delete=True)
-            session.exchange_bind(exchange="amq.direct",
-                                                 queue="bar", binding_key="bar")
-            # now delete 'em
-            session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo")
-            session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar")
-            session.queue_delete("bar")
-            session.queue_delete("foo")
-
-        # Verify that each node has the same set of objects (based on
-        # object name).
-        if _verbose > 0:
-            print "Verifying objects based on object name..."
-        base = self.brokers[0]
-        for broker in self.brokers[1:]:
-
-            # walk over the class names, for each class (with some
-            # exceptions) walk over the objects of that class, making
-            # sure they match between broker A and broker B
-
-            for className in base.tablesByName:
-                if className in ["broker", "system", "connection"]:
-                    continue
-
-                tab1 = base.tablesByName[className]
-                tab2 = broker.tablesByName[className]
-
-                for key in tab1:
-                    if key not in tab2:
-                        failures.append("%s key %s not found on node %s" %
-                                        (className, key, broker.getUrl()))
-                for key in tab2:
-                    if key not in tab1:
-                        failures.append("%s key %s not found on node %s" %
-                                        (className, key, base.getUrl()))
-
-        if len(failures) > 0:
-            print "Failures:"
-            for failure in failures:
-                print "  %s" % failure
-            raise Exception("Failures")
-
-        if _verbose > 0:
-            print "Success"
-
-##
-## Main Program
-##
-
-try:
-    longOpts = ("verbose=", "timeout=", "delete")
-    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts)
-except:
-    Usage()
-
-try:
-    encoding = locale.getpreferredencoding()
-    cargs = [a.decode(encoding) for a in encArgs]
-except:
-    cargs = encArgs
-
-for opt in optlist:
-    if opt[0] == "--timeout":
-        _connTimeout = int(opt[1])
-        if _connTimeout == 0:
-            _connTimeout = None
-    elif opt[0] == "--verbose":
-        _verbose = int(opt[1])
-    elif opt[0] == "--delete":
-        _del_test = True;
-    else:
-        Usage()
-
-nargs = len(cargs)
-bm    = BrokerManager()
-
-if nargs == 1:
-    _host = cargs[0]
-
-try:
-    bm.SetBroker(_host)
-    bm.verify()
-except KeyboardInterrupt:
-    print
-except Exception,e:
-    print "Failed: %s - %s" % (e.__class__.__name__, e)
-    sys.exit(1)
+        qmf.console.Session.__init__(self)
+        self.classes = None
+
+    def all_classes(self):
+        """Return every schema class of every known package.
+
+        The list is built on first use and cached on self.classes.
+        """
+        if self.classes is None:
+            collected = []
+            for package in self.getPackages():
+                collected.extend(self.getClasses(package))
+            self.classes = collected
+        return self.classes
+
+class Broker:
+    """A QMF connection to one broker, used to snapshot and compare the
+    names of the management objects that broker reports."""
+
+    def __init__(self, url, qmf):
+        self.url = url
+        self.qmf = qmf
+        self.broker = self.qmf.addBroker(url)
+        self.broker._waitForStable()
+        self.objects = None
+        # Object names matching any of these patterns are left out of the
+        # comparison (presumably because they are node-specific).
+        self.ignore_list = [ re.compile("org.apache.qpid.broker:system:") ]
+
+    def get_objects(self):
+        """Return the set of (non-ignored) object names on this broker.
+
+        The set is computed on first call and cached. Raises Exception if
+        the broker reports the same object name more than once.
+        """
+        def ignore(name):
+            return any(m.match(name) for m in self.ignore_list)
+        if self.objects is None:
+            obj_list = []
+            for c in self.qmf.all_classes():
+                for o in self.qmf.getObjects(_key=c, _broker=self.broker):
+                    name=o.getObjectId().getObject()
+                    if not ignore(name): obj_list.append(name)
+            objects = set(obj_list)
+            if (len(obj_list) != len(objects)):
+                raise Exception("Duplicates in object list for %s"%(self.url))
+            # Cache only after validation so a later call re-checks instead
+            # of silently returning a set known to have hidden duplicates.
+            self.objects = objects
+        return self.objects
+
+    def compare(self,other):
+        """Print the object names present on only one of self/other.
+
+        Both directions are always checked so every difference is printed.
+        Returns True if both brokers report the same set of names.
+        """
+        def compare1(x,y):
+            diff = x.get_objects() - y.get_objects()
+            if diff:
+                print "ERROR: found on %s but not %s"%(x, y)
+                for o in diff: print "    %s"%(o)
+                return False
+            return True
+
+        forward = compare1(self, other)
+        backward = compare1(other, self)
+        return forward and backward
+
+    def __str__(self): return self.url
+
+    def get_cluster(self):
+        """Given one Broker, return list of all brokers in its cluster"""
+        clusters = self.qmf.getObjects(_class="cluster", _broker=self.broker)
+        if not clusters:
+            # Raising a bare string is a TypeError in Python >= 2.6; wrap it.
+            raise Exception("%s is not a cluster member"%(self.url))
+        def first_address(url):
+            """Python doesn't understand the brokers URL syntax. Extract a simple address"""
+            match = re.compile("amqp:tcp:([^,]*)").match(url)
+            if match is None:
+                raise Exception("Cannot parse cluster member URL %s"%(url))
+            return match.group(1)
+        return [Broker(first_address(url), self.qmf) for url in clusters[0].members.split(";")]
+
+    def __del__(self):
+        # __init__ may have failed before self.broker was assigned.
+        broker = getattr(self, "broker", None)
+        if broker is not None: self.qmf.delBroker(broker)
+
+def main(argv=None):
+    """Check that every cluster member reports the same management objects.
+
+    argv[1] is the address of one cluster member; the other members are
+    found from its "cluster" management object, and each one is compared
+    against the first. Returns 0 if all match, 1 if any differ, and 2 if
+    no broker address was given.
+    """
+    if argv is None: argv = sys.argv
+    if len(argv) < 2:
+        prog = "<program>"
+        if argv: prog = argv[0]
+        print "Usage: %s <broker-address>"%(prog)
+        return 2
+    qmf = Session()
+    brokers = Broker(argv[1], qmf).get_cluster()
+    base = brokers.pop(0)
+    result = 0
+    for b in brokers:
+        if not base.compare(b): result = 1
+    # Drop the Broker references so their __del__ can remove them from the
+    # QMF session.
+    del base
+    del brokers
+    return result
 
-bm.Disconnect()
+if __name__ == "__main__": sys.exit(main())

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Jun 22 13:29:52 2010
@@ -134,7 +134,9 @@
     </control>
 
     <!-- Marks the cluster-wide point when a connection is considered closed. -->
-    <control name="deliver-close" code="0x2"/>
+    <control name="deliver-close" code="0x2">
+      <field name="aborted" type="bit"/>
+    </control>
 
     <!-- Permission to generate output up to the limit. -->
     <control name="deliver-do-output" code="0x3">



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org