You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2010/06/22 15:29:54 UTC
svn commit: r956882 - in /qpid/trunk/qpid/cpp: src/qpid/broker/
src/qpid/cluster/ src/qpid/management/ src/qpid/sys/ src/tests/ xml/
Author: aconway
Date: Tue Jun 22 13:29:52 2010
New Revision: 956882
URL: http://svn.apache.org/viewvc?rev=956882&view=rev
Log:
Fix cluster broker crashes when management is active.
Cluster brokers were exiting with errors "modified cluster state
outside cluster context" and "confirmed < (50+0) but only sent < (49+0)"
Fix was to:
- delay completion of incoming update till update connection closes.
- delay adding new connections to management until connection is announced.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests
qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects
qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Tue Jun 22 13:29:52 2010
@@ -76,8 +76,14 @@ struct ConnectionTimeoutTask : public sy
}
};
-Connection::Connection(ConnectionOutputHandler* out_, Broker& broker_, const std::string& mgmtId_,
- const qpid::sys::SecuritySettings& external, bool isLink_, uint64_t objectId, bool shadow_) :
+Connection::Connection(ConnectionOutputHandler* out_,
+ Broker& broker_, const
+ std::string& mgmtId_,
+ const qpid::sys::SecuritySettings& external,
+ bool isLink_,
+ uint64_t objectId_,
+ bool shadow_,
+ bool delayManagement) :
ConnectionState(out_, broker_),
securitySettings(external),
adapter(*this, isLink_, shadow_),
@@ -89,26 +95,30 @@ Connection::Connection(ConnectionOutputH
agent(0),
timer(broker_.getTimer()),
errorListener(0),
+ objectId(objectId_),
shadow(shadow_)
{
- Manageable* parent = broker.GetVhostObject();
-
if (isLink)
links.notifyConnection(mgmtId, this);
+ // In a cluster, allow adding the management object to be delayed.
+ if (!delayManagement) addManagementObject();
+ if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
+}
- if (parent != 0)
- {
- agent = broker_.getManagementAgent();
-
- // TODO set last bool true if system connection
+void Connection::addManagementObject() {
+ assert(agent == 0);
+ assert(mgmtObject == 0);
+ Manageable* parent = broker.GetVhostObject();
+ if (parent != 0) {
+ agent = broker.getManagementAgent();
if (agent != 0) {
+ // TODO set last bool true if system connection
mgmtObject = new _qmf::Connection(agent, this, parent, mgmtId, !isLink, false);
mgmtObject->set_shadow(shadow);
agent->addObject(mgmtObject, objectId);
}
ConnectionState::setUrl(mgmtId);
}
- if (!isShadow()) broker.getConnectionCounter().inc_connectionCount();
}
void Connection::requestIOProcessing(boost::function0<void> callback)
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Connection.h Tue Jun 22 13:29:52 2010
@@ -79,9 +79,15 @@ class Connection : public sys::Connectio
virtual void connectionError(const std::string&) = 0;
};
- Connection(sys::ConnectionOutputHandler* out, Broker& broker, const std::string& mgmtId,
+ Connection(sys::ConnectionOutputHandler* out,
+ Broker& broker,
+ const std::string& mgmtId,
const qpid::sys::SecuritySettings&,
- bool isLink = false, uint64_t objectId = 0, bool shadow=false);
+ bool isLink = false,
+ uint64_t objectId = 0,
+ bool shadow=false,
+ bool delayManagement = false);
+
~Connection ();
/** Get the SessionHandler for channel. Create if it does not already exist */
@@ -139,6 +145,9 @@ class Connection : public sys::Connectio
// Used by cluster to update connection status
sys::AggregateOutput& getOutputTasks() { return outputTasks; }
+ /** Cluster delays adding management object in the constructor then calls this. */
+ void addManagementObject();
+
const qpid::sys::SecuritySettings& getExternalSecuritySettings() const
{
return securitySettings;
@@ -166,6 +175,7 @@ class Connection : public sys::Connectio
boost::intrusive_ptr<sys::TimerTask> heartbeatTimer;
boost::intrusive_ptr<ConnectionTimeoutTask> timeoutTimer;
ErrorListener* errorListener;
+ uint64_t objectId;
bool shadow;
public:
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Tue Jun 22 13:29:52 2010
@@ -194,7 +194,7 @@ namespace _qmf = ::qmf::org::apache::qpi
* Currently use SVN revision to avoid clashes with versions from
* different branches.
*/
-const uint32_t Cluster::CLUSTER_VERSION = 904565;
+const uint32_t Cluster::CLUSTER_VERSION = 956001;
struct ClusterDispatcher : public framing::AMQP_AllOperations::ClusterHandler {
qpid::cluster::Cluster& cluster;
@@ -269,6 +269,7 @@ Cluster::Cluster(const ClusterSettings&
lastAliveCount(0),
lastBroker(false),
updateRetracted(false),
+ updateClosed(false),
error(*this)
{
// We give ownership of the timer to the broker and keep a plain pointer.
@@ -863,6 +864,14 @@ void Cluster::updateStart(const MemberId
connectionSettings(settings)));
}
+// Called in network thread
+void Cluster::updateInClosed() {
+ Lock l(lock);
+ assert(!updateClosed);
+ updateClosed = true;
+ checkUpdateIn(l);
+}
+
// Called in update thread.
void Cluster::updateInDone(const ClusterMap& m) {
Lock l(lock);
@@ -879,6 +888,7 @@ void Cluster::updateInRetracted() {
void Cluster::checkUpdateIn(Lock& l) {
if (state != UPDATEE) return; // Wait till we reach the stall point.
+ if (!updateClosed) return; // Wait till update connection closes.
if (updatedMap) { // We're up to date
map = *updatedMap;
failoverExchange->setUrls(getUrls(l));
@@ -895,6 +905,7 @@ void Cluster::checkUpdateIn(Lock& l) {
}
else if (updateRetracted) { // Update was retracted, request another update
updateRetracted = false;
+ updateClosed = false;
state = JOINER;
QPID_LOG(notice, *this << " update retracted, sending new update request.");
mcast.mcastControl(ClusterUpdateRequestBody(ProtocolVersion(), myUrl.str()), self);
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Tue Jun 22 13:29:52 2010
@@ -97,6 +97,7 @@ class Cluster : private Cpg::Handler, pu
void leave();
// Update completed - called in update thread
+ void updateInClosed();
void updateInDone(const ClusterMap&);
void updateInRetracted();
@@ -277,7 +278,7 @@ class Cluster : private Cpg::Handler, pu
bool lastBroker;
sys::Thread updateThread;
boost::optional<ClusterMap> updatedMap;
- bool updateRetracted;
+ bool updateRetracted, updateClosed;
ErrorCheck error;
UpdateReceiver updateReceiver;
ClusterTimer* timer;
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Jun 22 13:29:52 2010
@@ -22,7 +22,6 @@
#include "UpdateClient.h"
#include "Cluster.h"
#include "UpdateReceiver.h"
-
#include "qpid/assert.h"
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
@@ -43,7 +42,6 @@
#include "qpid/framing/ConnectionCloseOkBody.h"
#include "qpid/log/Statement.h"
#include "qpid/management/ManagementAgent.h"
-
#include <boost/current_function.hpp>
@@ -99,10 +97,9 @@ Connection::Connection(Cluster& c, sys::
{
cluster.addLocalConnection(this);
if (isLocalClient()) {
- // Local clients are announced to the cluster
- // and initialized when the announce is received.
giveReadCredit(cluster.getSettings().readMax); // Flow control
- init();
+ // Delay adding the connection to the management map until announce()
+ connectionCtor.delayManagement = true;
}
else {
// Catch-up shadow connections initialized using nextShadow id.
@@ -110,9 +107,9 @@ Connection::Connection(Cluster& c, sys::
if (!updateIn.nextShadowMgmtId.empty())
connectionCtor.mgmtId = updateIn.nextShadowMgmtId;
updateIn.nextShadowMgmtId.clear();
- init();
- }
- QPID_LOG(info, "incoming connection " << *this);
+ }
+ init();
+ QPID_LOG(debug, cluster << " local connection " << *this);
}
void Connection::setSecureConnection(broker::SecureConnection* sc) {
@@ -152,8 +149,11 @@ void Connection::announce(
QPID_ASSERT(ssf == connectionCtor.external.ssf);
QPID_ASSERT(authid == connectionCtor.external.authid);
QPID_ASSERT(nodict == connectionCtor.external.nodict);
- // Local connections are already initialized.
- if (isShadow()) {
+ // Local connections are already initialized but with management delayed.
+ if (isLocalClient()) {
+ connection->addManagementObject();
+ }
+ else if (isShadow()) {
init();
// Play initial frames into the connection.
Buffer buf(const_cast<char*>(initialFrames.data()), initialFrames.size());
@@ -162,8 +162,9 @@ void Connection::announce(
connection->received(frame);
connection->setUserId(username);
}
- // Raise the connection management event now that the connection is replicated.
+ // Do managment actions now that the connection is replicated.
connection->raiseConnectEvent();
+ QPID_LOG(debug, cluster << " replicated connection " << *this);
}
Connection::~Connection() {
@@ -249,6 +250,7 @@ void Connection::closed() {
if (isUpdated()) {
QPID_LOG(debug, cluster << " update connection closed " << *this);
close();
+ cluster.updateInClosed();
}
else if (catchUp) {
QPID_LOG(critical, cluster << " catch-up connection closed prematurely " << *this);
@@ -259,7 +261,8 @@ void Connection::closed() {
// closed and process any outstanding frames from the cluster
// until self-delivery of deliver-close.
output.closeOutput();
- cluster.getMulticast().mcastControl(ClusterConnectionDeliverCloseBody(), self);
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(ProtocolVersion(), false), self);
}
}
catch (const std::exception& e) {
@@ -268,17 +271,21 @@ void Connection::closed() {
}
// Self-delivery of close message, close the connection.
-void Connection::deliverClose () {
- assert(!catchUp);
- close();
+void Connection::deliverClose (bool aborted) {
+ QPID_LOG(debug, cluster << " replicated close of " << *this);
+ if (connection.get()) {
+ if (aborted) connection->abort();
+ else connection->closed();
+ connection.reset();
+ }
cluster.erase(self);
}
// Close the connection
void Connection::close() {
+ QPID_LOG(debug, cluster << " local close of " << *this);
if (connection.get()) {
connection->closed();
- // Ensure we delete the broker::Connection in the deliver thread.
connection.reset();
}
}
@@ -286,11 +293,9 @@ void Connection::close() {
// The connection has been killed for misbehaving, called in connection thread.
void Connection::abort() {
if (connection.get()) {
- connection->abort();
- // Ensure we delete the broker::Connection in the deliver thread.
- connection.reset();
+ cluster.getMulticast().mcastControl(
+ ClusterConnectionDeliverCloseBody(ProtocolVersion(), true), self);
}
- cluster.erase(self);
}
// ConnectionCodec::decode receives read buffers from directly-connected clients.
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Jun 22 13:29:52 2010
@@ -170,7 +170,7 @@ class Connection :
const std::string& initFrames);
void close();
void abort();
- void deliverClose();
+ void deliverClose(bool);
OutputInterceptor& getOutput() { return output; }
@@ -194,6 +194,7 @@ class Connection :
bool isLink;
uint64_t objectId;
bool shadow;
+ bool delayManagement;
ConnectionCtor(
sys::ConnectionOutputHandler* out_,
@@ -202,14 +203,19 @@ class Connection :
const qpid::sys::SecuritySettings& external_,
bool isLink_=false,
uint64_t objectId_=0,
- bool shadow_=false
+ bool shadow_=false,
+ bool delayManagement_=false
) : out(out_), broker(broker_), mgmtId(mgmtId_), external(external_),
- isLink(isLink_), objectId(objectId_), shadow(shadow_)
+ isLink(isLink_), objectId(objectId_), shadow(shadow_),
+ delayManagement(delayManagement_)
{}
std::auto_ptr<broker::Connection> construct() {
return std::auto_ptr<broker::Connection>(
- new broker::Connection(out, broker, mgmtId, external, isLink, objectId, shadow));
+ new broker::Connection(
+ out, broker, mgmtId, external, isLink, objectId,
+ shadow, delayManagement)
+ );
}
};
Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Tue Jun 22 13:29:52 2010
@@ -2321,6 +2321,23 @@ void ManagementAgent::importAgents(qpid:
}
}
+namespace {
+bool isNotDeleted(const ManagementObjectMap::value_type& value) {
+ return !value.second->isDeleted();
+}
+
+size_t countNotDeleted(const ManagementObjectMap& map) {
+ return std::count_if(map.begin(), map.end(), isNotDeleted);
+}
+
+void dumpMap(std::ostream& o, const ManagementObjectMap& map) {
+ for (ManagementObjectMap::const_iterator i = map.begin(); i != map.end(); ++i) {
+ if (!i->second->isDeleted())
+ o << endl << " " << i->second->getObjectId().getV2Key();
+ }
+}
+} // namespace
+
string ManagementAgent::debugSnapshot() {
ostringstream msg;
msg << " management snapshot:";
@@ -2328,8 +2345,8 @@ string ManagementAgent::debugSnapshot()
i != remoteAgents.end(); ++i)
msg << " " << i->second->routingKey;
msg << " packages: " << packages.size();
- msg << " objects: " << managementObjects.size();
- msg << " new objects: " << newManagementObjects.size();
+ msg << " objects: " << countNotDeleted(managementObjects);
+ msg << " new objects: " << countNotDeleted(newManagementObjects);
return msg.str();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/ClusterSafe.cpp Tue Jun 22 13:29:52 2010
@@ -43,8 +43,15 @@ void assertClusterSafe() {
}
}
-ClusterSafeScope::ClusterSafeScope() { inContext = true; }
-ClusterSafeScope::~ClusterSafeScope() { inContext = false; }
+ClusterSafeScope::ClusterSafeScope() {
+ assert(!inContext);
+ inContext = true;
+}
+
+ClusterSafeScope::~ClusterSafeScope() {
+ assert(inContext);
+ inContext = false;
+}
void enableClusterSafe() { inCluster = true; }
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Jun 22 13:29:52 2010
@@ -255,7 +255,7 @@ class LongTests(BrokerTest):
StoppableThread.stop(self)
# def test_management
- args=["--mgmt-pub-interval", 1] # Publish management information every second.
+ args = ["--mgmt-pub-interval", 1] # Publish management information every second.
# Use store if present.
if BrokerTest.store_lib: args +=["--load-module", BrokerTest.store_lib]
cluster = self.cluster(3, args)
Modified: qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_long_cluster_tests Tue Jun 22 13:29:52 2010
@@ -20,5 +20,5 @@
#
srcdir=`dirname $0`
-$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=2
+$srcdir/run_cluster_tests 'cluster_tests.LongTests.*' -DDURATION=4
Modified: qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects (original)
+++ qpid/trunk/qpid/cpp/src/tests/verify_cluster_objects Tue Jun 22 13:29:52 2010
@@ -1,6 +1,5 @@
#!/usr/bin/env python
-#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -19,390 +18,83 @@
# under the License.
#
-import os
-import getopt
-import sys
-import locale
-import socket
-import re
-from qmf.console import Session, SchemaClass
-
-_host = "localhost"
-_connTimeout = 10
-_verbose = 0
-_del_test = False;
-pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
-_debug_recursion = 0
-
-def Usage ():
- print "Usage: verify_cluster_objects [OPTIONS] [broker-addr]"
- print
- print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
- print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
- print
- print " This program contacts every node of a cluster, loads all manageable objects from"
- print " those nodes and verifies that the management data is identical across the clusters."
- print
- print "Options:"
- print " --timeout seconds (10) Maximum time to wait for broker connection"
- print " --verbose level (0) Show details of objects and their IDs"
- print " --delete Delete some objects after creation, to test synchup"
- print
- sys.exit (1)
-
-class IpAddr:
- def __init__(self, text):
- if text.find("@") != -1:
- tokens = text.split("@")
- text = tokens[1]
- if text.find(":") != -1:
- tokens = text.split(":")
- text = tokens[0]
- self.port = int(tokens[1])
- else:
- self.port = 5672
- self.dottedQuad = socket.gethostbyname(text)
- nums = self.dottedQuad.split(".")
- self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
-
- def bestAddr(self, addrPortList):
- bestDiff = 0xFFFFFFFFL
- bestAddr = None
- for addrPort in addrPortList:
- diff = IpAddr(addrPort[0]).addr ^ self.addr
- if diff < bestDiff:
- bestDiff = diff
- bestAddr = addrPort
- return bestAddr
-
-class ObjectId:
- """Object identity, use for dictionaries by object id"""
- def __init__(self, object): self.object = object
- def __eq__(self, other): return self.object is other.object
- def __hash__(self): return hash(id(self.object))
-
-class Broker(object):
- def __init__(self, qmf, broker):
- self.broker = broker
- self.qmf = qmf
+# Verify managment objects are consistent in a cluster.
+# Arguments: url of one broker in the cluster.
- agents = qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == '0':
- self.brokerAgent = a
-
- bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker",
- _agent=self.brokerAgent)[0]
- self.currentTime = bobj.getTimestamps()[0]
- try:
- self.uptime = bobj.uptime
- except:
- self.uptime = 0
- self.tablesByName = {}
- self.package = "org.apache.qpid.broker"
- self.id_cache = {} # Cache for getAbstractId
-
- def getUrl(self):
- return self.broker.getUrl()
-
- def getData(self):
- if _verbose > 1:
- print "Broker:", self.broker
-
- classList = self.qmf.getClasses(self.package)
- for cls in classList:
- if self.qmf.getSchema(cls).kind == SchemaClass.CLASS_KIND_TABLE:
- self.loadTable(cls)
-
-
- #
- # this should be a method on an object, but is kept here for now, until
- # we finish sorting out the treatment of names in qmfv2
- #
- def getAbstractId(self, object):
- """ return a string the of the hierarchical name """
- if (ObjectId(object) in self.id_cache): return self.id_cache[ObjectId(object)]
- global _debug_recursion
- result = u""
- valstr = u""
- _debug_recursion += 1
- debug_prefix = _debug_recursion
- if (_verbose > 9):
- print debug_prefix, " enter gai: props ", object._properties
- for property, value in object._properties:
-
- # we want to recurse on things which are refs. we tell by
- # asking each property if it's an index. I think...
- if (_verbose > 9):
- print debug_prefix, " prop ", property, " val " , value, " idx ",
- property.index, " type ", property.type
-
- # property is an instance, you can ask its type, name, etc.
-
- # special case system refs, as they will never be the same on
- # distinct cluster nodes. later we probably want a different
- # way of representing these objects, like for instance don't
- # include the system ref in the hierarchy.
-
- if property.name == "systemRef":
- _debug_recursion -= 1
- self.id_cache[ObjectId(object)] = ""
- return ""
-
- if property.index:
- if result != u"":
- result += u":"
- if property.type == 10:
- try:
- recursive_objects = object._session.getObjects(_objectId = value, _broker=object._broker)
- if (_verbose > 9):
- print debug_prefix, " r ", recursive_objects[0]
- for rp, rv in recursive_objects[0]._properties:
- print debug_prefix, " rrr ", rp, " idx-p ", rp.index, " v ", rv
- print debug_prefix, " recursing on ", recursive_objects[0]
- valstr = self.getAbstractId(recursive_objects[0])
- if (_verbose > 9):
- print debug_prefix, " recursing on ", recursive_objects[0],
- " -> ", valstr
- except Exception, e:
- if (_verbose > 9):
- print debug_prefix, " except ", e
- valstr = u"<undecodable>"
- else:
- # this yields UUID-blah. not good. try something else
- # valstr = value.__repr__()
- # print debug_prefix, " val ", value
-
- # yetch. this needs to be abstracted someplace? I don't
- # think we have the infrastructure we need to make these id
- # strings be sensible in the general case
- if property.name == "systemId":
- # special case. try to do something sensible about systemref objects
- valstr = object.nodeName
- else:
- valstr = value.__repr__() # I think...
- result += valstr
- if (_verbose > 9):
- print debug_prefix, " id ", self, " -> ", result
- _debug_recursion -= 1
- self.id_cache[ObjectId(object)] = result
- return result
-
- def loadTable(self, cls):
- if _verbose > 1:
- print " Class:", cls.getClassName()
- list = self.qmf.getObjects(_class=cls.getClassName(),
- _package=cls.getPackageName(),
- _agent=self.brokerAgent)
-
- # tables-by-name maps class name to a table by object-name of
- # objects. ie use the class name ("broker", "queue", etc) to
- # index tables-by-name, returning a second table, use the
- # object name to index that to get an object.
-
- self.tablesByName[cls.getClassName()] = {}
- for obj in list:
- # make sure we aren't colliding on name. it's an internal
- # error (ie, the name-generation code is busted) if we do
- key = self.getAbstractId(obj)
- if key in self.tablesByName[cls.getClassName()]:
- raise Exception("internal error: collision for %s on key %s\n"
- % (obj, key))
-
- self.tablesByName[cls.getClassName()][key] = obj
- if _verbose > 1:
- print " ", obj.getObjectId(), " ", obj.getIndex(), " ", key
+import qmf.console, sys, re
+class Session(qmf.console.Session):
+ """A qmf.console.Session that caches useful values"""
-class BrokerManager:
def __init__(self):
- self.brokerName = None
- self.qmf = None
- self.broker = None
- self.brokers = []
- self.cluster = None
-
- def SetBroker(self, brokerUrl):
- self.url = brokerUrl
- self.qmf = Session()
- self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
- agents = self.qmf.getAgents()
- for a in agents:
- if a.getAgentBank() == '0':
- self.brokerAgent = a
-
- def Disconnect(self):
- if self.broker:
- self.qmf.delBroker(self.broker)
-
- def _getCluster(self):
- packages = self.qmf.getPackages()
- if "org.apache.qpid.cluster" not in packages:
- return None
-
- clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
- if len(clusters) == 0:
- print "Clustering is installed but not enabled on the broker."
- return None
-
- self.cluster = clusters[0]
-
- def _getHostList(self, urlList):
- hosts = []
- hostAddr = IpAddr(_host)
- for url in urlList:
- if url.find("amqp:") != 0:
- raise Exception("Invalid URL 1")
- url = url[5:]
- addrs = str(url).split(",")
- addrList = []
- for addr in addrs:
- tokens = addr.split(":")
- if len(tokens) != 3:
- raise Exception("Invalid URL 2")
- addrList.append((tokens[1], tokens[2]))
-
- # Find the address in the list that is most likely to be
- # in the same subnet as the address with which we made the
- # original QMF connection. This increases the probability
- # that we will be able to reach the cluster member.
-
- best = hostAddr.bestAddr(addrList)
- bestUrl = best[0] + ":" + best[1]
- hosts.append(bestUrl)
- return hosts
-
-
- # the main fun which tests for broker state "identity". now that
- # we're using qmf2 style object names across the board, that test
- # means that we are ensuring that for all objects of a given
- # class, an object of that class with the same object name exists
- # on the peer broker.
-
- def verify(self):
- if _verbose > 0:
- print "Connecting to the cluster..."
- self._getCluster()
- if self.cluster:
- memberList = self.cluster.members.split(";")
- hostList = self._getHostList(memberList)
- self.qmf.delBroker(self.broker)
- self.broker = None
- for host in hostList:
- b = self.qmf.addBroker(host, _connTimeout)
- self.brokers.append(Broker(self.qmf, b))
- if _verbose > 0:
- print " ", b
- else:
- raise Exception("Failed - Not a cluster")
-
- failures = []
-
- # Wait until connections to all nodes are established before
- # loading the management data. This will ensure that the
- # objects are all stable and the same.
- if _verbose > 0:
- print "Loading management data from nodes..."
- for broker in self.brokers:
- broker.getData()
-
- # If we're testing delete-some-objects functionality, create a
- # few widgets here and then delete them.
- if _del_test:
- if _verbose > 0:
- print "Running delete test"
- # just stick 'em in the first broker
- b = self.brokers[0]
- session = b.qmf.brokers[0].getAmqpSession()
- session.queue_declare(queue="foo", exclusive=True, auto_delete=True)
- session.exchange_bind(exchange="amq.direct",
- queue="foo", binding_key="foo")
- session.queue_declare(queue="bar", exclusive=True, auto_delete=True)
- session.exchange_bind(exchange="amq.direct",
- queue="bar", binding_key="bar")
- # now delete 'em
- session.exchange_unbind(queue="foo", exchange="amq.direct", binding_key="foo")
- session.exchange_unbind(queue="bar", exchange="amq.direct", binding_key="bar")
- session.queue_delete("bar")
- session.queue_delete("foo")
-
- # Verify that each node has the same set of objects (based on
- # object name).
- if _verbose > 0:
- print "Verifying objects based on object name..."
- base = self.brokers[0]
- for broker in self.brokers[1:]:
-
- # walk over the class names, for each class (with some
- # exceptions) walk over the objects of that class, making
- # sure they match between broker A and broker B
-
- for className in base.tablesByName:
- if className in ["broker", "system", "connection"]:
- continue
-
- tab1 = base.tablesByName[className]
- tab2 = broker.tablesByName[className]
-
- for key in tab1:
- if key not in tab2:
- failures.append("%s key %s not found on node %s" %
- (className, key, broker.getUrl()))
- for key in tab2:
- if key not in tab1:
- failures.append("%s key %s not found on node %s" %
- (className, key, base.getUrl()))
-
- if len(failures) > 0:
- print "Failures:"
- for failure in failures:
- print " %s" % failure
- raise Exception("Failures")
-
- if _verbose > 0:
- print "Success"
-
-##
-## Main Program
-##
-
-try:
- longOpts = ("verbose=", "timeout=", "delete")
- (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "", longOpts)
-except:
- Usage()
-
-try:
- encoding = locale.getpreferredencoding()
- cargs = [a.decode(encoding) for a in encArgs]
-except:
- cargs = encArgs
-
-for opt in optlist:
- if opt[0] == "--timeout":
- _connTimeout = int(opt[1])
- if _connTimeout == 0:
- _connTimeout = None
- elif opt[0] == "--verbose":
- _verbose = int(opt[1])
- elif opt[0] == "--delete":
- _del_test = True;
- else:
- Usage()
-
-nargs = len(cargs)
-bm = BrokerManager()
-
-if nargs == 1:
- _host = cargs[0]
-
-try:
- bm.SetBroker(_host)
- bm.verify()
-except KeyboardInterrupt:
- print
-except Exception,e:
- print "Failed: %s - %s" % (e.__class__.__name__, e)
- sys.exit(1)
+ qmf.console.Session.__init__(self)
+ self.classes = None
+
+ def all_classes(self):
+ if self.classes is None:
+ self.classes = [c for p in self.getPackages() for c in self.getClasses(p)]
+ return self.classes
+
+class Broker:
+ def __init__(self, url, qmf):
+ self.url = url
+ self.qmf = qmf
+ self.broker = self.qmf.addBroker(url)
+ self.broker._waitForStable()
+ self.objects = None
+ self.ignore_list = [ re.compile("org.apache.qpid.broker:system:") ]
+
+ def get_objects(self):
+ def ignore(name):
+ for m in (m for m in self.ignore_list if m.match(name)):
+ return True
+ if self.objects is None:
+ obj_list = []
+ for c in self.qmf.all_classes():
+ for o in self.qmf.getObjects(_key=c, _broker=self.broker):
+ name=o.getObjectId().getObject()
+ if not ignore(name): obj_list.append(name)
+ self.objects = set(obj_list)
+ if (len(obj_list) != len(self.objects)):
+ raise Exception("Duplicates in object list for %s"%(self.url))
+ return self.objects
+
+ def compare(self,other):
+ def compare1(x,y):
+ diff = x.get_objects() - y.get_objects()
+ if diff:
+ print "ERROR: found on %s but not %s"%(x, y)
+ for o in diff: print " %s"%(o)
+ return False
+ return True
+
+ so = compare1(self, other)
+ os = compare1(other, self)
+ return so and os
+
+ def __str__(self): return self.url
+
+ def get_cluster(self):
+ """Given one Broker, return list of all brokers in its cluster"""
+ clusters = self.qmf.getObjects(_class="cluster")
+ if not clusters: raise ("%s is not a cluster member"%(self.url))
+ def first_address(url):
+ """Python doesn't understand the brokers URL syntax. Extract a simple addres"""
+ return re.compile("amqp:tcp:([^,]*)").match(url).group(1)
+ return [Broker(first_address(url), self.qmf) for url in clusters[0].members.split(";")]
+
+ def __del__(self): self.qmf.delBroker(self.broker)
+
+def main(argv=None):
+ if argv is None: argv = sys.argv
+ qmf = Session()
+ brokers = Broker(argv[1], qmf).get_cluster()
+ base = brokers.pop(0)
+ result = 0
+ for b in brokers:
+ if not base.compare(b): result = 1
+ del base
+ del brokers
+ return result
-bm.Disconnect()
+if __name__ == "__main__": sys.exit(main())
Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=956882&r1=956881&r2=956882&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Jun 22 13:29:52 2010
@@ -134,7 +134,9 @@
</control>
<!-- Marks the cluster-wide point when a connection is considered closed. -->
- <control name="deliver-close" code="0x2"/>
+ <control name="deliver-close" code="0x2">
+ <field name="aborted" type="bit"/>
+ </control>
<!-- Permission to generate output up to the limit. -->
<control name="deliver-do-output" code="0x3">
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org