You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/09/19 22:54:09 UTC

svn commit: r697237 - in /incubator/qpid/trunk/qpid/python: commands/qpid-config commands/qpid-route qpid/qmfconsole.py

Author: tross
Date: Fri Sep 19 13:54:08 2008
New Revision: 697237

URL: http://svn.apache.org/viewvc?rev=697237&view=rev
Log:
QPID-1288 - Added error handling and remote agent support to the console API.  Ported qpid-config and qpid-route to the new API

Modified:
    incubator/qpid/trunk/qpid/python/commands/qpid-config
    incubator/qpid/trunk/qpid/python/commands/qpid-route
    incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-config
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-config?rev=697237&r1=697236&r2=697237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-config (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-config Fri Sep 19 13:54:08 2008
@@ -22,16 +22,7 @@
 import os
 import getopt
 import sys
-import socket
-import qpid
-from threading       import Condition
-from qpid.management import managementClient
-from qpid.managementdata import Broker
-from qpid.peer       import Closed
-from qpid.connection import Connection, ConnectionFailed
-from qpid.datatypes  import uuid4
-from qpid.util       import connect
-from time            import sleep
+from qpid import qmfconsole
 
 _recursive    = False
 _host         = "localhost"
@@ -78,44 +69,21 @@
 
 class BrokerManager:
     def __init__ (self):
-        self.dest   = None
-        self.src    = None
-        self.broker = None
-
-    def SetBroker (self, broker):
-        self.broker = broker
-
-    def ConnectToBroker (self):
-        try:
-            self.sessionId = "%s.%d" % (os.uname()[1], os.getpid())
-            self.conn = Connection (connect (self.broker.host, self.broker.port),
-                                    username=self.broker.username, password=self.broker.password)
-            self.conn.start ()
-            self.session  = self.conn.session (self.sessionId)
-            self.mclient  = managementClient (self.conn.spec)
-            self.mchannel = self.mclient.addChannel (self.session)
-        except socket.error, e:
-            print "Socket Error %s - %s" % (e[0], e[1])
-            sys.exit (1)
-        except Closed, e:
-            print "Connect Failed %d - %s" % (e[0], e[1])
-            sys.exit (1)
-        except ConnectionFailed, e:
-            print "Connect Failed %d - %s" % (e[0], e[1])
-            sys.exit(1)
-
-    def Disconnect (self):
-        self.mclient.removeChannel (self.mchannel)
-        self.session.close(timeout=10)
-        self.conn.close(timeout=10)
+        self.brokerName = None
+        self.qmf        = None
+        self.broker     = None
+
+    def SetBroker (self, brokerUrl):
+        self.url = brokerUrl
+        self.qmf = qmfconsole.Session()
+        self.broker = self.qmf.addBroker(brokerUrl)
+
+    def Disconnect(self):
+        self.qmf.delBroker(self.broker)
 
     def Overview (self):
-        self.ConnectToBroker ()
-        mc  = self.mclient
-        mch = self.mchannel
-        mc.syncWaitForStable (mch)
-        exchanges = mc.syncGetObjects (mch, "exchange")
-        queues    = mc.syncGetObjects (mch, "queue")
+        exchanges = self.qmf.getObjects(name="exchange")
+        queues    = self.qmf.getObjects(name="queue")
         print "Total Exchanges: %d" % len (exchanges)
         etype = {}
         for ex in exchanges:
@@ -136,11 +104,7 @@
         print "    non-durable: %d" % (len (queues) - _durable)
 
     def ExchangeList (self, filter):
-        self.ConnectToBroker ()
-        mc  = self.mclient
-        mch = self.mchannel
-        mc.syncWaitForStable (mch)
-        exchanges = mc.syncGetObjects (mch, "exchange")
+        exchanges = self.qmf.getObjects(name="exchange")
         print "Durable   Type      Bindings  Exchange Name"
         print "======================================================="
         for ex in exchanges:
@@ -148,18 +112,14 @@
                 print "%4c      %-10s%5d     %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name)
 
     def ExchangeListRecurse (self, filter):
-        self.ConnectToBroker ()
-        mc  = self.mclient
-        mch = self.mchannel
-        mc.syncWaitForStable (mch)
-        exchanges = mc.syncGetObjects (mch, "exchange")
-        bindings  = mc.syncGetObjects (mch, "binding")
-        queues    = mc.syncGetObjects (mch, "queue")
+        exchanges = self.qmf.getObjects(name="exchange")
+        bindings  = self.qmf.getObjects(name="binding")
+        queues    = self.qmf.getObjects(name="queue")
         for ex in exchanges:
             if self.match (ex.name, filter):
                 print "Exchange '%s' (%s)" % (ex.name, ex.type)
                 for bind in bindings:
-                    if bind.exchangeRef == ex.id:
+                    if bind.exchangeRef == ex.getObjectId():
                         qname = "<unknown>"
                         queue = self.findById (queues, bind.queueRef)
                         if queue != None:
@@ -168,12 +128,8 @@
             
 
     def QueueList (self, filter):
-        self.ConnectToBroker ()
-        mc  = self.mclient
-        mch = self.mchannel
-        mc.syncWaitForStable (mch)
-        queues   = mc.syncGetObjects (mch, "queue")
-        journals = mc.syncGetObjects (mch, "journal")
+        queues   = self.qmf.getObjects(name="queue")
+        journals = self.qmf.getObjects(name="journal")
         print "                                      Store Size"
         print "Durable  AutoDel  Excl  Bindings  (files x file pages)  Queue Name"
         print "==========================================================================================="
@@ -193,18 +149,14 @@
                              YN (q.exclusive), q.bindingCount, q.name)
 
     def QueueListRecurse (self, filter):
-        self.ConnectToBroker ()
-        mc  = self.mclient
-        mch = self.mchannel
-        mc.syncWaitForStable (mch)
-        exchanges = mc.syncGetObjects (mch, "exchange")
-        bindings  = mc.syncGetObjects (mch, "binding")
-        queues    = mc.syncGetObjects (mch, "queue")
+        exchanges = self.qmf.getObjects(name="exchange")
+        bindings  = self.qmf.getObjects(name="binding")
+        queues    = self.qmf.getObjects(name="queue")
         for queue in queues:
             if self.match (queue.name, filter):
                 print "Queue '%s'" % queue.name
                 for bind in bindings:
-                    if bind.queueRef == queue.id:
+                    if bind.queueRef == queue.getObjectId():
                         ename = "<unknown>"
                         ex    = self.findById (exchanges, bind.exchangeRef)
                         if ex != None:
@@ -216,30 +168,19 @@
     def AddExchange (self, args):
         if len (args) < 2:
             Usage ()
-        self.ConnectToBroker ()
         etype = args[0]
         ename = args[1]
-
-        try:
-            self.session.exchange_declare (exchange=ename, type=etype, durable=_durable)
-        except Closed, e:
-            print "Failed:", e
+        self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, durable=_durable)
 
     def DelExchange (self, args):
         if len (args) < 1:
             Usage ()
-        self.ConnectToBroker ()
         ename = args[0]
-
-        try:
-            self.session.exchange_delete (exchange=ename)
-        except Closed, e:
-            print "Failed:", e
+        self.broker.getAmqpSession().exchange_delete (exchange=ename)
 
     def AddQueue (self, args):
         if len (args) < 1:
             Usage ()
-        self.ConnectToBroker ()
         qname    = args[0]
         declArgs = {}
         if _durable:
@@ -251,55 +192,37 @@
         if _maxQueueCount:
             declArgs[MAX_QUEUE_COUNT]  = _maxQueueCount
 
-        try:
-            self.session.queue_declare (queue=qname, durable=_durable, arguments=declArgs)
-        except Closed, e:
-            print "Failed:", e
+        self.broker.getAmqpSession().queue_declare (queue=qname, durable=_durable, arguments=declArgs)
 
     def DelQueue (self, args):
         if len (args) < 1:
             Usage ()
-        self.ConnectToBroker ()
         qname = args[0]
-
-        try:
-            self.session.queue_delete (queue=qname)
-        except Closed, e:
-            print "Failed:", e
+        self.broker.getAmqpSession().queue_delete (queue=qname)
 
     def Bind (self, args):
         if len (args) < 2:
             Usage ()
-        self.ConnectToBroker ()
         ename = args[0]
         qname = args[1]
         key   = ""
         if len (args) > 2:
             key = args[2]
-
-        try:
-            self.session.exchange_bind (queue=qname, exchange=ename, binding_key=key)
-        except Closed, e:
-            print "Failed:", e
+        self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key)
 
     def Unbind (self, args):
         if len (args) < 2:
             Usage ()
-        self.ConnectToBroker ()
         ename = args[0]
         qname = args[1]
         key   = ""
         if len (args) > 2:
             key = args[2]
-
-        try:
-            self.session.exchange_unbind (queue=qname, exchange=ename, binding_key=key)
-        except Closed, e:
-            print "Failed:", e
+        self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key)
 
     def findById (self, items, id):
         for item in items:
-            if item.id == id:
+            if item.getObjectId() == id:
                 return item
         return None
 
@@ -343,43 +266,48 @@
 
 nargs = len (cargs)
 bm    = BrokerManager ()
-bm.SetBroker (Broker (_host))
 
-if nargs == 0:
-    bm.Overview ()
-else:
-    cmd = cargs[0]
-    modifier = ""
-    if nargs > 1:
-        modifier = cargs[1]
-    if cmd[0] == 'e':
-        if _recursive:
-            bm.ExchangeListRecurse (modifier)
-        else:
-            bm.ExchangeList (modifier)
-    elif cmd[0] == 'q':
-        if _recursive:
-            bm.QueueListRecurse (modifier)
-        else:
-            bm.QueueList (modifier)
-    elif cmd == "add":
-        if modifier == "exchange":
-            bm.AddExchange (cargs[2:])
-        elif modifier == "queue":
-            bm.AddQueue (cargs[2:])
-        else:
-            Usage ()
-    elif cmd == "del":
-        if modifier == "exchange":
-            bm.DelExchange (cargs[2:])
-        elif modifier == "queue":
-            bm.DelQueue (cargs[2:])
+try:
+    bm.SetBroker(qmfconsole.BrokerURL(_host))
+    if nargs == 0:
+        bm.Overview ()
+    else:
+        cmd = cargs[0]
+        modifier = ""
+        if nargs > 1:
+            modifier = cargs[1]
+        if cmd[0] == 'e':
+            if _recursive:
+                bm.ExchangeListRecurse (modifier)
+            else:
+                bm.ExchangeList (modifier)
+        elif cmd[0] == 'q':
+            if _recursive:
+                bm.QueueListRecurse (modifier)
+            else:
+                bm.QueueList (modifier)
+        elif cmd == "add":
+            if modifier == "exchange":
+                bm.AddExchange (cargs[2:])
+            elif modifier == "queue":
+                bm.AddQueue (cargs[2:])
+            else:
+                Usage ()
+        elif cmd == "del":
+            if modifier == "exchange":
+                bm.DelExchange (cargs[2:])
+            elif modifier == "queue":
+                bm.DelQueue (cargs[2:])
+            else:
+                Usage ()
+        elif cmd == "bind":
+            bm.Bind (cargs[1:])
+        elif cmd == "unbind":
+            bm.Unbind (cargs[1:])
         else:
             Usage ()
-    elif cmd == "bind":
-        bm.Bind (cargs[1:])
-    elif cmd == "unbind":
-        bm.Unbind (cargs[1:])
-    else:
-        Usage ()
+except Exception,e:
+    print "Failed:", e.message
+    sys.exit(1)
+
 bm.Disconnect()

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=697237&r1=697236&r2=697237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Fri Sep 19 13:54:08 2008
@@ -22,13 +22,8 @@
 import getopt
 import sys
 import socket
-import qpid
 import os
-from qpid.management import managementClient
-from qpid.managementdata import Broker
-from qpid.peer       import Closed
-from qpid.connection import Connection, ConnectionFailed
-from qpid.util       import connect
+from qpid import qmfconsole
 
 def Usage ():
     print "Usage:  qpid-route [OPTIONS] link add  <dest-broker> <src-broker>"
@@ -58,93 +53,57 @@
 
 class RouteManager:
     def __init__ (self, destBroker):
-        self.dest = Broker (destBroker)
+        self.dest = qmfconsole.BrokerURL(destBroker)
         self.src  = None
-
-    def ConnectToBroker (self):
-        broker = self.dest
-        if _verbose:
-            print "Connecting to broker: %s:%d" % (broker.host, broker.port)
-        try:
-            self.sessionId = "%s.%d" % (os.uname()[1], os.getpid())
-            self.conn    = Connection (connect (broker.host, broker.port), \
-                                           username=broker.username, password=broker.password)
-            self.conn.start ()
-            self.session = self.conn.session(self.sessionId)
-            self.mclient = managementClient (self.conn.spec)
-            self.mch     = self.mclient.addChannel (self.session)
-            self.mclient.syncWaitForStable (self.mch)
-        except socket.error, e:
-            print "Socket Error %s - %s" % (e[0], e[1])
-            sys.exit (1)
-        except Closed, e:
-            print "Connect Failed %d - %s" % (e[0], e[1])
-            sys.exit (1)
-        except ConnectionFailed, e:
-            print "Connect Failed %d - %s" % (e[0], e[1])
-            sys.exit(1)
+        self.qmf = qmfconsole.Session()
+        self.broker = self.qmf.addBroker(destBroker)
 
     def Disconnect (self):
-        self.mclient.removeChannel (self.mch)
-        self.session.close(timeout=10)
-        self.conn.close(timeout=10)
+        self.qmf.delBroker(self.broker)
 
     def getLink (self):
-        links = self.mclient.syncGetObjects (self.mch, "link")
+        links = self.qmf.getObjects(name="link")
         for link in links:
             if "%s:%d" % (link.host, link.port) == self.src.name ():
                 return link
         return None
 
     def AddLink (self, srcBroker):
-        self.src  = Broker (srcBroker)
-        mc = self.mclient
-
+        self.src = qmfconsole.BrokerURL(srcBroker)
         if self.dest.name() == self.src.name():
             print "Linking broker to itself is not permitted"
             sys.exit(1)
 
-        brokers = mc.syncGetObjects (self.mch, "broker")
+        brokers = self.qmf.getObjects(name="broker")
         broker = brokers[0]
         link = self.getLink()
         if link != None:
-            print "Link already exists"
-            sys.exit(1)
+            raise Exception("Link already exists")
 
-        connectArgs = {}
-        connectArgs["host"]          = self.src.host
-        connectArgs["port"]          = self.src.port
-        connectArgs["useSsl"]        = False
-        connectArgs["durable"]       = _durable
-        if self.src.username == "anonymous":
-            connectArgs["authMechanism"] = "ANONYMOUS"
+        if self.src.authName == "anonymous":
+            mech = "ANONYMOUS"
         else:
-            connectArgs["authMechanism"] = "PLAIN"
-            connectArgs["username"]      = self.src.username
-            connectArgs["password"]      = self.src.password
-        res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
+            mech = "PLAIN"
+        res = broker.connect(self.src.host, self.src.port, False, _durable,
+                             mech, self.src.authName, self.src.authPass)
         if _verbose:
-            print "Connect method returned:", res.status, res.statusText
-        link = self.getLink ()
+            print "Connect method returned:", res.status, res.text
+        link = self.getLink()
 
     def DelLink (self, srcBroker):
-        self.src  = Broker (srcBroker)
-        mc = self.mclient
-
-        brokers = mc.syncGetObjects (self.mch, "broker")
+        self.src = qmfconsole.BrokerURL(srcBroker)
+        brokers = self.qmf.getObjects(name="broker")
         broker = brokers[0]
         link = self.getLink()
         if link == None:
-            print "Link not found"
-            sys.exit(1)
+            raise Exception("Link not found")
 
-        res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+        res = link.close()
         if _verbose:
-            print "Close method returned:", res.status, res.statusText
+            print "Close method returned:", res.status, res.text
 
     def ListLinks (self):
-        mc = self.mclient
-        links = mc.syncGetObjects (self.mch, "link")
+        links = self.qmf.getObjects(name="link")
         if len(links) == 0:
             print "No Links Found"
         else:
@@ -152,145 +111,118 @@
             print "Host            Port    Durable  State             Last Error"
             print "==================================================================="
             for link in links:
-                print "%-16s%-8d   %c     %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError)
+                print "%-16s%-8d   %c     %-18s%s" % \
+                (link.host, link.port, YN(link.durable), link.state, link.lastError)
 
     def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes):
-        self.src  = Broker (srcBroker)
-        mc = self.mclient
-
+        self.src = qmfconsole.BrokerURL(srcBroker)
         if self.dest.name() == self.src.name():
-            print "Linking broker to itself is not permitted"
-            sys.exit(1)
+            raise Exception("Linking broker to itself is not permitted")
 
-        brokers = mc.syncGetObjects (self.mch, "broker")
+        brokers = self.qmf.getObjects(name="broker")
         broker = brokers[0]
 
-        link = self.getLink ()
+        link = self.getLink()
         if link == None:
             if _verbose:
                 print "Inter-broker link not found, creating..."
 
-            connectArgs = {}
-            connectArgs["host"]          = self.src.host
-            connectArgs["port"]          = self.src.port
-            connectArgs["useSsl"]        = False
-            connectArgs["durable"]       = _durable
-            if self.src.username == "anonymous":
-                connectArgs["authMechanism"] = "ANONYMOUS"
+            if self.src.authName == "anonymous":
+                mech = "ANONYMOUS"
             else:
-                connectArgs["authMechanism"] = "PLAIN"
-                connectArgs["username"]      = self.src.username
-                connectArgs["password"]      = self.src.password
-            res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
+                mech = "PLAIN"
+            res = broker.connect(self.src.host, self.src.port, False, _durable,
+                                 mech, self.src.authName, self.src.authPass)
             if _verbose:
-                print "Connect method returned:", res.status, res.statusText
-            link = self.getLink ()
+                print "Connect method returned:", res.status, res.text
+            link = self.getLink()
 
         if link == None:
-            print "Protocol Error - Missing link ID"
-            sys.exit (1)
+            raise Exception("Protocol Error - Missing link ID")
 
-        bridges = mc.syncGetObjects (self.mch, "bridge")
+        bridges = self.qmf.getObjects(name="bridge")
         for bridge in bridges:
-            if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey:
+            if bridge.linkRef == link.getObjectId() and \
+                    bridge.dest == exchange and bridge.key == routingKey:
                 if not _quiet:
-                    print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)
-                    sys.exit (1)
+                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
                 sys.exit (0)
 
         if _verbose:
             print "Creating inter-broker binding..."
-        bridgeArgs = {}
-        bridgeArgs["durable"]    = _durable
-        bridgeArgs["src"]        = exchange
-        bridgeArgs["dest"]       = exchange
-        bridgeArgs["key"]        = routingKey
-        bridgeArgs["tag"]        = tag
-        bridgeArgs["excludes"]   = excludes
-        bridgeArgs["srcIsQueue"] = 0
-        bridgeArgs["srcIsLocal"] = 0
-        res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs)
+        res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, 0, 0)
         if res.status == 4:
-            print "Can't create a durable route on a non-durable link"
-            sys.exit(1)
+            raise Exception("Can't create a durable route on a non-durable link")
         if _verbose:
             print "Bridge method returned:", res.status, res.statusText
 
     def DelRoute (self, srcBroker, exchange, routingKey):
-        self.src  = Broker (srcBroker)
-        mc = self.mclient
-
-        link = self.getLink ()
+        self.src = qmfconsole.BrokerURL(srcBroker)
+        link = self.getLink()
         if link == None:
             if not _quiet:
-                print "No link found from %s to %s" % (self.src.name(), self.dest.name())
-                sys.exit (1)
+                raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name()))
             sys.exit (0)
 
-        bridges = mc.syncGetObjects (self.mch, "bridge")
+        bridges = self.qmf.getObjects(name="bridge")
         for bridge in bridges:
-            if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey:
+            if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey:
                 if _verbose:
                     print "Closing bridge..."
-                res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close")
+                res = bridge.close()
                 if res.status != 0:
-                    print "Error closing bridge: %d - %s" % (res.status, res.statusText)
-                    sys.exit (1)
+                    raise Exception("Error closing bridge: %d - %s" % (res.status, res.statusText))
                 if len (bridges) == 1 and _dellink:
                     link = self.getLink ()
                     if link == None:
                         sys.exit (0)
                     if _verbose:
                         print "Last bridge on link, closing link..."
-                    res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+                    res = link.close()
                     if res.status != 0:
-                        print "Error closing link: %d - %s" % (res.status, res.statusText)
-                        sys.exit (1)
+                        raise Exception("Error closing link: %d - %s" % (res.status, res.statusText))
                 sys.exit (0)
         if not _quiet:
-            print "Route not found"
-            sys.exit (1)
+            raise Exception("Route not found")
 
     def ListRoutes (self):
-        mc = self.mclient
-        links   = mc.syncGetObjects (self.mch, "link")
-        bridges = mc.syncGetObjects (self.mch, "bridge")
+        links   = self.qmf.getObjects(name="link")
+        bridges = self.qmf.getObjects(name="bridge")
 
         for bridge in bridges:
             myLink = None
             for link in links:
-                if bridge.linkRef == link.id:
+                if bridge.linkRef == link.getObjectId():
                     myLink = link
                     break
             if myLink != None:
                 print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
 
     def ClearAllRoutes (self):
-        mc = self.mclient
-        links   = mc.syncGetObjects (self.mch, "link")
-        bridges = mc.syncGetObjects (self.mch, "bridge")
+        links   = self.qmf.getObjects(name="link")
+        bridges = self.qmf.getObjects(name="bridge")
 
         for bridge in bridges:
             if _verbose:
                 myLink = None
                 for link in links:
-                    if bridge.linkRef == link.id:
+                    if bridge.linkRef == link.getObjectId():
                         myLink = link
                         break
                 if myLink != None:
                     print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key),
-            res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close")
+            res = bridge.close()
             if res.status != 0:
                 print "Error: %d - %s" % (res.status, res.statusText)
             elif _verbose:
                 print "Ok"
 
         if _dellink:
-            links = mc.syncGetObjects (self.mch, "link")
+            links = self.qmf.getObjects(name="link")
             for link in links:
                 if _verbose:
                     print "Deleting Link: %s:%d... " % (link.host, link.port),
-                res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+                res = link.close()
                 if res.status != 0:
                     print "Error: %d - %s" % (res.status, res.statusText)
                 elif _verbose:
@@ -331,41 +263,45 @@
 
 group = cargs[0]
 cmd   = cargs[1]
-rm    = RouteManager (destBroker)
-rm.ConnectToBroker ()
 
-if group == "link":
-    if cmd == "add":
-        if nargs != 4:
-            Usage()
-        rm.AddLink (cargs[3])
-    elif cmd == "del":
-        if nargs != 4:
-            Usage()
-        rm.DelLink (cargs[3])
-    elif cmd == "list":
-        rm.ListLinks ()
-
-elif group == "route":
-    if cmd == "add":
-        if nargs < 6 or nargs > 8:
-            Usage ()
-
-        tag = ""
-        excludes = ""
-        if nargs > 6: tag = cargs[6]     
-        if nargs > 7: excludes = cargs[7]     
-        rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes)
-    elif cmd == "del":
-        if nargs != 6:
-            Usage ()
-        else:
-            rm.DelRoute (cargs[3], cargs[4], cargs[5])
-    else:
-        if   cmd == "list":
-            rm.ListRoutes ()
-        elif cmd == "flush":
-            rm.ClearAllRoutes ()
+try:
+    rm = RouteManager (destBroker)
+    if group == "link":
+        if cmd == "add":
+            if nargs != 4:
+                Usage()
+            rm.AddLink (cargs[3])
+        elif cmd == "del":
+            if nargs != 4:
+                Usage()
+            rm.DelLink (cargs[3])
+        elif cmd == "list":
+            rm.ListLinks ()
+
+    elif group == "route":
+        if cmd == "add":
+            if nargs < 6 or nargs > 8:
+                Usage ()
+
+            tag = ""
+            excludes = ""
+            if nargs > 6: tag = cargs[6]     
+            if nargs > 7: excludes = cargs[7]     
+            rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes)
+        elif cmd == "del":
+            if nargs != 6:
+                Usage ()
+            else:
+                rm.DelRoute (cargs[3], cargs[4], cargs[5])
         else:
-            Usage ()
+            if   cmd == "list":
+                rm.ListRoutes ()
+            elif cmd == "flush":
+                rm.ClearAllRoutes ()
+            else:
+                Usage ()
+except Exception,e:
+    print "Failed:", e.message
+    sys.exit(1)
+
 rm.Disconnect ()

Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=697237&r1=697236&r2=697237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Fri Sep 19 13:54:08 2008
@@ -23,6 +23,7 @@
 import qpid
 import struct
 import socket
+import re
 from qpid.peer       import Closed
 from qpid.connection import Connection, ConnectionFailed
 from qpid.datatypes  import uuid4
@@ -46,10 +47,14 @@
     used to obtain details about the class."""
     pass
 
-  def newAgent(self, broker, agent):
+  def newAgent(self, agent):
     """ Invoked when a QMF agent is discovered. """
     pass
 
+  def delAgent(self, agent):
+    """ Invoked when a QMF agent disconnects. """
+    pass
+
   def objectProps(self, broker, id, record):
     """ Invoked when an object is updated. """
     pass
@@ -70,6 +75,25 @@
     """ """
     pass
 
+class BrokerURL:
+  def __init__(self, text):
+    rex = re.compile(r"""
+    # [   <user>  [   / <password> ] @]  <host>  [   :<port>   ]
+    ^ (?: ([^/]*) (?: / ([^@]*)   )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X)
+    match = rex.match(text)
+    if not match: raise ValueError("'%s' is not a valid broker url" % (text))
+    user, password, host, port = match.groups()
+
+    self.host = socket.gethostbyname(host)
+    if port: self.port = int(port)
+    else: self.port = 5672
+    self.authName = user or "guest"
+    self.authPass = password or "guest"
+    self.authMech = "PLAIN"
+
+  def name(self):
+    return self.host + ":" + str(self.port)
+
 class Session:
   """
   An instance of the Session class represents a console session running
@@ -95,12 +119,17 @@
     self.cv               = Condition()
     self.syncSequenceList = []
     self.getResult        = []
+    self.error            = None
+
+  def __repr__(self):
+    return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers)
 
-  def addBroker(self, host="localhost", port=5672,
-                authMech="PLAIN", authName="guest", authPass="guest"):
+  def addBroker(self, target="localhost"):
     """ Connect to a Qpid broker.  Returns an object of type Broker. """
-    broker = Broker(self, host, port, authMech, authName, authPass)
+    url = BrokerURL(target)
+    broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass)
     self.brokers.append(broker)
+    self.getObjects(broker=broker, name="agent")
     return broker
 
   def delBroker(self, broker):
@@ -138,11 +167,22 @@
       if (cname, hash) in self.packages[pname]:
         return self.packages[pname][(cname, hash)]
 
-  def getAgents(self):
+  def getAgents(self, broker=None):
     """ Get a list of currently known agents """
-    for broker in self.brokers:
-      broker._waitForStable()
-    pass
+    brokerList = []
+    if broker == None:
+      for b in self.brokers:
+        brokerList.append(b)
+    else:
+      brokerList.append(broker)
+
+    for b in brokerList:
+      b._waitForStable()
+    agentList = []
+    for b in brokerList:
+      for a in b.getAgents():
+        agentList.append(a)
+    return agentList
 
   def getObjects(self, **kwargs):
     """ Get a list of objects from QMF agents.
@@ -165,7 +205,8 @@
     broker = <broker> - supply a broker as returned by addBroker
     """
     if "broker" in kwargs:
-      brokerList = [].append(kwargs["broker"])
+      brokerList = []
+      brokerList.append(kwargs["broker"])
     else:
       brokerList = self.brokers
     for broker in brokerList:
@@ -176,7 +217,7 @@
       agent = kwargs["agent"]
       if agent.broker not in brokerList:
         raise Exception("Supplied agent is not accessible through the supplied broker")
-      agentList = append(agent)
+      agentList.append(agent)
     else:
       for broker in brokerList:
         for agent in broker.getAgents():
@@ -209,7 +250,7 @@
     starttime = time()
     timeout = False
     self.cv.acquire()
-    while len(self.syncSequenceList) > 0:
+    while len(self.syncSequenceList) > 0 and self.error == None:
       self.cv.wait(self.GET_WAIT_TIME)
       if time() - starttime > self.GET_WAIT_TIME:
         for pendingSeq in self.syncSequenceList:
@@ -218,6 +259,11 @@
         timeout = True
     self.cv.release()
 
+    if self.error:
+      errorText = self.error
+      self.error = None
+      raise Exception(errorText)
+
     if len(self.getResult) == 0 and timeout:
       raise RuntimeError("No agent responded within timeout period")
     return self.getResult
@@ -351,6 +397,8 @@
     self.cv.release()
     schema = self.packages[pname][(cname, hash)]
     object = Object(self, broker, schema, codec, prop, stat)
+    if pname == "org.apache.qpid.broker" and cname == "agent":
+      broker._updateAgent(object)
 
     self.cv.acquire()
     if seq in self.syncSequenceList:
@@ -365,6 +413,13 @@
       if stat:
         self.console.objectStats(broker, object.getObjectId(), object)
 
+  def _handleError(self, error):
+    self.error = error
+    self.cv.acquire()
+    self.syncSequenceList = []
+    self.cv.notify()
+    self.cv.release()
+  
 class Package:
   """ """
   def __init__(self, name):
@@ -407,23 +462,23 @@
     return result
 
   def getKey(self):
-    """ """
+    """ Return the class-key for this class. """
     return self.classKey
 
   def getProperties(self):
-    """ """
+    """ Return the list of properties for the class. """
     return self.properties
 
   def getStatistics(self):
-    """ """
+    """ Return the list of statistics for the class. """
     return self.statistics
 
   def getMethods(self):
-    """ """
+    """ Return the list of methods for the class. """
     return self.methods
 
   def getEvents(self):
-    """ """
+    """ Return the list of events for the class. """
     return self.events
 
 class SchemaProperty:
@@ -448,6 +503,9 @@
       elif key == "maxlen" : self.maxlen = value
       elif key == "desc"   : self.desc   = str(value)
 
+  def __repr__(self):
+    return self.name
+
 class SchemaStatistic:
   """ """
   def __init__(self, codec):
@@ -461,6 +519,9 @@
       if   key == "unit" : self.unit = str(value)
       elif key == "desc" : self.desc = str(value)
 
+  def __repr__(self):
+    return self.name
+
 class SchemaMethod:
   """ """
   def __init__(self, codec):
@@ -476,6 +537,19 @@
     for idx in range(argCount):
       self.arguments.append(SchemaArgument(codec, methodArg=True))
 
+  def __repr__(self):
+    result = self.name + "("
+    first = True
+    for arg in self.arguments:
+      if arg.dir.find("I") != -1:
+        if first:
+          first = False
+        else:
+          result += ", "
+        result += arg.name
+    result += ")"
+    return result
+
 class SchemaEvent:
   """ """
   def __init__(self, codec):
@@ -491,6 +565,18 @@
     for idx in range(argCount):
       self.arguments.append(SchemaArgument(codec, methodArg=False))
 
+  def __repr__(self):
+    result = self.name + "("
+    first = True
+    for arg in self.arguments:
+      if first:
+        first = False
+      else:
+        result += ", "
+      result += arg.name
+    result += ")"
+    return result
+
 class SchemaArgument:
   """ """
   def __init__(self, codec, methodArg):
@@ -538,12 +624,8 @@
     return 0
 
   def __repr__(self):
-    return "%08x-%04x-%04x-%04x-%04x%08x" % ((self.first  & 0xFFFFFFFF00000000) >> 32,
-                                             (self.first  & 0x00000000FFFF0000) >> 16,
-                                             (self.first  & 0x000000000000FFFF),
-                                             (self.second & 0xFFFF000000000000) >> 48,
-                                             (self.second & 0x0000FFFF00000000) >> 32,
-                                             (self.second & 0x00000000FFFFFFFF))
+    return "%d-%d-%d-%d-%x" % (self.getFlags(), self.getSequence(),
+                               self.getBroker(), self.getBank(), self.getObject())
 
   def index(self):
     return (self.first, self.second)
@@ -596,23 +678,27 @@
         self.statistics.append((statistic, self._decodeValue(codec, statistic.type)))
 
   def getObjectId(self):
-    """ """
+    """ Return the object identifier for this object """
     return self.objectId
 
   def getClassKey(self):
-    """ """
+    """ Return the class-key that references the schema describing this object. """
     return self.schema.getKey()
 
   def getSchema(self):
-    """ """
+    """ Return the schema that describes this object. """
     return self.schema
 
+  def getMethods(self):
+    """ Return a list of methods available for this object. """
+    return self.schema.getMethods()
+
   def getTimestamps(self):
-    """ """
+    """ Return the current, creation, and deletion times for this object. """
     return self.currentTime, self.createTime, self.deleteTime
 
   def getIndex(self):
-    """ """
+    """ Return a string describing this object's primary key. """
     result = ""
     for property, value in self.properties:
       if property.index:
@@ -634,6 +720,7 @@
     for statistic, value in self.statistics:
       if name == statistic.name:
         return value
+    raise Exception("Type Object has no attribute '%s'" % name)
 
   def _invoke(self, name, args, kwargs):
     for method in self.schema.getMethods():
@@ -653,20 +740,27 @@
             self._encodeValue(sendCodec, args[aIdx], arg.type)
             aIdx += 1
         smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank()))
-        self.broker._send(smsg)
         self.broker.cv.acquire()
         self.broker.syncInFlight = True
+        self.broker.cv.release()
+
+        self.broker._send(smsg)
+
+        self.broker.cv.acquire()
         starttime = time()
-        while self.broker.syncInFlight:
+        while self.broker.syncInFlight and self.broker.error == None:
           self.broker.cv.wait(self.broker.SYNC_TIME)
           if time() - starttime > self.broker.SYNC_TIME:
             self.broker.cv.release()
             self.session.seqMgr._release(seq)
             raise RuntimeError("Timed out waiting for method to respond")
         self.broker.cv.release()
+        if self.broker.error != None:
+          errorText = self.broker.error
+          self.broker.error = None
+          raise Exception(errorText)
         return self.broker.syncResult
-      else:
-        raise Exception("Invalid Method (software defect)")
+    raise Exception("Invalid Method (software defect) [%s]" % name)
 
   def _parsePresenceMasks(self, codec, schema):
     excludeList = []
@@ -747,18 +841,21 @@
 
 class Broker:
   """ """
-  SYNC_TIME         = 10
+  SYNC_TIME = 10
 
   def __init__(self, session, host, port, authMech, authUser, authPass):
     self.session = session
-    self.agents = []
-    self.agents.append(Agent(self, 0))
+    self.host    = host
+    self.port    = port
+    self.agents  = {}
+    self.agents[0] = Agent(self, 0, "BrokerAgent")
     self.topicBound = False
     self.cv = Condition()
     self.syncInFlight = False
     self.syncRequest = 0
     self.syncResult = None
     self.reqsOutstanding = 1
+    self.error     = None
     self.brokerId  = None
     err = None
     try:
@@ -767,7 +864,7 @@
       self.conn.start()
       self.replyName = "reply-%s" % self.amqpSessionId
       self.amqpSession = self.conn.session(self.amqpSessionId)
-      self.amqpSession.auto_sync = False
+      self.amqpSession.auto_sync = True
       self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
       self.amqpSession.exchange_bind(exchange="amq.direct",
                                  queue=self.replyName, binding_key=self.replyName)
@@ -798,6 +895,7 @@
     except ConnectionFailed, e:
       err = "Connect Failed %d - %s" % (e[0], e[1])
 
+    self.active = True
     if err != None:
       raise Exception(err)
 
@@ -811,7 +909,36 @@
 
   def getAgents(self):
     """ Get the list of agents reachable via this broker """
-    return self.agents
+    return self.agents.values()
+
+  def getAmqpSession(self):
+    """ Get the AMQP session object for this connected broker. """
+    return self.amqpSession
+
+  def isConnected(self):
+    return self.active
+
+  def __repr__(self):
+    if self.active:
+      if self.port == 5672:
+        port = ""
+      else:
+        port = ":%d" % self.port
+      return "Broker connected at: amqp://%s%s" % (self.host, port)
+    else:
+      return "Disconnected Broker"
+
+  def _updateAgent(self, obj):
+    if obj.deleteTime == 0:
+      if obj.objectIdBank not in self.agents:
+        agent = Agent(self, obj.objectIdBank, obj.label)
+        self.agents[obj.objectIdBank] = agent
+        if self.session.console != None:
+          self.session.console.newAgent(agent)
+    else:
+      agent = self.agents.pop(obj.objectIdBank, None)
+      if agent != None and self.session.console != None:
+        self.session.console.delAgent(agent)
 
   def _setHeader(self, codec, opcode, seq=0):
     """ Compose the header of a management message. """
@@ -848,10 +975,14 @@
     self.amqpSession.message_transfer(destination=dest, message=msg)
 
   def _shutdown(self):
-    self.amqpSession.incoming("rdest").stop()
-    if self.session.console != None:
-      self.amqpSession.incoming("tdest").stop()
-    self.amqpSession.close()
+    if self.active:
+      self.amqpSession.incoming("rdest").stop()
+      if self.session.console != None:
+        self.amqpSession.incoming("tdest").stop()
+      self.amqpSession.close()
+      self.active = False
+    else:
+      raise Exception("Broker already disconnected")
 
   def _waitForStable(self):
     self.cv.acquire()
@@ -877,7 +1008,8 @@
     self.reqsOutstanding -= 1
     if self.reqsOutstanding == 0 and not self.topicBound and self.session.console != None:
       self.topicBound = True
-      self.amqpSession.exchange_bind(exchange="qpid.management", queue=self.topicName, binding_key="mgmt.#")
+      self.amqpSession.exchange_bind(exchange="qpid.management",
+                                     queue=self.topicName, binding_key="mgmt.#")
     if self.reqsOutstanding == 0 and self.syncInFlight:
       self.syncInFlight = False
       self.cv.notify()
@@ -901,13 +1033,23 @@
     elif opcode == 'g': self.session._handleContentInd      (self, codec, seq, prop=True, stat=True)
 
   def _exceptionCb(self, data):
-    pass
+    self.active = False
+    self.error = data
+    self.cv.acquire()
+    if self.syncInFlight:
+      self.cv.notify()
+    self.cv.release()
+    self.session._handleError(self.error)
 
 class Agent:
   """ """
-  def __init__(self, broker, bank):
+  def __init__(self, broker, bank, label):
     self.broker = broker
     self.bank   = bank
+    self.label  = label
+
+  def __repr__(self):
+    return "Agent at bank %d (%s)" % (self.bank, self.label)
 
 class Event:
   """ """
@@ -941,11 +1083,32 @@
     return data
 
 
-# TEST
+class DebugConsole(Console):
+  """ """
+  def newPackage(self, name):
+    print "newPackage:", name
+
+  def newClass(self, classKey):
+    print "newClass:", classKey
+
+  def newAgent(self, agent):
+    print "newAgent:", agent
+
+  def delAgent(self, agent):
+    print "delAgent:", agent
 
-#c = Console()
-#s = Session(c)
-#b = s.addBroker()
-#cl = s.getClasses("org.apache.qpid.broker")
-#sch = s.getSchema(cl[0])
+  def objectProps(self, broker, id, record):
+    print "objectProps:", record.getClassKey()
+
+  def objectStats(self, broker, id, record):
+    print "objectStats:", record.getClassKey()
+
+  def event(self, broker, event):
+    print "event:", event
+
+  def heartbeat(self, agent, timestamp):
+    print "heartbeat:", agent
+
+  def brokerInfo(self, broker):
+    print "brokerInfo:", broker