You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2010/03/31 23:13:14 UTC
svn commit: r929716 [4/4] - in /qpid/trunk/qpid: cpp/bindings/qmf/tests/
cpp/examples/qmf-agent/ cpp/include/qpid/agent/ cpp/include/qpid/framing/
cpp/include/qpid/management/ cpp/managementgen/ cpp/managementgen/qmfgen/
cpp/managementgen/qmfgen/templa...
Modified: qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py (original)
+++ qpid/trunk/qpid/extras/qmf/src/py/qmf/console.py Wed Mar 31 21:13:12 2010
@@ -41,6 +41,9 @@ from cStringIO import StringIO
#import qpid.log
#qpid.log.enable(name="qpid.io.cmd", level=qpid.log.DEBUG)
+#===================================================================================================
+# CONSOLE
+#===================================================================================================
class Console:
""" To access the asynchronous operations, a class must be derived from
Console with overrides of any combination of the available methods. """
@@ -94,6 +97,10 @@ class Console:
""" Invoked when a method response from an asynchronous method call is received. """
pass
+
+#===================================================================================================
+# BrokerURL
+#===================================================================================================
class BrokerURL(URL):
def __init__(self, text):
URL.__init__(self, text)
@@ -115,16 +122,30 @@ class BrokerURL(URL):
def match(self, host, port):
return socket.getaddrinfo(self.host, self.port)[0][4] == socket.getaddrinfo(host, port)[0][4]
+
+#===================================================================================================
+# Object
+#===================================================================================================
class Object(object):
- """ This class defines a 'proxy' object representing a real managed object on an agent.
- Actions taken on this proxy are remotely affected on the real managed object.
"""
- def __init__(self, session, broker, schema, codec, prop, stat, managed=True, kwargs={}):
- self._session = session
- self._broker = broker
+ This class defines a 'proxy' object representing a real managed object on an agent.
+ Actions taken on this proxy are remotely affected on the real managed object.
+ """
+ def __init__(self, agent, schema, codec=None, prop=None, stat=None, v2Map=None, agentName=None, kwargs={}):
+ self._agent = agent
+ self._session = None
+ self._broker = None
+ if agent:
+ self._session = agent.session
+ self._broker = agent.broker
self._schema = schema
- self._managed = managed
- if self._managed:
+ self._properties = []
+ self._statistics = []
+ if v2Map:
+ self.v2Init(v2Map, agentName)
+ return
+
+ if self._agent:
self._currentTime = codec.read_uint64()
self._createTime = codec.read_uint64()
self._deleteTime = codec.read_uint64()
@@ -134,8 +155,6 @@ class Object(object):
self._createTime = None
self._deleteTime = None
self._objectId = None
- self._properties = []
- self._statistics = []
if codec:
if prop:
notPresent = self._parsePresenceMasks(codec, schema)
@@ -143,18 +162,38 @@ class Object(object):
if property.name in notPresent:
self._properties.append((property, None))
else:
- self._properties.append((property, self._session._decodeValue(codec, property.type, broker)))
+ self._properties.append((property, self._session._decodeValue(codec, property.type, self._broker)))
if stat:
for statistic in schema.getStatistics():
- self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, broker)))
+ self._statistics.append((statistic, self._session._decodeValue(codec, statistic.type, self._broker)))
else:
for property in schema.getProperties():
if property.optional:
self._properties.append((property, None))
else:
- self._properties.append((property, self._session._defaultValue(property, broker, kwargs)))
+ self._properties.append((property, self._session._defaultValue(property, self._broker, kwargs)))
for statistic in schema.getStatistics():
- self._statistics.append((statistic, self._session._defaultValue(statistic, broker, kwargs)))
+ self._statistics.append((statistic, self._session._defaultValue(statistic, self._broker, kwargs)))
+
+ def v2Init(self, omap, agentName):
+ if omap.__class__ != dict:
+ raise Exception("QMFv2 object data must be a map/dict")
+ if '_values' not in omap:
+ raise Exception("QMFv2 object must have '_values' element")
+
+ values = omap['_values']
+ for prop in self._schema.getProperties():
+ if prop.name in values:
+ self._properties.append((prop, values[prop.name]))
+ for stat in self._schema.getStatistics():
+ if stat.name in values:
+ self._statistics.append((stat, values[stat.name]))
+ if '_subtypes' in omap:
+ self._subtypes = omap['_subtypes']
+ if '_object_id' in omap:
+ self._objectId = ObjectId(omap['_object_id'], agentName=agentName)
+ else:
+ self._objectId = None
def getBroker(self):
""" Return the broker from which this object was sent """
@@ -186,7 +225,7 @@ class Object(object):
def isManaged(self):
""" Return True iff this object is a proxy for a managed object on an agent. """
- return self._managed
+ return self._objectId and self._agent
def getIndex(self):
""" Return a string describing this object's primary key. """
@@ -225,7 +264,7 @@ class Object(object):
""" Contact the agent and retrieve the lastest property and statistic values for this object. """
if not self.isManaged():
raise Exception("Object is not managed")
- obj = self._session.getObjects(_objectId = self._objectId, _broker=self._broker)
+ obj = self._agent.getObjects(_objectId=self._objectId)
if obj:
self.mergeUpdate(obj[0])
else:
@@ -244,17 +283,17 @@ class Object(object):
for method in self._schema.getMethods():
if name == method.name:
return lambda *args, **kwargs : self._invoke(name, args, kwargs)
- for property, value in self._properties:
- if name == property.name:
+ for prop, value in self._properties:
+ if name == prop.name:
return value
- if name == "_" + property.name + "_" and property.type == 10: # Dereference references
- deref = self._session.getObjects(_objectId=value, _broker=self._broker)
+ if name == "_" + prop.name + "_" and prop.type == 10: # Dereference references
+ deref = self._agent.getObjects(_objectId=value)
if len(deref) != 1:
return None
else:
return deref[0]
- for statistic, value in self._statistics:
- if name == statistic.name:
+ for stat, value in self._statistics:
+ if name == stat.name:
return value
raise Exception("Type Object has no attribute '%s'" % name)
@@ -282,10 +321,6 @@ class Object(object):
aIdx = 0
sendCodec = Codec()
seq = self._session.seqMgr._reserve((method, synchronous))
- self._broker._setHeader(sendCodec, 'M', seq)
- self._objectId.encode(sendCodec)
- self._schema.getKey().encode(sendCodec)
- sendCodec.write_str8(name)
count = 0
for arg in method.arguments:
@@ -294,24 +329,64 @@ class Object(object):
if count != len(args):
raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(args)))
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- self._session._encodeValue(sendCodec, args[aIdx], arg.type)
- aIdx += 1
- if timeWait:
- ttl = timeWait * 1000
+ if self._agent.isV2:
+ #
+ # Compose and send a QMFv2 method request
+ #
+ call = {}
+ call['_object_id'] = self._objectId.asMap()
+ call['_method_name'] = name
+ argMap = {}
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ argMap[arg.name] = args[aIdx]
+ aIdx += 1
+ call['_arguments'] = argMap
+
+ dp = self._broker.amqpSession.delivery_properties()
+ dp.routing_key = self._objectId.getAgentBank()
+ mp = self._broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self._broker.authUser
+ mp.correlation_id = str(seq)
+ mp.app_id = "qmf2"
+ mp.reply_to = self._broker.amqpSession.reply_to("qmf.default.direct", self._broker.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_method_request'}
+ sendCodec.write_map(call)
+ smsg = Message(dp, mp, sendCodec.encoded)
+ exchange = "qmf.default.direct"
+
else:
- ttl = None
- smsg = self._broker._message(sendCodec.encoded, "agent.%d.%d" %
- (self._objectId.getBrokerBank(), self._objectId.getAgentBank()),
- ttl=ttl)
+ #
+ # Associate this sequence with the agent hosting the object so we can correctly
+ # route the method-response
+ #
+ agent = self._broker.getAgent(self._broker.getBrokerBank(), self._objectId.getAgentBank())
+ self._broker._setSequence(seq, agent)
+
+ #
+ # Compose and send a QMFv1 method request
+ #
+ self._broker._setHeader(sendCodec, 'M', seq)
+ self._objectId.encode(sendCodec)
+ self._schema.getKey().encode(sendCodec)
+ sendCodec.write_str8(name)
+
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ self._session._encodeValue(sendCodec, args[aIdx], arg.type)
+ aIdx += 1
+ smsg = self._broker._message(sendCodec.encoded, "agent.%d.%s" %
+ (self._objectId.getBrokerBank(), self._objectId.getAgentBank()))
+ exchange = "qpid.management"
+
if synchronous:
try:
self._broker.cv.acquire()
self._broker.syncInFlight = True
finally:
self._broker.cv.release()
- self._broker._send(smsg)
+ self._broker._send(smsg, exchange)
return seq
return None
@@ -352,7 +427,6 @@ class Object(object):
raise Exception("Invalid Method (software defect) [%s]" % name)
def _encodeUnmanaged(self, codec):
-
codec.write_uint8(20)
codec.write_str8(self._schema.getKey().getPackageName())
codec.write_str8(self._schema.getKey().getClassName())
@@ -399,6 +473,10 @@ class Object(object):
bit = 0
return excludeList
+
+#===================================================================================================
+# Session
+#===================================================================================================
class Session:
"""
An instance of the Session class represents a console session running
@@ -423,6 +501,7 @@ class Session:
list: 21
}
+
def __init__(self, console=None, rcvObjects=True, rcvEvents=True, rcvHeartbeats=True,
manageConnections=False, userBindings=False):
"""
@@ -450,7 +529,7 @@ class Session:
"""
self.console = console
self.brokers = []
- self.packages = {}
+ self.schemaCache = SchemaCache()
self.seqMgr = SequenceManager()
self.cv = Condition()
self.syncSequenceList = []
@@ -471,9 +550,35 @@ class Session:
if self.userBindings and not self.rcvObjects:
raise Exception("userBindings can't be set unless rcvObjects is set and a console is provided")
+
+ def _getBrokerForAgentAddr(self, agent_addr):
+ try:
+ self.cv.acquire()
+ key = (1, agent_addr)
+ for b in self.brokers:
+ if key in b.agents:
+ return b
+ finally:
+ self.cv.release()
+ return None
+
+
+ def _getAgentForAgentAddr(self, agent_addr):
+ try:
+ self.cv.acquire()
+ key = agent_addr
+ for b in self.brokers:
+ if key in b.agents:
+ return b.agents[key]
+ finally:
+ self.cv.release()
+ return None
+
+
def __repr__(self):
return "QMF Console Session Manager (brokers: %d)" % len(self.brokers)
+
def addBroker(self, target="localhost", timeout=None, mechanisms=None):
""" Connect to a Qpid broker. Returns an object of type Broker. """
url = BrokerURL(target)
@@ -482,9 +587,12 @@ class Session:
self.brokers.append(broker)
if not self.manageConnections:
- self.getObjects(broker=broker, _class="agent")
+ agent = broker.getAgent(1,0)
+ if agent:
+ agent.getObjects(_class="agent")
return broker
+
def delBroker(self, broker):
""" Disconnect from a broker. The 'broker' argument is the object
returned from the addBroker call """
@@ -495,34 +603,27 @@ class Session:
self.brokers.remove(broker)
del broker
+
def getPackages(self):
""" Get the list of known QMF packages """
for broker in self.brokers:
broker._waitForStable()
- list = []
- for package in self.packages:
- list.append(package)
- return list
+ return self.schemaCache.getPackages()
+
def getClasses(self, packageName):
""" Get the list of known classes within a QMF package """
for broker in self.brokers:
broker._waitForStable()
- list = []
- if packageName in self.packages:
- for pkey in self.packages[packageName]:
- list.append(self.packages[packageName][pkey].getKey())
- return list
+ return self.schemaCache.getClasses(packageName)
+
def getSchema(self, classKey):
""" Get the schema for a QMF class """
for broker in self.brokers:
broker._waitForStable()
- pname = classKey.getPackageName()
- pkey = classKey.getPackageKey()
- if pname in self.packages:
- if pkey in self.packages[pname]:
- return self.packages[pname][pkey]
+ return self.schemaCache.getSchema(classKey)
+
def bindPackage(self, packageName):
""" Request object updates for all table classes within a package. """
@@ -535,6 +636,7 @@ class Session:
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
binding_key=key)
+
def bindClass(self, pname, cname):
""" Request object updates for a particular table class by package and class name. """
if not self.userBindings or not self.rcvObjects:
@@ -545,6 +647,7 @@ class Session:
if broker.isConnected():
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
binding_key=key)
+
def bindClassKey(self, classKey):
""" Request object updates for a particular table class by class key. """
@@ -552,6 +655,7 @@ class Session:
cname = classKey.getClassName()
self.bindClass(pname, cname)
+
def getAgents(self, broker=None):
""" Get a list of currently known agents """
brokerList = []
@@ -569,12 +673,14 @@ class Session:
agentList.append(a)
return agentList
- def makeObject(self, classKey, broker=None, **kwargs):
+
+ def makeObject(self, classKey, **kwargs):
""" Create a new, unmanaged object of the schema indicated by classKey """
schema = self.getSchema(classKey)
if schema == None:
raise Exception("Schema not found for classKey")
- return Object(self, broker, schema, None, True, True, False, kwargs)
+ return Object(None, schema, None, True, True, kwargs)
+
def getObjects(self, **kwargs):
""" Get a list of objects from QMF agents.
@@ -644,81 +750,24 @@ class Session:
if len(agentList) == 0:
return []
- pname = None
- cname = None
- hash = None
- classKey = None
- if "_schema" in kwargs: classKey = kwargs["_schema"].getKey()
- elif "_key" in kwargs: classKey = kwargs["_key"]
- elif "_class" in kwargs:
- cname = kwargs["_class"]
- if "_package" in kwargs:
- pname = kwargs["_package"]
- if cname == None and classKey == None and "_objectId" not in kwargs:
- raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument")
-
- map = {}
- self.getSelect = []
- if "_objectId" in kwargs:
- map["_objectid"] = kwargs["_objectId"].__repr__()
- else:
- if cname == None:
- cname = classKey.getClassName()
- pname = classKey.getPackageName()
- hash = classKey.getHash()
- map["_class"] = cname
- if pname != None: map["_package"] = pname
- if hash != None: map["_hash"] = hash
- for item in kwargs:
- if item[0] != '_':
- self.getSelect.append((item, kwargs[item]))
-
- self.getResult = []
+ #
+ # We now have a list of agents to query, start the queries and gather the results.
+ #
+ request = SessionGetRequest(len(agentList))
for agent in agentList:
- broker = agent.broker
- sendCodec = Codec()
- try:
- self.cv.acquire()
- seq = self.seqMgr._reserve(self._CONTEXT_MULTIGET)
- self.syncSequenceList.append(seq)
- finally:
- self.cv.release()
- broker._setHeader(sendCodec, 'G', seq)
- sendCodec.write_map(map)
- smsg = broker._message(sendCodec.encoded, "agent.%d.%d" % (agent.brokerBank, agent.agentBank))
- broker._send(smsg)
-
- starttime = time()
- timeout = False
- if "_timeout" in kwargs:
- waitTime = kwargs["_timeout"]
- else:
- waitTime = self.DEFAULT_GET_WAIT_TIME
- try:
- self.cv.acquire()
- while len(self.syncSequenceList) > 0 and self.error == None:
- self.cv.wait(waitTime)
- if time() - starttime > waitTime:
- for pendingSeq in self.syncSequenceList:
- self.seqMgr._release(pendingSeq)
- self.syncSequenceList = []
- timeout = True
- finally:
- self.cv.release()
-
- if self.error:
- errorText = self.error
- self.error = None
- raise Exception(errorText)
+ agent.getObjects(request, **kwargs)
+ timeout = 60
+ if '_timeout' in kwargs:
+ timeout = kwargs['_timeout']
+ request.wait(timeout)
+ return request.result
- if len(self.getResult) == 0 and timeout:
- raise RuntimeError("No agent responded within timeout period")
- return self.getResult
def setEventFilter(self, **kwargs):
""" """
pass
+
def _bindingKeys(self):
keyList = []
keyList.append("schema.#")
@@ -735,18 +784,21 @@ class Session:
keyList.append("console.heartbeat.#")
return keyList
+
def _handleBrokerConnect(self, broker):
if self.console:
for agent in broker.getAgents():
self.console.newAgent(agent)
self.console.brokerConnected(broker)
+
def _handleBrokerDisconnect(self, broker):
if self.console:
for agent in broker.getAgents():
self.console.delAgent(agent)
self.console.brokerDisconnected(broker)
+
def _handleBrokerResp(self, broker, codec, seq):
broker.brokerId = codec.read_uuid()
if self.console != None:
@@ -760,16 +812,10 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
+
def _handlePackageInd(self, broker, codec, seq):
pname = str(codec.read_str8())
- notify = False
- try:
- self.cv.acquire()
- if pname not in self.packages:
- self.packages[pname] = {}
- notify = True
- finally:
- self.cv.release()
+ notify = self.schemaCache.declarePackage(pname)
if notify and self.console != None:
self.console.newPackage(pname)
@@ -782,7 +828,8 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
- def _handleCommandComplete(self, broker, codec, seq):
+
+ def _handleCommandComplete(self, broker, codec, seq, agent):
code = codec.read_uint32()
text = codec.read_str8()
context = self.seqMgr._release(seq)
@@ -804,20 +851,16 @@ class Session:
finally:
self.cv.release()
+ if agent:
+ agent._handleV1Completion(seq, code, text)
+
+
def _handleClassInd(self, broker, codec, seq):
kind = codec.read_uint8()
classKey = ClassKey(codec)
- unknown = False
-
- try:
- self.cv.acquire()
- if classKey.getPackageName() in self.packages:
- if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]:
- unknown = True
- finally:
- self.cv.release()
+ schema = self.schemaCache.getSchema(classKey)
- if unknown:
+ if not schema:
# Send a schema request for the unknown class
broker._incOutstanding()
sendCodec = Codec()
@@ -827,30 +870,6 @@ class Session:
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
- def _handleMethodResp(self, broker, codec, seq):
- code = codec.read_uint32()
- text = codec.read_str16()
- outArgs = {}
- pair = self.seqMgr._release(seq)
- if pair == None:
- return
- method, synchronous = pair
- if code == 0:
- for arg in method.arguments:
- if arg.dir.find("O") != -1:
- outArgs[arg.name] = self._decodeValue(codec, arg.type, broker)
- result = MethodResult(code, text, outArgs)
- if synchronous:
- try:
- broker.cv.acquire()
- broker.syncResult = result
- broker.syncInFlight = False
- broker.cv.notify()
- finally:
- broker.cv.release()
- else:
- if self.console:
- self.console.methodResponse(broker, seq, result)
def _handleHeartbeatInd(self, broker, codec, seq, msg):
brokerBank = 1
@@ -873,59 +892,49 @@ class Session:
timestamp = codec.read_uint64()
if self.console != None and agent != None:
self.console.heartbeat(agent, timestamp)
+ broker._ageAgents()
- def _handleEventInd(self, broker, codec, seq):
- if self.console != None:
- event = Event(self, broker, codec)
- self.console.event(broker, event)
- def _handleSchemaResp(self, broker, codec, seq):
+ def _handleSchemaResp(self, broker, codec, seq, agent_addr):
kind = codec.read_uint8()
classKey = ClassKey(codec)
_class = SchemaClass(kind, classKey, codec, self)
- try:
- self.cv.acquire()
- self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
- finally:
- self.cv.release()
-
- self.seqMgr._release(seq)
- broker._decOutstanding()
+ self.schemaCache.declareClass(classKey, _class)
+ ctx = self.seqMgr._release(seq)
+ if ctx:
+ broker._decOutstanding()
if self.console != None:
self.console.newClass(kind, classKey)
- def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
- classKey = ClassKey(codec)
- try:
- self.cv.acquire()
- pname = classKey.getPackageName()
- if pname not in self.packages:
- return
- pkey = classKey.getPackageKey()
- if pkey not in self.packages[pname]:
- return
- schema = self.packages[pname][pkey]
- finally:
- self.cv.release()
+ if agent_addr and (agent_addr.__class__ == str or agent_addr.__class__ == unicode):
+ agent = self._getAgentForAgentAddr(agent_addr)
+ if agent:
+ agent._schemaInfoFromV2Agent()
- object = Object(self, broker, schema, codec, prop, stat)
- if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
- broker._updateAgent(object)
+ def _v2HandleHeartbeatInd(self, broker, mp, ah, content):
try:
- self.cv.acquire()
- if seq in self.syncSequenceList:
- if object.getTimestamps()[2] == 0 and self._selectMatch(object):
- self.getResult.append(object)
- return
- finally:
- self.cv.release()
+ agentName = ah["qmf.agent"]
+ values = content["_values"]
+ timestamp = values["timestamp"]
+ interval = values["heartbeat_interval"]
+ except:
+ return
+
+ agent = broker.getAgent(1, agentName)
+ if agent == None:
+ agent = Agent(broker, agentName, "QMFv2 Agent", True, interval)
+ broker._addAgent(agentName, agent)
+ else:
+ agent.touch()
+ if self.console and agent:
+ self.console.heartbeat(agent, timestamp)
+ broker._ageAgents()
+
+
+ def _v2HandleAgentLocateRsp(self, broker, mp, ah, content):
+ self._v2HandleHeartbeatInd(broker, mp, ah, content)
- if self.console and self.rcvObjects:
- if prop:
- self.console.objectProps(broker, object)
- if stat:
- self.console.objectStats(broker, object)
def _handleError(self, error):
try:
@@ -937,6 +946,7 @@ class Session:
finally:
self.cv.release()
+
def _selectMatch(self, object):
""" Check the object against self.getSelect to check for a match """
for key, value in self.getSelect:
@@ -945,6 +955,7 @@ class Session:
return False
return True
+
def _decodeValue(self, codec, typecode, broker=None):
""" Decode, from the codec, a value based on its typecode. """
if typecode == 1: data = codec.read_uint8() # U8
@@ -971,17 +982,9 @@ class Session:
inner_type_code = codec.read_uint8()
if inner_type_code == 20:
classKey = ClassKey(codec)
- try:
- self.cv.acquire()
- pname = classKey.getPackageName()
- if pname not in self.packages:
- return None
- pkey = classKey.getPackageKey()
- if pkey not in self.packages[pname]:
- return None
- schema = self.packages[pname][pkey]
- finally:
- self.cv.release()
+ schema = self.schemaCache.getSchema(classKey)
+ if not schema:
+ return None
data = Object(self, broker, schema, codec, True, True, False)
else:
data = self._decodeValue(codec, inner_type_code, broker)
@@ -999,6 +1002,7 @@ class Session:
raise ValueError("Invalid type code: %d" % typecode)
return data
+
def _encodeValue(self, codec, value, typecode):
""" Encode, into the codec, a value based on its typecode. """
if typecode == 1: codec.write_uint8 (int(value)) # U8
@@ -1033,9 +1037,11 @@ class Session:
else:
raise ValueError ("Invalid type code: %d" % typecode)
+
def encoding(self, value):
return self._encoding(value.__class__)
+
def _encoding(self, klass):
if Session.ENCODINGS.has_key(klass):
return self.ENCODINGS[klass]
@@ -1044,6 +1050,7 @@ class Session:
if result != None:
return result
+
def _displayValue(self, value, typecode):
""" """
if typecode == 1: return unicode(value)
@@ -1072,6 +1079,7 @@ class Session:
else:
raise ValueError ("Invalid type code: %d" % typecode)
+
def _defaultValue(self, stype, broker=None, kwargs={}):
""" """
typecode = stype.type
@@ -1110,6 +1118,7 @@ class Session:
else:
raise ValueError ("Invalid type code: %d" % typecode)
+
def _bestClassKey(self, pname, cname, preferredList):
""" """
if pname == None or cname == None:
@@ -1125,6 +1134,7 @@ class Session:
return c
return None
+
def _sendMethodRequest(self, broker, schemaKey, objectId, name, argList):
""" This function can be used to send a method request to an object given only the
broker, schemaKey, and objectId. This is an uncommon usage pattern as methods are
@@ -1133,14 +1143,10 @@ class Session:
schema = self.getSchema(schemaKey)
for method in schema.getMethods():
if name == method.name:
- aIdx = 0
- sendCodec = Codec()
- seq = self.seqMgr._reserve((method, False))
- broker._setHeader(sendCodec, 'M', seq)
- objectId.encode(sendCodec)
- schemaKey.encode(sendCodec)
- sendCodec.write_str8(name)
-
+ #
+ # Count the arguments supplied and validate that the number is what is expected
+ # based on the schema.
+ #
count = 0
for arg in method.arguments:
if arg.dir.find("I") != -1:
@@ -1148,25 +1154,192 @@ class Session:
if count != len(argList):
raise Exception("Incorrect number of arguments: expected %d, got %d" % (count, len(argList)))
- for arg in method.arguments:
- if arg.dir.find("I") != -1:
- self._encodeValue(sendCodec, argList[aIdx], arg.type)
- aIdx += 1
- smsg = broker._message(sendCodec.encoded, "agent.%d.%d" %
- (objectId.getBrokerBank(), objectId.getAgentBank()))
- broker._send(smsg)
+ aIdx = 0
+ sendCodec = Codec()
+ seq = self.seqMgr._reserve((method, False))
+
+ if objectId.isV2():
+ #
+ # Compose and send a QMFv2 method request
+ #
+ call = {}
+ call['_object_id'] = objectId.asMap()
+ call['_method_name'] = name
+ args = {}
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ args[arg.name] = argList[aIdx]
+ aIdx += 1
+ call['_arguments'] = args
+
+ dp = broker.amqpSession.delivery_properties()
+ dp.routing_key = objectId.getAgentBank()
+ mp = broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = broker.authUser
+ mp.correlation_id = str(seq)
+ mp.app_id = "qmf2"
+ mp.reply_to = broker.amqpSession.reply_to("qmf.default.direct", broker.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_method_request'}
+ sendCodec.write_map(call)
+ msg = Message(dp, mp, sendCodec.encoded)
+ broker._send(msg, "qmf.default.direct")
+
+ else:
+ #
+ # Associate this sequence with the agent hosting the object so we can correctly
+ # route the method-response
+ #
+ agent = broker.getAgent(broker.getBrokerBank(), objectId.getAgentBank())
+ broker._setSequence(seq, agent)
+
+ #
+ # Compose and send a QMFv1 method request
+ #
+ broker._setHeader(sendCodec, 'M', seq)
+ objectId.encode(sendCodec)
+ schemaKey.encode(sendCodec)
+ sendCodec.write_str8(name)
+
+ for arg in method.arguments:
+ if arg.dir.find("I") != -1:
+ self._encodeValue(sendCodec, argList[aIdx], arg.type)
+ aIdx += 1
+ smsg = broker._message(sendCodec.encoded, "agent.%d.%s" %
+ (objectId.getBrokerBank(), objectId.getAgentBank()))
+ broker._send(smsg)
return seq
return None
-class Package:
- """ """
- def __init__(self, name):
- self.name = name
+#===================================================================================================
+# SessionGetRequest
+#===================================================================================================
+class SessionGetRequest(object):
+ """
+ This class is used to track get-object queries at the Session level.
+ """
+ def __init__(self, agentCount):
+ self.agentCount = agentCount
+ self.result = []
+ self.cv = Condition()
+ self.waiting = True
+
+ def __call__(self, **kwargs):
+ """
+ Callable entry point for gathering collected objects.
+ """
+ try:
+ self.cv.acquire()
+ if 'qmf_object' in kwargs:
+ self.result.append(kwargs['qmf_object'])
+ elif 'qmf_complete' in kwargs or 'qmf_exception' in kwargs:
+ self.agentCount -= 1
+ if self.agentCount == 0:
+ self.waiting = None
+ self.cv.notify()
+ finally:
+ self.cv.release()
+
+ def wait(self, timeout):
+ starttime = time()
+ try:
+ self.cv.acquire()
+ while self.waiting:
+ if (time() - starttime) > timeout:
+ raise Exception("Timed out after %d seconds" % timeout)
+ self.cv.wait(1)
+ finally:
+ self.cv.release()
+
+
+#===================================================================================================
+# SchemaCache
+#===================================================================================================
+class SchemaCache(object):
+ """
+ The SchemaCache is a data structure that stores learned schema information.
+ """
+ def __init__(self):
+ """
+ Create a map of schema packages and a lock to protect this data structure.
+ Note that this lock is at the bottom of any lock hierarchy. If it is held, no other
+ lock in the system should attempt to be acquired.
+ """
+ self.packages = {}
+ self.lock = Lock()
+
+ def getPackages(self):
+ """ Get the list of known QMF packages """
+ list = []
+ try:
+ self.lock.acquire()
+ for package in self.packages:
+ list.append(package)
+ finally:
+ self.lock.release()
+ return list
+
+ def getClasses(self, packageName):
+ """ Get the list of known classes within a QMF package """
+ list = []
+ try:
+ self.lock.acquire()
+ if packageName in self.packages:
+ for pkey in self.packages[packageName]:
+ list.append(self.packages[packageName][pkey].getKey())
+ finally:
+ self.lock.release()
+ return list
+
+ def getSchema(self, classKey):
+ """ Get the schema for a QMF class """
+ pname = classKey.getPackageName()
+ pkey = classKey.getPackageKey()
+ try:
+ self.lock.acquire()
+ if pname in self.packages:
+ if pkey in self.packages[pname]:
+ return self.packages[pname][pkey]
+ finally:
+ self.lock.release()
+ return None
+
+ def declarePackage(self, pname):
+ """ Maybe add a package to the cache. Return True if package was added, None if it pre-existed. """
+ try:
+ self.lock.acquire()
+ if pname in self.packages:
+ return None
+ self.packages[pname] = {}
+ finally:
+ self.lock.release()
+ return True
+
+ def declareClass(self, classKey, classDef):
+ """ Maybe add a class definition to the cache. Return True if added, None if pre-existed. """
+ pname = classKey.getPackageName()
+ pkey = classKey.getPackageKey()
+ try:
+ self.lock.acquire()
+ if pname not in self.packages:
+ self.packages[pname] = {}
+ packageMap = self.packages[pname]
+ if pkey in packageMap:
+ return None
+ packageMap[pkey] = classDef
+ finally:
+ self.lock.release()
+ return True
+
+
+#===================================================================================================
+# ClassKey
+#===================================================================================================
class ClassKey:
""" A ClassKey uniquely identifies a class from the schema. """
def __init__(self, constructor):
- if type(constructor) == str:
+ if constructor.__class__ == str:
# construct from __repr__ string
try:
self.pname, cls = constructor.split(":")
@@ -1177,20 +1350,33 @@ class ClassKey:
h1 = int(hexValues[1], 16)
h2 = int(hexValues[2], 16)
h3 = int(hexValues[3], 16)
- self.hash = struct.pack("!LLLL", h0, h1, h2, h3)
+ h4 = int(hexValues[4][0:4], 16)
+ h5 = int(hexValues[4][4:12], 16)
+ self.hash = UUID(struct.pack("!LHHHHL", h0, h1, h2, h3, h4, h5))
except:
raise Exception("Invalid ClassKey format")
+ elif constructor.__class__ == dict:
+ # construct from QMFv2 map
+ try:
+ self.pname = constructor['_package_name']
+ self.cname = constructor['_class_name']
+ self.hash = constructor['_hash']
+ except:
+ raise Exception("Invalid ClassKey map format")
else:
# construct from codec
codec = constructor
self.pname = str(codec.read_str8())
self.cname = str(codec.read_str8())
- self.hash = codec.read_bin128()
+ self.hash = UUID(codec.read_bin128())
def encode(self, codec):
codec.write_str8(self.pname)
codec.write_str8(self.cname)
- codec.write_bin128(self.hash)
+ codec.write_bin128(self.hash.bytes)
+
+ def asMap(self):
+ return {'_package_name': self.pname, '_class_name': self.cname, '_hash': self.hash}
def getPackageName(self):
return self.pname
@@ -1202,7 +1388,7 @@ class ClassKey:
return self.hash
def getHashString(self):
- return "%08x-%08x-%08x-%08x" % struct.unpack ("!LLLL", self.hash)
+ return str(self.hash)
def getPackageKey(self):
return (self.cname, self.hash)
@@ -1210,6 +1396,10 @@ class ClassKey:
def __repr__(self):
return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
+
+#===================================================================================================
+# SchemaClass
+#===================================================================================================
class SchemaClass:
""" """
CLASS_KIND_TABLE = 1
@@ -1292,6 +1482,10 @@ class SchemaClass:
else:
return self.arguments + self.session.getSchema(self.superTypeKey).getArguments()
+
+#===================================================================================================
+# SchemaProperty
+#===================================================================================================
class SchemaProperty:
""" """
def __init__(self, codec):
@@ -1321,6 +1515,10 @@ class SchemaProperty:
def __repr__(self):
return self.name
+
+#===================================================================================================
+# SchemaStatistic
+#===================================================================================================
class SchemaStatistic:
""" """
def __init__(self, codec):
@@ -1337,6 +1535,10 @@ class SchemaStatistic:
def __repr__(self):
return self.name
+
+#===================================================================================================
+# SchemaMethod
+#===================================================================================================
class SchemaMethod:
""" """
def __init__(self, codec):
@@ -1365,6 +1567,10 @@ class SchemaMethod:
result += ")"
return result
+
+#===================================================================================================
+# SchemaArgument
+#===================================================================================================
class SchemaArgument:
""" """
def __init__(self, codec, methodArg):
@@ -1392,64 +1598,113 @@ class SchemaArgument:
elif key == "refPackage" : self.refPackage = value
elif key == "refClass" : self.refClass = value
+
+#===================================================================================================
+# ObjectId
+#===================================================================================================
class ObjectId:
""" Object that represents QMF object identifiers """
- def __init__(self, codec, first=0, second=0):
- if codec:
- self.first = codec.read_uint64()
- self.second = codec.read_uint64()
- else:
- self.first = first
- self.second = second
+ def __init__(self, constructor, first=0, second=0, agentName=None):
+ if constructor.__class__ == dict:
+ self.agentName = agentName
+ self.agentEpoch = 0
+ if '_agent_name' in constructor: self.agentName = constructor['_agent_name']
+ if '_agent_epoch' in constructor: self.agentEpoch = constructor['_agent_epoch']
+ if '_object_name' not in constructor:
+ raise Exception("QMFv2 OBJECT_ID must have the '_object_name' field.")
+ self.objectName = constructor['_object_name']
+ else:
+ if not constructor:
+ first = first
+ second = second
+ else:
+ first = constructor.read_uint64()
+ second = constructor.read_uint64()
+ self.agentName = str(first & 0x000000000FFFFFFF)
+ self.agentEpoch = (first & 0x0FFF000000000000) >> 48
+ self.objectName = str(second)
def __cmp__(self, other):
if other == None or not isinstance(other, ObjectId) :
return 1
- if self.first < other.first:
+
+ if self.objectName < other.objectName:
+ return -1
+ if self.objectName > other.objectName:
+ return 1
+
+ if self.agentName < other.agentName:
return -1
- if self.first > other.first:
+ if self.agentName > other.agentName:
return 1
- if self.second < other.second:
+
+ if self.agentEpoch < other.agentEpoch:
return -1
- if self.second > other.second:
+ if self.agentEpoch > other.agentEpoch:
return 1
return 0
def __repr__(self):
- return "%d-%d-%d-%d-%d" % (self.getFlags(), self.getSequence(),
+ return "%d-%d-%d-%s-%s" % (self.getFlags(), self.getSequence(),
self.getBrokerBank(), self.getAgentBank(), self.getObject())
+ def isV2(self):
+ return not self.agentName.isdigit()
+
def index(self):
- return (self.first, self.second)
+ return self.__repr__()
def getFlags(self):
- return (self.first & 0xF000000000000000) >> 60
+ return 0
def getSequence(self):
- return (self.first & 0x0FFF000000000000) >> 48
+ return self.agentEpoch
def getBrokerBank(self):
- return (self.first & 0x0000FFFFF0000000) >> 28
+ return 1
def getAgentBank(self):
- return self.first & 0x000000000FFFFFFF
+ return self.agentName
def getObject(self):
- return self.second
+ return self.objectName
def isDurable(self):
return self.getSequence() == 0
def encode(self, codec):
- codec.write_uint64(self.first)
- codec.write_uint64(self.second)
-
- def __hash__(self):
- return (self.first, self.second).__hash__()
+ first = (self.agentEpoch << 48) + (1 << 28)
+ second = 0
+
+ try:
+ first += int(self.agentName)
+ except:
+ pass
+
+ try:
+ second = int(self.objectName)
+ except:
+ pass
+
+ codec.write_uint64(first)
+ codec.write_uint64(second)
+
+ def asMap(self):
+ omap = {'_agent_name': self.agentName, '_object_name': self.objectName}
+ if self.agentEpoch != 0:
+ omap['_agent_epoch'] = self.agentEpoch
+ return omap
+
+ def __hash__(self):
+ return self.__repr__().__hash__()
def __eq__(self, other):
- return (self.first, self.second).__eq__(other)
+ return self.__repr__().__eq__(other)
+
+#===================================================================================================
+# MethodResult
+#===================================================================================================
class MethodResult(object):
""" """
def __init__(self, status, text, outArgs):
@@ -1465,6 +1720,10 @@ class MethodResult(object):
def __repr__(self):
return "%s (%d) - %s" % (self.text, self.status, self.outArgs)
+
+#===================================================================================================
+# ManagedConnection
+#===================================================================================================
class ManagedConnection(Thread):
""" Thread class for managing a connection. """
DELAY_MIN = 1
@@ -1527,6 +1786,10 @@ class ManagedConnection(Thread):
finally:
self.cv.release()
+
+#===================================================================================================
+# Broker
+#===================================================================================================
class Broker:
""" This object represents a connection (or potential connection) to a QMF broker. """
SYNC_TIME = 60
@@ -1542,6 +1805,7 @@ class Broker:
self.authUser = authUser
self.authPass = authPass
self.cv = Condition()
+ self.seqToAgentMap = {}
self.error = None
self.brokerId = None
self.connected = False
@@ -1574,9 +1838,13 @@ class Broker:
def getAgent(self, brokerBank, agentBank):
""" Return the agent object associated with a particular broker and agent bank value."""
- bankKey = (brokerBank, agentBank)
- if bankKey in self.agents:
- return self.agents[bankKey]
+ bankKey = str(agentBank)
+ try:
+ self.cv.acquire()
+ if bankKey in self.agents:
+ return self.agents[bankKey]
+ finally:
+ self.cv.release()
return None
def getSessionId(self):
@@ -1585,7 +1853,11 @@ class Broker:
def getAgents(self):
""" Get the list of agents reachable via this broker """
- return self.agents.values()
+ try:
+ self.cv.acquire()
+ return self.agents.values()
+ finally:
+ self.cv.release()
def getAmqpSession(self):
""" Get the AMQP session object for this connected broker. """
@@ -1612,10 +1884,29 @@ class Broker:
else:
return "Disconnected Broker"
+ def _setSequence(self, sequence, agent):
+ try:
+ self.cv.acquire()
+ self.seqToAgentMap[sequence] = agent
+ finally:
+ self.cv.release()
+
+ def _clearSequence(self, sequence):
+ try:
+ self.cv.acquire()
+ self.seqToAgentMap.pop(sequence)
+ finally:
+ self.cv.release()
+
def _tryToConnect(self):
try:
- self.agents = {}
- self.agents[(1,0)] = Agent(self, 0, "BrokerAgent")
+ try:
+ self.cv.acquire()
+ self.agents = {}
+ self.agents['0'] = Agent(self, 0, "BrokerAgent")
+ finally:
+ self.cv.release()
+
self.topicBound = False
self.syncInFlight = False
self.syncRequest = 0
@@ -1649,7 +1940,7 @@ class Broker:
self.amqpSession.message_subscribe(queue=self.replyName, destination="rdest",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("rdest").listen(self._replyCb, self._exceptionCb)
+ self.amqpSession.incoming("rdest").listen(self._v1Cb, self._exceptionCb)
self.amqpSession.message_set_flow_mode(destination="rdest", flow_mode=1)
self.amqpSession.message_flow(destination="rdest", unit=0, value=0xFFFFFFFFL)
self.amqpSession.message_flow(destination="rdest", unit=1, value=0xFFFFFFFFL)
@@ -1659,11 +1950,29 @@ class Broker:
self.amqpSession.message_subscribe(queue=self.topicName, destination="tdest",
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
- self.amqpSession.incoming("tdest").listen(self._replyCb)
+ self.amqpSession.incoming("tdest").listen(self._v1Cb)
self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFFL)
self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFFL)
+ ##
+ ## Set up connectivity for QMFv2
+ ##
+ self.v2_queue_name = "qmfc-v2-%s" % self.amqpSessionId
+ self.amqpSession.queue_declare(queue=self.v2_queue_name, exclusive=True, auto_delete=True)
+ self.amqpSession.exchange_bind(exchange="qmf.default.direct",
+ queue=self.v2_queue_name, binding_key=self.v2_queue_name)
+ self.amqpSession.exchange_bind(exchange="qmf.default.topic",
+ queue=self.v2_queue_name, binding_key="agent.#")
+ ## Other bindings here...
+ self.amqpSession.message_subscribe(queue=self.v2_queue_name, destination="v2dest",
+ accept_mode=self.amqpSession.accept_mode.none,
+ acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
+ self.amqpSession.incoming("v2dest").listen(self._v2Cb, self._exceptionCb)
+ self.amqpSession.message_set_flow_mode(destination="v2dest", flow_mode=1)
+ self.amqpSession.message_flow(destination="v2dest", unit=0, value=0xFFFFFFFFL)
+ self.amqpSession.message_flow(destination="v2dest", unit=1, value=0xFFFFFFFFL)
+
self.connected = True
self.session._handleBrokerConnect(self)
@@ -1671,6 +1980,7 @@ class Broker:
self._setHeader(codec, 'B')
msg = self._message(codec.encoded)
self._send(msg)
+ self._v2SendAgentLocate()
except socket.error, e:
self.error = "Socket Error %s - %s" % (e.__class__.__name__, e)
@@ -1683,18 +1993,73 @@ class Broker:
raise
def _updateAgent(self, obj):
- bankKey = (obj.brokerBank, obj.agentBank)
+ bankKey = str(obj.agentBank)
+ agent = None
if obj._deleteTime == 0:
- if bankKey not in self.agents:
- agent = Agent(self, obj.agentBank, obj.label)
- self.agents[bankKey] = agent
- if self.session.console != None:
- self.session.console.newAgent(agent)
+ try:
+ self.cv.acquire()
+ if bankKey not in self.agents:
+ agent = Agent(self, obj.agentBank, obj.label)
+ self.agents[bankKey] = agent
+ finally:
+ self.cv.release()
+ if agent and self.session.console:
+ self.session.console.newAgent(agent)
else:
- agent = self.agents.pop(bankKey, None)
- if agent != None and self.session.console != None:
+ try:
+ self.cv.acquire()
+ agent = self.agents.pop(bankKey, None)
+ if agent:
+ agent.close()
+ finally:
+ self.cv.release()
+ if agent and self.session.console:
self.session.console.delAgent(agent)
+ def _addAgent(self, name, agent):
+ try:
+ self.cv.acquire()
+ self.agents[name] = agent
+ finally:
+ self.cv.release()
+ if self.session.console:
+ self.session.console.newAgent(agent)
+
+ def _ageAgents(self):
+ try:
+ self.cv.acquire()
+ to_delete = []
+ to_notify = []
+ for key in self.agents:
+ if self.agents[key].isOld():
+ to_delete.append(key)
+ for key in to_delete:
+ agent = self.agents.pop(key)
+ agent.close()
+ to_notify.append(agent)
+ finally:
+ self.cv.release()
+ if self.session.console:
+ for agent in to_notify:
+ self.session.console.delAgent(agent)
+
+ def _v2SendAgentLocate(self, predicate={}):
+ """
+ Broadcast an agent-locate request to cause all agents in the domain to tell us who they are.
+ """
+ dp = self.amqpSession.delivery_properties()
+ dp.routing_key = "console.request.agent_locate"
+ mp = self.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self.authUser
+ mp.app_id = "qmf2"
+ mp.reply_to = self.amqpSession.reply_to("qmf.default.direct", self.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_agent_locate_request'}
+ sendCodec = Codec()
+ sendCodec.write_map(predicate)
+ msg = Message(dp, mp, sendCodec.encoded)
+ self._send(msg, "qmf.default.topic")
+
def _setHeader(self, codec, opcode, seq=0):
""" Compose the header of a management message. """
codec.write_uint8(ord('A'))
@@ -1785,24 +2150,105 @@ class Broker:
finally:
self.cv.release()
- def _replyCb(self, msg):
+ def _v1Cb(self, msg):
+ try:
+ self._v1CbProtected(msg)
+ except Exception, e:
+ print "EXCEPTION in Broker._v1Cb:", e
+
+ def _v1CbProtected(self, msg):
+ """
+ This is the general message handler for messages received via the QMFv1 exchanges.
+ """
+ agent = None
+ agent_addr = None
+ mp = msg.get("message_properties")
+ ah = mp.application_headers
+ if ah and 'qmf.agent' in ah:
+ agent_addr = ah['qmf.agent']
+
+ if not agent_addr:
+ #
+ # See if we can determine the agent identity from the routing key
+ #
+ dp = msg.get("delivery_properties")
+ rkey = None
+ if dp.routing_key:
+ rkey = dp.routing_key
+ items = rkey.split('.')
+ if len(items) >= 4:
+ if items[0] == 'console' and items[3].isdigit():
+ agent_addr = str(items[3]) # The QMFv1 Agent Bank
+ if agent_addr != None and agent_addr in self.agents:
+ agent = self.agents[agent_addr]
+
codec = Codec(msg.body)
+ alreadyTried = None
while True:
opcode, seq = self._checkHeader(codec)
+
+ if not agent and not alreadyTried:
+ alreadyTried = True
+ try:
+ self.cv.acquire()
+ if seq in self.seqToAgentMap:
+ agent = self.seqToAgentMap[seq]
+ finally:
+ self.cv.release()
+
if opcode == None: return
if opcode == 'b': self.session._handleBrokerResp (self, codec, seq)
elif opcode == 'p': self.session._handlePackageInd (self, codec, seq)
- elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq)
elif opcode == 'q': self.session._handleClassInd (self, codec, seq)
- elif opcode == 'm': self.session._handleMethodResp (self, codec, seq)
+ elif opcode == 's': self.session._handleSchemaResp (self, codec, seq, agent_addr)
elif opcode == 'h': self.session._handleHeartbeatInd (self, codec, seq, msg)
- elif opcode == 'e': self.session._handleEventInd (self, codec, seq)
- elif opcode == 's': self.session._handleSchemaResp (self, codec, seq)
- elif opcode == 'c': self.session._handleContentInd (self, codec, seq, prop=True)
- elif opcode == 'i': self.session._handleContentInd (self, codec, seq, stat=True)
- elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
- self.session.receiver._completed.add(msg.id)
- self.session.channel.session_completed(self.session.receiver._completed)
+ elif opcode == 'z': self.session._handleCommandComplete (self, codec, seq, agent)
+ elif agent:
+ agent._handleQmfV1Message(opcode, seq, mp, ah, codec)
+
+ self.amqpSession.receiver._completed.add(msg.id)
+ self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
+
+ def _v2Cb(self, msg):
+ """
+ This is the general message handler for messages received via QMFv2 exchanges.
+ """
+ mp = msg.get("message_properties")
+ ah = mp["application_headers"]
+ codec = Codec(msg.body)
+
+ if 'qmf.opcode' in ah:
+ opcode = ah['qmf.opcode']
+ if mp.content_type == "amqp/list":
+ content = codec.read_list()
+ if not content:
+ content = []
+ elif mp.content_type == "amqp/map":
+ content = codec.read_map()
+ if not content:
+ content = {}
+ else:
+ content = None
+
+ if content != None:
+ ##
+ ## Directly handle agent heartbeats and agent locate responses as these are broker-scope (they are
+ ## used to maintain the broker's list of agent proxies.
+ ##
+ if opcode == '_agent_heartbeat_indication': self.session._v2HandleHeartbeatInd(self, mp, ah, content)
+ elif opcode == '_agent_locate_response': self.session._v2HandleAgentLocateRsp(self, mp, ah, content)
+ else:
+ ##
+ ## All other opcodes are agent-scope and are forwarded to the agent proxy representing the sender
+ ## of the message.
+ ##
+ agent_addr = ah['qmf.agent']
+ if agent_addr in self.agents:
+ agent = self.agents[agent_addr]
+ agent._handleQmfV2Message(opcode, mp, ah, content)
+
+ self.amqpSession.receiver._completed.add(msg.id)
+ self.amqpSession.channel.session_completed(self.amqpSession.receiver._completed)
def _exceptionCb(self, data):
self.connected = False
@@ -1818,43 +2264,697 @@ class Broker:
if self.thread:
self.thread.disconnected()
+
+#===================================================================================================
+# Agent
+#===================================================================================================
class Agent:
- """ """
- def __init__(self, broker, agentBank, label):
+ """
+ This class represents a proxy for a remote agent being managed
+ """
+ def __init__(self, broker, agentBank, label, isV2=False, interval=0):
self.broker = broker
+ self.session = broker.session
+ self.schemaCache = self.session.schemaCache
self.brokerBank = broker.getBrokerBank()
- self.agentBank = agentBank
+ self.agentBank = str(agentBank)
self.label = label
+ self.isV2 = isV2
+ self.heartbeatInterval = interval
+ self.lock = Lock()
+ self.seqMgr = self.session.seqMgr
+ self.contextMap = {}
+ self.unsolicitedContext = RequestContext(self, self)
+ self.lastSeenTime = time()
+ self.closed = None
+
+
+ def _checkClosed(self):
+ if self.closed:
+ raise Exception("Agent is disconnected")
+
+
+ def __call__(self, **kwargs):
+ """
+ This is the handler for unsolicited stuff received from the agent
+ """
+ if 'qmf_object' in kwargs:
+ if self.session.console:
+ self.session.console.objectProps(self.broker, kwargs['qmf_object'])
+ elif 'qmf_object_stats' in kwargs:
+ if self.session.console:
+ self.session.console.objectStats(self.broker, kwargs['qmf_object_stats'])
+ elif 'qmf_event' in kwargs:
+ if self.session.console:
+ self.session.console.event(self.broker, kwargs['qmf_event'])
+
+
+ def touch(self):
+ self.lastSeenTime = time()
+
+
+ def isOld(self):
+ if self.heartbeatInterval == 0:
+ return None
+ if time() - self.lastSeenTime > (2.0 * self.heartbeatInterval):
+ return True
+ return None
+
+
+ def close(self):
+ self.closed = True
+ copy = {}
+ try:
+ self.lock.acquire()
+ for seq in self.contextMap:
+ copy[seq] = self.contextMap[seq]
+ finally:
+ self.lock.release()
+
+ for seq in copy:
+ context = copy[seq]
+ context.cancel("Agent disconnected")
+
def __repr__(self):
- return "Agent at bank %d.%d (%s)" % (self.brokerBank, self.agentBank, self.label)
+ if self.isV2:
+ ver = "v2"
+ else:
+ ver = "v1"
+ return "Agent(%s) at bank %d.%s (%s)" % (ver, self.brokerBank, self.agentBank, self.label)
+
def getBroker(self):
+ self._checkClosed()
return self.broker
+
def getBrokerBank(self):
+ self._checkClosed()
return self.brokerBank
+
def getAgentBank(self):
+ self._checkClosed()
return self.agentBank
+
+ def getObjects(self, notifiable=None, **kwargs):
+ """ Get a list of objects from QMF agents.
+ All arguments are passed by name(keyword).
+
+ If 'notifiable' is None (default), this call will block until completion or timeout.
+ If supplied, notifiable is assumed to be a callable object that will be called when the
+ list of queried objects arrives. The single argument to the call shall be a list of
+ the returned objects.
+
+ The class for queried objects may be specified in one of the following ways:
+
+ _schema = <schema> - supply a schema object returned from getSchema.
+ _key = <key> - supply a classKey from the list returned by getClasses.
+ _class = <name> - supply a class name as a string. If the class name exists
+ in multiple packages, a _package argument may also be supplied.
+ _objectId = <id> - get the object referenced by the object-id
+
+ The default timeout for this synchronous operation is 60 seconds. To change the timeout,
+ use the following argument:
+
+ _timeout = <time in seconds>
+
+ If additional arguments are supplied, they are used as property selectors. For example,
+ if the argument name="test" is supplied, only objects whose "name" property is "test"
+ will be returned in the result.
+ """
+ self._checkClosed()
+ if notifiable:
+ if not callable(notifiable):
+ raise Exception("notifiable object must be callable")
+
+ #
+ # Isolate the selectors from the kwargs
+ #
+ selectors = {}
+ for key in kwargs:
+ value = kwargs[key]
+ if key[0] != '_':
+ selectors[key] = value
+
+ #
+ # Allocate a context to track this asynchronous request.
+ #
+ context = RequestContext(self, notifiable, selectors)
+ sequence = self.seqMgr._reserve(context)
+ try:
+ self.lock.acquire()
+ self.contextMap[sequence] = context
+ context.setSequence(sequence)
+ finally:
+ self.lock.release()
+
+ #
+ # Compose and send the query message to the agent using the appropriate protocol for the
+ # agent's QMF version.
+ #
+ if self.isV2:
+ self._v2SendGetQuery(sequence, kwargs)
+ else:
+ self.broker._setSequence(sequence, self)
+ self._v1SendGetQuery(sequence, kwargs)
+
+ #
+ # If this is a synchronous call, block and wait for completion.
+ #
+ if not notifiable:
+ timeout = 60
+ if '_timeout' in kwargs:
+ timeout = kwargs['_timeout']
+ context.waitForSignal(timeout)
+ if context.exception:
+ raise Exception(context.exception)
+ result = context.queryResults
+ return result
+
+
+ def _clearContext(self, sequence):
+ try:
+ self.lock.acquire()
+ self.contextMap.pop(sequence)
+ finally:
+ self.lock.release()
+
+
+ def _schemaInfoFromV2Agent(self):
+ """
+ We have just received new schema information from this agent. Check to see if there's
+ more work that can now be done.
+ """
+ try:
+ self.lock.acquire()
+ copy_of_map = {}
+ for item in self.contextMap:
+ copy_of_map[item] = self.contextMap[item]
+ finally:
+ self.lock.release()
+
+ self.unsolicitedContext.reprocess()
+ for context in copy_of_map:
+ copy_of_map[context].reprocess()
+
+
+ def _handleV1Completion(self, sequence, code, text):
+ """
+ Called if one of this agent's V1 commands completed
+ """
+ context = None
+ try:
+ self.lock.acquire()
+ if sequence in self.contextMap:
+ context = self.contextMap[sequence]
+ finally:
+ self.lock.release()
+
+ if context:
+ if code != 0:
+ ex = "Error %d: %s" % (code, text)
+ context.setException(ex)
+ context.signal()
+ self.broker._clearSequence(sequence)
+
+
+ def _v1HandleMethodResp(self, codec, seq):
+ """
+ Handle a QMFv1 method response
+ """
+ code = codec.read_uint32()
+ text = codec.read_str16()
+ outArgs = {}
+ self.broker._clearSequence(seq)
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
+ if code == 0:
+ for arg in method.arguments:
+ if arg.dir.find("O") != -1:
+ outArgs[arg.name] = self.session._decodeValue(codec, arg.type, self.broker)
+ result = MethodResult(code, text, outArgs)
+ if synchronous:
+ try:
+ self.broker.cv.acquire()
+ self.broker.syncResult = result
+ self.broker.syncInFlight = False
+ self.broker.cv.notify()
+ finally:
+ self.broker.cv.release()
+ else:
+ if self.session.console:
+ self.session.console.methodResponse(self.broker, seq, result)
+
+
+ def _v1HandleEventInd(self, codec, seq):
+ """
+ Handle a QMFv1 event indication
+ """
+ event = Event(self, codec)
+ self.unsolicitedContext.doEvent(event)
+
+
+ def _v1HandleContentInd(self, codec, sequence, prop=False, stat=False):
+ """
+ Handle a QMFv1 content indication
+ """
+ classKey = ClassKey(codec)
+ schema = self.schemaCache.getSchema(classKey)
+ if not schema:
+ return
+
+ obj = Object(self, schema, codec, prop, stat)
+ if classKey.getPackageName() == "org.apache.qpid.broker" and classKey.getClassName() == "agent" and prop:
+ self.broker._updateAgent(obj)
+
+ context = self.unsolicitedContext
+ try:
+ self.lock.acquire()
+ if sequence in self.contextMap:
+ context = self.contextMap[sequence]
+ finally:
+ self.lock.release()
+
+ context.addV1QueryResult(obj)
+
+
+ def _v2HandleDataInd(self, mp, ah, content):
+ """
+ Handle a QMFv2 data indication from the agent
+ """
+ if mp.correlation_id:
+ try:
+ self.lock.acquire()
+ sequence = int(mp.correlation_id)
+ if sequence not in self.contextMap:
+ return
+ context = self.contextMap[sequence]
+ finally:
+ self.lock.release()
+ else:
+ context = self.unsolicitedContext
+
+ kind = "_data"
+ if "qmf.content" in ah:
+ kind = ah["qmf.content"]
+ if kind == "_data":
+ if content.__class__ != list:
+ return
+ for omap in content:
+ context.addV2QueryResult(omap)
+ context.processV2Data()
+
+ if 'partial' not in ah:
+ context.signal()
+
+
+ def _v2HandleMethodResp(self, mp, ah, content):
+ """
+ Handle a QMFv2 method response from the agent
+ """
+ context = None
+ sequence = None
+ if mp.correlation_id:
+ try:
+ self.lock.acquire()
+ seq = int(mp.correlation_id)
+ finally:
+ self.lock.release()
+ else:
+ return
+
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
+
+ result = MethodResult(0, 'OK', content['_arguments'])
+ if synchronous:
+ try:
+ self.broker.cv.acquire()
+ self.broker.syncResult = result
+ self.broker.syncInFlight = False
+ self.broker.cv.notify()
+ finally:
+ self.broker.cv.release()
+ else:
+ if self.session.console:
+ self.session.console.methodResponse(self.broker, seq, result)
+
+ def _v2HandleException(self, mp, ah, content):
+ """
+ Handle a QMFv2 exception
+ """
+ context = None
+ if mp.correlation_id:
+ try:
+ self.lock.acquire()
+ seq = int(mp.correlation_id)
+ finally:
+ self.lock.release()
+ else:
+ return
+
+ pair = self.seqMgr._release(seq)
+ if pair == None:
+ return
+ method, synchronous = pair
+
+ code = 7
+ text = ""
+ if '_status_code' in content:
+ code = content['_status_code']
+ if '_status_text' in content:
+ text = content['_status_text']
+ else:
+ text = content
+
+ result = MethodResult(code, text, {})
+ if synchronous:
+ try:
+ self.broker.cv.acquire()
+ self.broker.syncResult = result
+ self.broker.syncInFlight = False
+ self.broker.cv.notify()
+ finally:
+ self.broker.cv.release()
+ else:
+ if self.session.console:
+ self.session.console.methodResponse(self.broker, seq, result)
+
+
+ def _v1SendGetQuery(self, sequence, kwargs):
+ """
+ Send a get query to a QMFv1 agent.
+ """
+ #
+ # Build the query map
+ #
+ query = {}
+ if '_class' in kwargs:
+ query['_class'] = kwargs['_class']
+ if '_package' in kwargs:
+ query['_package'] = kwargs['_package']
+ elif '_key' in kwargs:
+ key = kwargs['_key']
+ query['_class'] = key.getClassName()
+ query['_package'] = key.getPackageName()
+ elif '_objectId' in kwargs:
+ query['_objectid'] = kwargs['_objectId'].__repr__()
+
+ #
+ # Construct and transmit the message
+ #
+ sendCodec = Codec()
+ self.broker._setHeader(sendCodec, 'G', sequence)
+ sendCodec.write_map(query)
+ smsg = self.broker._message(sendCodec.encoded, "agent.%d.%s" % (self.brokerBank, self.agentBank))
+ self.broker._send(smsg)
+
+
+ def _v2SendGetQuery(self, sequence, kwargs):
+ """
+ Send a get query to a QMFv2 agent.
+ """
+ #
+ # Build the query map
+ #
+ query = {'_what': 'OBJECT'}
+ if '_class' in kwargs:
+ schemaMap = {'_class_name': kwargs['_class']}
+ if '_package' in kwargs:
+ schemaMap['_package_name'] = kwargs['_package']
+ query['_schema_id'] = schemaMap
+ elif '_key' in kwargs:
+ query['_schema_id'] = kwargs['_key'].asMap()
+ elif '_objectId' in kwargs:
+ query['_object_id'] = kwargs['_objectId'].asMap()
+
+ #
+ # Construct and transmit the message
+ #
+ dp = self.broker.amqpSession.delivery_properties()
+ dp.routing_key = self.agentBank
+ mp = self.broker.amqpSession.message_properties()
+ mp.content_type = "amqp/map"
+ mp.user_id = self.broker.authUser
+ mp.correlation_id = str(sequence)
+ mp.app_id = "qmf2"
+ mp.reply_to = self.broker.amqpSession.reply_to("qmf.default.direct", self.broker.v2_queue_name)
+ mp.application_headers = {'qmf.opcode':'_query_request'}
+ sendCodec = Codec()
+ sendCodec.write_map(query)
+ msg = Message(dp, mp, sendCodec.encoded)
+ self.broker._send(msg, "qmf.default.direct")
+
+
+ def _v2SendSchemaRequest(self, schemaId):
+ """
+ Send a query to an agent to request details on a particular schema class.
+ IMPORTANT: This function currently sends a QMFv1 schema-request to the address of
+ the agent. The agent will send its response to amq.direct/<our-key>.
+ Eventually, this will be converted to a proper QMFv2 schema query.
+ """
+ sendCodec = Codec()
+ seq = self.seqMgr._reserve(None)
+ self.broker._setHeader(sendCodec, 'S', seq)
+ schemaId.encode(sendCodec)
+ smsg = self.broker._message(sendCodec.encoded, self.agentBank)
+ self.broker._send(smsg, "qmf.default.direct")
+
+
+ def _handleQmfV1Message(self, opcode, seq, mp, ah, codec):
+ """
+ Process QMFv1 messages arriving from an agent.
+ """
+ if opcode == 'm': self._v1HandleMethodResp(codec, seq)
+ elif opcode == 'e': self._v1HandleEventInd(codec, seq)
+ elif opcode == 'c': self._v1HandleContentInd(codec, seq, prop=True)
+ elif opcode == 'i': self._v1HandleContentInd(codec, seq, stat=True)
+ elif opcode == 'g': self._v1HandleContentInd(codec, seq, prop=True, stat=True)
+
+
+ def _handleQmfV2Message(self, opcode, mp, ah, content):
+ """
+ Process QMFv2 messages arriving from an agent.
+ """
+ if opcode == '_data_indication': self._v2HandleDataInd(mp, ah, content)
+ elif opcode == '_query_response': self._v2HandleDataInd(mp, ah, content)
+ elif opcode == '_method_response': self._v2HandleMethodResp(mp, ah, content)
+ elif opcode == '_exception': self._v2HandleException(mp, ah, content)
+
+
+#===================================================================================================
+# RequestContext
+#===================================================================================================
+class RequestContext(object):
+ """
+ This class tracks an asynchronous request sent to an agent.
+ TODO: Add logic for client-side selection and filtering deleted objects from get-queries
+ """
+ def __init__(self, agent, notifiable, selectors={}):
+ self.sequence = None
+ self.agent = agent
+ self.schemaCache = self.agent.schemaCache
+ self.notifiable = notifiable
+ self.selectors = selectors
+ self.startTime = time()
+ self.rawQueryResults = []
+ self.queryResults = []
+ self.exception = None
+ self.waitingForSchema = None
+ self.pendingSignal = None
+ self.cv = Condition()
+ self.blocked = notifiable == None
+
+
+ def setSequence(self, sequence):
+ self.sequence = sequence
+
+
+ def addV1QueryResult(self, data):
+ values = {}
+ for prop, val in data.getProperties():
+ values[prop.name] = val
+ for stat, val in data.getStatistics():
+ values[stat.name] = val
+ for key in values:
+ val = values[key]
+ if key in self.selectors and val != self.selectors[key]:
+ return
+
+ if self.notifiable:
+ self.notifiable(qmf_object=data)
+ else:
+ self.queryResults.append(data)
+
+
+ def addV2QueryResult(self, data):
+ values = data['_values']
+ for key in values:
+ val = values[key]
+ if key in self.selectors and val != self.selectors[key]:
+ return
+ self.rawQueryResults.append(data)
+
+
+ def doEvent(self, data):
+ if self.notifiable:
+ self.notifiable(qmf_event=data)
+
+
+ def setException(self, ex):
+ self.exception = ex
+
+
+ def getAge(self):
+ return time() - self.startTime
+
+
+ def cancel(self, exception):
+ self.setException(exception)
+ try:
+ self.cv.acquire()
+ self.blocked = None
+ self.waitingForSchema = None
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ self._complete()
+
+
+ def waitForSignal(self, timeout):
+ try:
+ self.cv.acquire()
+ while self.blocked:
+ if (time() - self.startTime) > timeout:
+ self.exception = "Request timed out after %d seconds" % timeout
+ return
+ self.cv.wait(1)
+ finally:
+ self.cv.release()
+
+
+ def signal(self):
+ try:
+ self.cv.acquire()
+ if self.waitingForSchema:
+ self.pendingSignal = True
+ return
+ else:
+ self.blocked = None
+ self.cv.notify()
+ finally:
+ self.cv.release()
+ self._complete()
+
+
+ def _complete(self):
+ if self.notifiable:
+ if self.exception:
+ self.notifiable(qmf_exception=self.exception)
+ else:
+ self.notifiable(qmf_complete=True)
+
+ if self.sequence:
+ self.agent._clearContext(self.sequence)
+
+
+ def processV2Data(self):
+ """
+ Attempt to make progress on the entries in the raw_query_results queue. If an entry has a schema
+ that is in our schema cache, process it. Otherwise, send a request for the schema information
+ to the agent that manages the object.
+ """
+ schemaId = None
+ queryResults = []
+ try:
+ self.cv.acquire()
+ if self.waitingForSchema:
+ return
+ while (not self.waitingForSchema) and len(self.rawQueryResults) > 0:
+ head = self.rawQueryResults[0]
+ schemaId = self._getSchemaIdforV2ObjectLH(head)
+ schema = self.schemaCache.getSchema(schemaId)
+ if schema:
+ obj = Object(self.agent, schema, v2Map=head, agentName=self.agent.agentBank)
+ queryResults.append(obj)
+ self.rawQueryResults.pop(0)
+ else:
+ self.waitingForSchema = True
+ finally:
+ self.cv.release()
+
+ if self.waitingForSchema:
+ self.agent._v2SendSchemaRequest(schemaId)
+
+ for result in queryResults:
+ if self.notifiable:
+ self.notifiable(qmf_object=result)
+ else:
+ self.queryResults.append(result)
+
+ complete = None
+ try:
+ self.cv.acquire()
+ if not self.waitingForSchema and self.pendingSignal:
+ self.blocked = None
+ self.cv.notify()
+ complete = True
+ finally:
+ self.cv.release()
+
+ if complete:
+ self._complete()
+
+
+ def reprocess(self):
+ """
+ New schema information has been added to the schema-cache. Clear our 'waiting' status
+ and see if we can make more progress on the raw query list.
+ """
+ try:
+ self.cv.acquire()
+ self.waitingForSchema = None
+ finally:
+ self.cv.release()
+ self.processV2Data()
+
+
+ def _getSchemaIdforV2ObjectLH(self, data):
+ """
+ Given a data map, extract the schema-identifier.
+ """
+ if data.__class__ != dict:
+ return None
+ if '_schema_id' in data:
+ return ClassKey(data['_schema_id'])
+ return None
+
+
+#===================================================================================================
+# Event
+#===================================================================================================
class Event:
""" """
- def __init__(self, session, broker, codec):
- self.session = session
- self.broker = broker
+ def __init__(self, agent, codec):
+ self.agent = agent
+ self.session = agent.session
+ self.broker = agent.broker
self.classKey = ClassKey(codec)
self.timestamp = codec.read_int64()
self.severity = codec.read_uint8()
- self.schema = None
- pname = self.classKey.getPackageName()
- pkey = self.classKey.getPackageKey()
- if pname in session.packages:
- if pkey in session.packages[pname]:
- self.schema = session.packages[pname][pkey]
- self.arguments = {}
- for arg in self.schema.arguments:
- self.arguments[arg.name] = session._decodeValue(codec, arg.type, broker)
+ self.arguments = {}
+ self.schema = self.session.schemaCache.getSchema(self.classKey)
+ if not self.schema:
+ return
+ for arg in self.schema.arguments:
+ self.arguments[arg.name] = self.session._decodeValue(codec, arg.type, self.broker)
def __repr__(self):
if self.schema == None:
@@ -1895,6 +2995,10 @@ class Event:
def getSchema(self):
return self.schema
+
+#===================================================================================================
+# SequenceManager
+#===================================================================================================
class SequenceManager:
""" Manage sequence numbers for asynchronous method calls """
def __init__(self):
@@ -1926,6 +3030,9 @@ class SequenceManager:
return data
+#===================================================================================================
+# DebugConsole
+#===================================================================================================
class DebugConsole(Console):
""" """
def brokerConnected(self, broker):
Modified: qpid/trunk/qpid/tools/src/py/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-cluster?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-cluster (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-cluster Wed Mar 31 21:13:12 2010
@@ -94,7 +94,7 @@ class BrokerManager:
self.broker = self.qmf.addBroker(brokerUrl, self.config._connTimeout)
agents = self.qmf.getAgents()
for a in agents:
- if a.getAgentBank() == 0:
+ if a.getAgentBank() == '0':
self.brokerAgent = a
def Disconnect(self):
Modified: qpid/trunk/qpid/tools/src/py/qpid-config
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-config?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-config (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-config Wed Mar 31 21:13:12 2010
@@ -189,7 +189,7 @@ class BrokerManager:
self.broker = self.qmf.addBroker(brokerUrl, _connTimeout)
agents = self.qmf.getAgents()
for a in agents:
- if a.getAgentBank() == 0:
+ if a.getAgentBank() == '0':
self.brokerAgent = a
def Disconnect(self):
Modified: qpid/trunk/qpid/tools/src/py/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-stat?rev=929716&r1=929715&r2=929716&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-stat (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-stat Wed Mar 31 21:13:12 2010
@@ -91,7 +91,7 @@ class Broker(object):
agents = qmf.getAgents()
for a in agents:
- if a.getAgentBank() == 0:
+ if a.getAgentBank() == '0':
self.brokerAgent = a
bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _agent=self.brokerAgent)[0]
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org