You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/11/21 14:53:54 UTC
svn commit: r719580 - in /incubator/qpid/trunk/qpid/python: qmf/console.py
tests_0-10/management.py
Author: tross
Date: Fri Nov 21 05:53:53 2008
New Revision: 719580
URL: http://svn.apache.org/viewvc?rev=719580&view=rev
Log:
code cleanup on qmf console API
Formalized schema-class as an object with an all-string __repr__
Added additional testing for the console API
Modified:
incubator/qpid/trunk/qpid/python/qmf/console.py
incubator/qpid/trunk/qpid/python/tests_0-10/management.py
Modified: incubator/qpid/trunk/qpid/python/qmf/console.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qmf/console.py?rev=719580&r1=719579&r2=719580&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qmf/console.py (original)
+++ incubator/qpid/trunk/qpid/python/qmf/console.py Fri Nov 21 05:53:53 2008
@@ -171,11 +171,11 @@
def __repr__(self):
return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers)
- def addBroker(self, target="localhost"):
+ def addBroker(self, target="localhost", initialTopicCredits=0xFFFFFFFF):
""" Connect to a Qpid broker. Returns an object of type Broker. """
url = BrokerURL(target)
broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass,
- ssl = url.scheme == URL.AMQPS)
+ ssl = url.scheme == URL.AMQPS, topicCredits=initialTopicCredits)
if not broker.isConnected and not self.manageConnections:
raise Exception(broker.error)
@@ -205,18 +205,19 @@
broker._waitForStable()
list = []
if packageName in self.packages:
- for cname, hash in self.packages[packageName]:
- list.append((packageName, cname, hash))
+ for pkey in self.packages[packageName]:
+ list.append(self.packages[packageName][pkey].getKey())
return list
def getSchema(self, classKey):
""" Get the schema for a QMF class """
for broker in self.brokers:
broker._waitForStable()
- pname, cname, hash = classKey
+ pname = classKey.getPackageName()
+ pkey = classKey.getPackageKey()
if pname in self.packages:
- if (cname, hash) in self.packages[pname]:
- return self.packages[pname][(cname, hash)]
+ if pkey in self.packages[pname]:
+ return self.packages[pname][pkey]
def bindPackage(self, packageName):
""" """
@@ -230,7 +231,8 @@
""" """
if not self.userBindings or not self.rcvObjects:
raise Exception("userBindings option not set for Session")
- pname, cname, hash = classKey
+ pname = classKey.getPackageName()
+ cname = classKey.getClassName()
for broker in self.brokers:
broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
binding_key="console.obj.*.*.%s.%s.#" % (pname, cname))
@@ -297,14 +299,17 @@
for agent in broker.getAgents():
agentList.append(agent)
+ pname = None
cname = None
- if "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey()
- elif "_key" in kwargs: pname, cname, hash = kwargs["_key"]
+ hash = None
+ classKey = None
+ if "_schema" in kwargs: classKey = kwargs["_schema"].getKey()
+ elif "_key" in kwargs: classKey = kwargs["_key"]
elif "_class" in kwargs:
- pname, cname, hash = None, kwargs["_class"], None
+ cname = kwargs["_class"]
if "_package" in kwargs:
pname = kwargs["_package"]
- if cname == None and "_objectId" not in kwargs:
+ if cname == None and classKey == None and "_objectId" not in kwargs:
raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument")
map = {}
@@ -312,6 +317,10 @@
if "_objectId" in kwargs:
map["_objectid"] = kwargs["_objectId"].__repr__()
else:
+ if cname == None:
+ cname = classKey.getClassName()
+ pname = classKey.getPackageName()
+ hash = classKey.getHash()
map["_class"] = cname
if pname != None: map["_package"] = pname
if hash != None: map["_hash"] = hash
@@ -442,15 +451,13 @@
def _handleClassInd(self, broker, codec, seq):
kind = codec.read_uint8()
- pname = str(codec.read_str8())
- cname = str(codec.read_str8())
- hash = codec.read_bin128()
+ classKey = ClassKey(codec)
unknown = False
try:
self.cv.acquire()
- if pname in self.packages:
- if (cname, hash) not in self.packages[pname]:
+ if classKey.getPackageName() in self.packages:
+ if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]:
unknown = True
finally:
self.cv.release()
@@ -461,9 +468,7 @@
sendCodec = Codec(broker.conn.spec)
seq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
broker._setHeader(sendCodec, 'S', seq)
- sendCodec.write_str8(pname)
- sendCodec.write_str8(cname)
- sendCodec.write_bin128(hash)
+ classKey.encode(sendCodec)
smsg = broker._message(sendCodec.encoded)
broker._send(smsg)
@@ -512,14 +517,11 @@
def _handleSchemaResp(self, broker, codec, seq):
kind = codec.read_uint8()
- pname = str(codec.read_str8())
- cname = str(codec.read_str8())
- hash = codec.read_bin128()
- classKey = (pname, cname, hash)
+ classKey = ClassKey(codec)
_class = SchemaClass(kind, classKey, codec)
try:
self.cv.acquire()
- self.packages[pname][(cname, hash)] = _class
+ self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
finally:
self.cv.release()
@@ -529,22 +531,21 @@
self.console.newClass(kind, classKey)
def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
- pname = str(codec.read_str8())
- cname = str(codec.read_str8())
- hash = codec.read_bin128()
- classKey = (pname, cname, hash)
+ classKey = ClassKey(codec)
try:
self.cv.acquire()
+ pname = classKey.getPackageName()
if pname not in self.packages:
return
- if (cname, hash) not in self.packages[pname]:
+ pkey = classKey.getPackageKey()
+ if pkey not in self.packages[pname]:
return
- schema = self.packages[pname][(cname, hash)]
+ schema = self.packages[pname][pkey]
finally:
self.cv.release()
object = Object(self, broker, schema, codec, prop, stat)
- if pname == "org.apache.qpid.broker" and cname == "agent":
+ if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent":
broker._updateAgent(object)
try:
@@ -664,10 +665,7 @@
seq = self.seqMgr._reserve((method, False))
broker._setHeader(sendCodec, 'M', seq)
objectId.encode(sendCodec)
- pname, cname, hash = schemaKey
- sendCodec.write_str8(pname)
- sendCodec.write_str8(cname)
- sendCodec.write_bin128(hash)
+ schemaKey.encode(sendCodec)
sendCodec.write_str8(name)
count = 0
@@ -692,6 +690,36 @@
def __init__(self, name):
self.name = name
+class ClassKey:
+ """ """
+ def __init__(self, codec):
+ self.pname = str(codec.read_str8())
+ self.cname = str(codec.read_str8())
+ self.hash = codec.read_bin128()
+
+ def encode(self, codec):
+ codec.write_str8(self.pname)
+ codec.write_str8(self.cname)
+ codec.write_bin128(self.hash)
+
+ def getPackageName(self):
+ return self.pname
+
+ def getClassName(self):
+ return self.cname
+
+ def getHash(self):
+ return self.hash
+
+ def getHashString(self):
+ return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", self.hash)
+
+ def getPackageKey(self):
+ return (self.cname, self.hash)
+
+ def __repr__(self):
+ return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"
+
class SchemaClass:
""" """
CLASS_KIND_TABLE = 1
@@ -722,15 +750,13 @@
self.arguments.append(SchemaArgument(codec, methodArg=False))
def __repr__(self):
- pname, cname, hash = self.classKey
if self.kind == self.CLASS_KIND_TABLE:
kindStr = "Table"
elif self.kind == self.CLASS_KIND_EVENT:
kindStr = "Event"
else:
kindStr = "Unsupported"
- result = "%s Class: %s:%s " % (kindStr, pname, cname)
- result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash)
+ result = "%s Class: %s " % (kindStr, self.classKey.__repr__())
return result
def getKey(self):
@@ -1003,10 +1029,7 @@
seq = self._session.seqMgr._reserve((method, synchronous))
self._broker._setHeader(sendCodec, 'M', seq)
self._objectId.encode(sendCodec)
- pname, cname, hash = self._schema.getKey()
- sendCodec.write_str8(pname)
- sendCodec.write_str8(cname)
- sendCodec.write_bin128(hash)
+ self._schema.getKey().encode(sendCodec)
sendCodec.write_str8(name)
count = 0
@@ -1085,14 +1108,16 @@
""" """
SYNC_TIME = 60
- def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False):
+ def __init__(self, session, host, port, authMech, authUser, authPass,
+ ssl=False, topicCredits=0xFFFFFFFF):
self.session = session
- self.host = host
- self.port = port
+ self.host = host
+ self.port = port
self.ssl = ssl
self.authUser = authUser
self.authPass = authPass
- self.agents = {}
+ self.topicCredits = topicCredits
+ self.agents = {}
self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent")
self.topicBound = False
self.cv = Condition()
@@ -1100,8 +1125,8 @@
self.syncRequest = 0
self.syncResult = None
self.reqsOutstanding = 1
- self.error = None
- self.brokerId = None
+ self.error = None
+ self.brokerId = None
self.isConnected = False
self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
self._tryToConnect()
@@ -1152,6 +1177,9 @@
auth = ""
return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672)
+ def replenishCredits(self, credits):
+ self.amqpSession.message_flow(destination="tdest", unit=0, value=credits)
+
def __repr__(self):
if self.isConnected:
return "Broker connected at: %s" % self.getUrl()
@@ -1185,8 +1213,8 @@
accept_mode=self.amqpSession.accept_mode.none,
acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
self.amqpSession.incoming("tdest").listen(self._replyCb)
- self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
- self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF)
+ self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=0)
+ self.amqpSession.message_flow(destination="tdest", unit=0, value=self.topicCredits)
self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF)
self.isConnected = True
@@ -1351,16 +1379,15 @@
def __init__(self, session, broker, codec):
self.session = session
self.broker = broker
- pname = str(codec.read_str8())
- cname = str(codec.read_str8())
- hash = codec.read_bin128()
- self.classKey = (pname, cname, hash)
+ self.classKey = ClassKey(codec)
self.timestamp = codec.read_int64()
self.severity = codec.read_uint8()
self.schema = None
+ pname = self.classKey.getPackageName()
+ pkey = self.classKey.getPackageKey()
if pname in session.packages:
- if (cname, hash) in session.packages[pname]:
- self.schema = session.packages[pname][(cname, hash)]
+ if pkey in session.packages[pname]:
+ self.schema = session.packages[pname][pkey]
self.arguments = {}
for arg in self.schema.arguments:
self.arguments[arg.name] = session._decodeValue(codec, arg.type)
@@ -1369,7 +1396,7 @@
if self.schema == None:
return "<uninterpretable>"
out = strftime("%c", gmtime(self.timestamp / 1000000000))
- out += " " + self._sevName() + " " + self.classKey[0] + ":" + self.classKey[1]
+ out += " " + self._sevName() + " " + self.classKey.getPackageName() + ":" + self.classKey.getClassName()
out += " broker=" + self.broker.getUrl()
for arg in self.schema.arguments:
disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8")
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/management.py?rev=719580&r1=719579&r2=719580&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/management.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/management.py Fri Nov 21 05:53:53 2008
@@ -71,10 +71,27 @@
self.assertEqual (res.sequence, seq)
self.assertEqual (res.body, body)
- def test_system_object (self):
+ def test_get_objects(self):
self.startQmf()
- systems = self.qmf.getObjects(_class="system")
- self.assertEqual (len (systems), 1)
+
+ # get the package list, verify that the qpid broker package is there
+ packages = self.qmf.getPackages()
+ assert 'org.apache.qpid.broker' in packages
+
+ # get the schema class keys for the broker, verify the broker table and link-down event
+ keys = self.qmf.getClasses('org.apache.qpid.broker')
+ broker = None
+ linkDown = None
+ for key in keys:
+ if key.getClassName() == "broker": broker = key
+ if key.getClassName() == "brokerLinkDown" : linkDown = key
+ assert broker
+ assert linkDown
+
+ brokerObjs = self.qmf.getObjects(_class="broker")
+ assert len(brokerObjs) == 1
+ brokerObjs = self.qmf.getObjects(_key=broker)
+ assert len(brokerObjs) == 1
def test_self_session_id (self):
self.startQmf()