You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/11/21 14:53:54 UTC

svn commit: r719580 - in /incubator/qpid/trunk/qpid/python: qmf/console.py tests_0-10/management.py

Author: tross
Date: Fri Nov 21 05:53:53 2008
New Revision: 719580

URL: http://svn.apache.org/viewvc?rev=719580&view=rev
Log:
Code cleanup on the QMF console API.
Formalized the schema class key as an object (ClassKey) with an all-string __repr__.
Added additional testing for the console API.

Modified:
    incubator/qpid/trunk/qpid/python/qmf/console.py
    incubator/qpid/trunk/qpid/python/tests_0-10/management.py

Modified: incubator/qpid/trunk/qpid/python/qmf/console.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qmf/console.py?rev=719580&r1=719579&r2=719580&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qmf/console.py (original)
+++ incubator/qpid/trunk/qpid/python/qmf/console.py Fri Nov 21 05:53:53 2008
@@ -171,11 +171,11 @@
   def __repr__(self):
     return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers)
 
-  def addBroker(self, target="localhost"):
+  def addBroker(self, target="localhost", initialTopicCredits=0xFFFFFFFF):
     """ Connect to a Qpid broker.  Returns an object of type Broker. """
     url = BrokerURL(target)
     broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass,
-                    ssl = url.scheme == URL.AMQPS)
+                    ssl = url.scheme == URL.AMQPS, topicCredits=initialTopicCredits)
     if not broker.isConnected and not self.manageConnections:
       raise Exception(broker.error)
 
@@ -205,18 +205,19 @@
       broker._waitForStable()
     list = []
     if packageName in self.packages:
-      for cname, hash in self.packages[packageName]:
-        list.append((packageName, cname, hash))
+      for pkey in self.packages[packageName]:
+        list.append(self.packages[packageName][pkey].getKey())
     return list
 
   def getSchema(self, classKey):
     """ Get the schema for a QMF class """
     for broker in self.brokers:
       broker._waitForStable()
-    pname, cname, hash = classKey
+    pname = classKey.getPackageName()
+    pkey = classKey.getPackageKey()
     if pname in self.packages:
-      if (cname, hash) in self.packages[pname]:
-        return self.packages[pname][(cname, hash)]
+      if pkey in self.packages[pname]:
+        return self.packages[pname][pkey]
 
   def bindPackage(self, packageName):
     """ """
@@ -230,7 +231,8 @@
     """ """
     if not self.userBindings or not self.rcvObjects:
       raise Exception("userBindings option not set for Session")
-    pname, cname, hash = classKey
+    pname = classKey.getPackageName()
+    cname = classKey.getClassName()
     for broker in self.brokers:
       broker.amqpSession.exchange_bind(exchange="qpid.management", queue=broker.topicName,
                                        binding_key="console.obj.*.*.%s.%s.#" % (pname, cname))
@@ -297,14 +299,17 @@
         for agent in broker.getAgents():
           agentList.append(agent)
 
+    pname = None
     cname = None
-    if   "_schema" in kwargs: pname, cname, hash = kwargs["_schema"].getKey()
-    elif "_key"    in kwargs: pname, cname, hash = kwargs["_key"]
+    hash = None
+    classKey = None
+    if   "_schema" in kwargs: classKey = kwargs["_schema"].getKey()
+    elif "_key"    in kwargs: classKey = kwargs["_key"]
     elif "_class"  in kwargs:
-      pname, cname, hash = None, kwargs["_class"], None
+      cname = kwargs["_class"]
       if "_package" in kwargs:
         pname = kwargs["_package"]
-    if cname == None and "_objectId" not in kwargs:
+    if cname == None and classKey == None and "_objectId" not in kwargs:
       raise Exception("No class supplied, use '_schema', '_key', '_class', or '_objectId' argument")
 
     map = {}
@@ -312,6 +317,10 @@
     if "_objectId" in kwargs:
       map["_objectid"] = kwargs["_objectId"].__repr__()
     else:
+      if cname == None:
+        cname = classKey.getClassName()
+        pname = classKey.getPackageName()
+        hash = classKey.getHash()
       map["_class"] = cname
       if pname != None: map["_package"] = pname
       if hash  != None: map["_hash"]    = hash
@@ -442,15 +451,13 @@
 
   def _handleClassInd(self, broker, codec, seq):
     kind  = codec.read_uint8()
-    pname = str(codec.read_str8())
-    cname = str(codec.read_str8())
-    hash  = codec.read_bin128()
+    classKey = ClassKey(codec)
     unknown = False
 
     try:
       self.cv.acquire()
-      if pname in self.packages:
-        if (cname, hash) not in self.packages[pname]:
+      if classKey.getPackageName() in self.packages:
+        if classKey.getPackageKey() not in self.packages[classKey.getPackageName()]:
           unknown = True
     finally:
       self.cv.release()
@@ -461,9 +468,7 @@
       sendCodec = Codec(broker.conn.spec)
       seq = self.seqMgr._reserve(self._CONTEXT_STARTUP)
       broker._setHeader(sendCodec, 'S', seq)
-      sendCodec.write_str8(pname)
-      sendCodec.write_str8(cname)
-      sendCodec.write_bin128(hash)
+      classKey.encode(sendCodec)
       smsg = broker._message(sendCodec.encoded)
       broker._send(smsg)
 
@@ -512,14 +517,11 @@
 
   def _handleSchemaResp(self, broker, codec, seq):
     kind  = codec.read_uint8()
-    pname = str(codec.read_str8())
-    cname = str(codec.read_str8())
-    hash  = codec.read_bin128()
-    classKey = (pname, cname, hash)
+    classKey = ClassKey(codec)
     _class = SchemaClass(kind, classKey, codec)
     try:
       self.cv.acquire()
-      self.packages[pname][(cname, hash)] = _class
+      self.packages[classKey.getPackageName()][classKey.getPackageKey()] = _class
     finally:
       self.cv.release()
       
@@ -529,22 +531,21 @@
       self.console.newClass(kind, classKey)
 
   def _handleContentInd(self, broker, codec, seq, prop=False, stat=False):
-    pname = str(codec.read_str8())
-    cname = str(codec.read_str8())
-    hash  = codec.read_bin128()
-    classKey = (pname, cname, hash)
+    classKey = ClassKey(codec)
     try:
       self.cv.acquire()
+      pname = classKey.getPackageName()
       if pname not in self.packages:
         return
-      if (cname, hash) not in self.packages[pname]:
+      pkey = classKey.getPackageKey()
+      if pkey not in self.packages[pname]:
         return
-      schema = self.packages[pname][(cname, hash)]
+      schema = self.packages[pname][pkey]
     finally:
       self.cv.release()
 
     object = Object(self, broker, schema, codec, prop, stat)
-    if pname == "org.apache.qpid.broker" and cname == "agent":
+    if pname == "org.apache.qpid.broker" and classKey.getClassName() == "agent":
       broker._updateAgent(object)
 
     try:
@@ -664,10 +665,7 @@
         seq = self.seqMgr._reserve((method, False))
         broker._setHeader(sendCodec, 'M', seq)
         objectId.encode(sendCodec)
-        pname, cname, hash = schemaKey
-        sendCodec.write_str8(pname)
-        sendCodec.write_str8(cname)
-        sendCodec.write_bin128(hash)
+        schemaKey.encode(sendCodec)
         sendCodec.write_str8(name)
 
         count = 0
@@ -692,6 +690,36 @@
   def __init__(self, name):
     self.name = name
 
+class ClassKey:  # identifies a QMF schema class by package name, class name, and schema hash
+  """ """
+  def __init__(self, codec):  # decode the three key fields from codec, in this order
+    self.pname = str(codec.read_str8())  # package name, read as str8 and coerced to str
+    self.cname = str(codec.read_str8())  # class name, read as str8 and coerced to str
+    self.hash  = codec.read_bin128()  # schema hash, read as bin128 and kept as returned
+
+  def encode(self, codec):  # write the key fields to codec in the same order __init__ reads them
+    codec.write_str8(self.pname)
+    codec.write_str8(self.cname)
+    codec.write_bin128(self.hash)
+
+  def getPackageName(self):  # accessor for the package-name field
+    return self.pname
+
+  def getClassName(self):  # accessor for the class-name field
+    return self.cname
+
+  def getHash(self):  # accessor for the hash exactly as read from the codec
+    return self.hash
+
+  def getHashString(self):  # printable form of the hash
+    return "%08x-%04x-%04x-%04x-%04x%08x" % struct.unpack ("!LHHHHL", self.hash)  # 16 bytes, network order, shown as 8-4-4-4-12 hex digits
+
+  def getPackageKey(self):  # key for this class within its package's table
+    return (self.cname, self.hash)  # tuple form used to index self.packages[pname][...]
+
+  def __repr__(self):  # string form: "<package>:<class>(<hash string>)"
+    return self.pname + ":" + self.cname + "(" + self.getHashString() + ")"  # concatenates strings only
+
 class SchemaClass:
   """ """
   CLASS_KIND_TABLE = 1
@@ -722,15 +750,13 @@
         self.arguments.append(SchemaArgument(codec, methodArg=False))
 
   def __repr__(self):
-    pname, cname, hash = self.classKey
     if self.kind == self.CLASS_KIND_TABLE:
       kindStr = "Table"
     elif self.kind == self.CLASS_KIND_EVENT:
       kindStr = "Event"
     else:
       kindStr = "Unsupported"
-    result = "%s Class: %s:%s " % (kindStr, pname, cname)
-    result += "(%08x-%04x-%04x-%04x-%04x%08x)" % struct.unpack ("!LHHHHL", hash)
+    result = "%s Class: %s " % (kindStr, self.classKey.__repr__())
     return result
 
   def getKey(self):
@@ -1003,10 +1029,7 @@
         seq = self._session.seqMgr._reserve((method, synchronous))
         self._broker._setHeader(sendCodec, 'M', seq)
         self._objectId.encode(sendCodec)
-        pname, cname, hash = self._schema.getKey()
-        sendCodec.write_str8(pname)
-        sendCodec.write_str8(cname)
-        sendCodec.write_bin128(hash)
+        self._schema.getKey().encode(sendCodec)
         sendCodec.write_str8(name)
 
         count = 0
@@ -1085,14 +1108,16 @@
   """ """
   SYNC_TIME = 60
 
-  def __init__(self, session, host, port, authMech, authUser, authPass, ssl=False):
+  def __init__(self, session, host, port, authMech, authUser, authPass,
+               ssl=False, topicCredits=0xFFFFFFFF):
     self.session  = session
-    self.host     = host
-    self.port     = port
+    self.host = host
+    self.port = port
     self.ssl = ssl
     self.authUser = authUser
     self.authPass = authPass
-    self.agents   = {}
+    self.topicCredits = topicCredits
+    self.agents = {}
     self.agents["1.0"] = Agent(self, "1.0", "BrokerAgent")
     self.topicBound = False
     self.cv = Condition()
@@ -1100,8 +1125,8 @@
     self.syncRequest = 0
     self.syncResult = None
     self.reqsOutstanding = 1
-    self.error     = None
-    self.brokerId  = None
+    self.error = None
+    self.brokerId = None
     self.isConnected = False
     self.amqpSessionId = "%s.%d" % (os.uname()[1], os.getpid())
     self._tryToConnect()
@@ -1152,6 +1177,9 @@
       auth = ""
     return "amqp%s://%s%s:%d" % (ssl, auth, self.host, self.port or 5672)
 
+  def replenishCredits(self, credits):
+    self.amqpSession.message_flow(destination="tdest", unit=0, value=credits)
+
   def __repr__(self):
     if self.isConnected:
       return "Broker connected at: %s" % self.getUrl()
@@ -1185,8 +1213,8 @@
                                          accept_mode=self.amqpSession.accept_mode.none,
                                          acquire_mode=self.amqpSession.acquire_mode.pre_acquired)
       self.amqpSession.incoming("tdest").listen(self._replyCb)
-      self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=1)
-      self.amqpSession.message_flow(destination="tdest", unit=0, value=0xFFFFFFFF)
+      self.amqpSession.message_set_flow_mode(destination="tdest", flow_mode=0)
+      self.amqpSession.message_flow(destination="tdest", unit=0, value=self.topicCredits)
       self.amqpSession.message_flow(destination="tdest", unit=1, value=0xFFFFFFFF)
 
       self.isConnected = True
@@ -1351,16 +1379,15 @@
   def __init__(self, session, broker, codec):
     self.session = session
     self.broker  = broker
-    pname = str(codec.read_str8())
-    cname = str(codec.read_str8())
-    hash  = codec.read_bin128()
-    self.classKey = (pname, cname, hash)
+    self.classKey = ClassKey(codec)
     self.timestamp = codec.read_int64()
     self.severity = codec.read_uint8()
     self.schema = None
+    pname = self.classKey.getPackageName()
+    pkey = self.classKey.getPackageKey()
     if pname in session.packages:
-      if (cname, hash) in session.packages[pname]:
-        self.schema = session.packages[pname][(cname, hash)]
+      if pkey in session.packages[pname]:
+        self.schema = session.packages[pname][pkey]
         self.arguments = {}
         for arg in self.schema.arguments:
           self.arguments[arg.name] = session._decodeValue(codec, arg.type)
@@ -1369,7 +1396,7 @@
     if self.schema == None:
       return "<uninterpretable>"
     out = strftime("%c", gmtime(self.timestamp / 1000000000))
-    out += " " + self._sevName() + " " + self.classKey[0] + ":" + self.classKey[1]
+    out += " " + self._sevName() + " " + self.classKey.getPackageName() + ":" + self.classKey.getClassName()
     out += " broker=" + self.broker.getUrl()
     for arg in self.schema.arguments:
       disp = self.session._displayValue(self.arguments[arg.name], arg.type).encode("utf8")

Modified: incubator/qpid/trunk/qpid/python/tests_0-10/management.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/management.py?rev=719580&r1=719579&r2=719580&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/management.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/management.py Fri Nov 21 05:53:53 2008
@@ -71,10 +71,27 @@
             self.assertEqual (res.sequence, seq)
             self.assertEqual (res.body,     body)
 
-    def test_system_object (self):
+    def test_get_objects(self):
         self.startQmf()
-        systems = self.qmf.getObjects(_class="system")
-        self.assertEqual (len (systems), 1)
+
+        # get the package list, verify that the qpid broker package is there
+        packages = self.qmf.getPackages()
+        assert 'org.apache.qpid.broker' in packages
+
+        # get the schema class keys for the broker, verify the broker table and link-down event
+        keys = self.qmf.getClasses('org.apache.qpid.broker')
+        broker = None
+        linkDown = None
+        for key in keys:
+            if key.getClassName() == "broker":  broker = key
+            if key.getClassName() == "brokerLinkDown" : linkDown = key
+        assert broker
+        assert linkDown
+
+        brokerObjs = self.qmf.getObjects(_class="broker")
+        assert len(brokerObjs) == 1
+        brokerObjs = self.qmf.getObjects(_key=broker)
+        assert len(brokerObjs) == 1
 
     def test_self_session_id (self):
         self.startQmf()