You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/09/19 22:54:09 UTC
svn commit: r697237 - in /incubator/qpid/trunk/qpid/python:
commands/qpid-config commands/qpid-route qpid/qmfconsole.py
Author: tross
Date: Fri Sep 19 13:54:08 2008
New Revision: 697237
URL: http://svn.apache.org/viewvc?rev=697237&view=rev
Log:
QPID-1288 - Added error handling and remote agent support to the console API. Ported qpid-config and qpid-route to the new API
Modified:
incubator/qpid/trunk/qpid/python/commands/qpid-config
incubator/qpid/trunk/qpid/python/commands/qpid-route
incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
Modified: incubator/qpid/trunk/qpid/python/commands/qpid-config
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-config?rev=697237&r1=697236&r2=697237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-config (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-config Fri Sep 19 13:54:08 2008
@@ -22,16 +22,7 @@
import os
import getopt
import sys
-import socket
-import qpid
-from threading import Condition
-from qpid.management import managementClient
-from qpid.managementdata import Broker
-from qpid.peer import Closed
-from qpid.connection import Connection, ConnectionFailed
-from qpid.datatypes import uuid4
-from qpid.util import connect
-from time import sleep
+from qpid import qmfconsole
_recursive = False
_host = "localhost"
@@ -78,44 +69,21 @@
class BrokerManager:
def __init__ (self):
- self.dest = None
- self.src = None
- self.broker = None
-
- def SetBroker (self, broker):
- self.broker = broker
-
- def ConnectToBroker (self):
- try:
- self.sessionId = "%s.%d" % (os.uname()[1], os.getpid())
- self.conn = Connection (connect (self.broker.host, self.broker.port),
- username=self.broker.username, password=self.broker.password)
- self.conn.start ()
- self.session = self.conn.session (self.sessionId)
- self.mclient = managementClient (self.conn.spec)
- self.mchannel = self.mclient.addChannel (self.session)
- except socket.error, e:
- print "Socket Error %s - %s" % (e[0], e[1])
- sys.exit (1)
- except Closed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit (1)
- except ConnectionFailed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit(1)
-
- def Disconnect (self):
- self.mclient.removeChannel (self.mchannel)
- self.session.close(timeout=10)
- self.conn.close(timeout=10)
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+
+ def SetBroker (self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = qmfconsole.Session()
+ self.broker = self.qmf.addBroker(brokerUrl)
+
+ def Disconnect(self):
+ self.qmf.delBroker(self.broker)
def Overview (self):
- self.ConnectToBroker ()
- mc = self.mclient
- mch = self.mchannel
- mc.syncWaitForStable (mch)
- exchanges = mc.syncGetObjects (mch, "exchange")
- queues = mc.syncGetObjects (mch, "queue")
+ exchanges = self.qmf.getObjects(name="exchange")
+ queues = self.qmf.getObjects(name="queue")
print "Total Exchanges: %d" % len (exchanges)
etype = {}
for ex in exchanges:
@@ -136,11 +104,7 @@
print " non-durable: %d" % (len (queues) - _durable)
def ExchangeList (self, filter):
- self.ConnectToBroker ()
- mc = self.mclient
- mch = self.mchannel
- mc.syncWaitForStable (mch)
- exchanges = mc.syncGetObjects (mch, "exchange")
+ exchanges = self.qmf.getObjects(name="exchange")
print "Durable Type Bindings Exchange Name"
print "======================================================="
for ex in exchanges:
@@ -148,18 +112,14 @@
print "%4c %-10s%5d %s" % (YN (ex.durable), ex.type, ex.bindingCount, ex.name)
def ExchangeListRecurse (self, filter):
- self.ConnectToBroker ()
- mc = self.mclient
- mch = self.mchannel
- mc.syncWaitForStable (mch)
- exchanges = mc.syncGetObjects (mch, "exchange")
- bindings = mc.syncGetObjects (mch, "binding")
- queues = mc.syncGetObjects (mch, "queue")
+ exchanges = self.qmf.getObjects(name="exchange")
+ bindings = self.qmf.getObjects(name="binding")
+ queues = self.qmf.getObjects(name="queue")
for ex in exchanges:
if self.match (ex.name, filter):
print "Exchange '%s' (%s)" % (ex.name, ex.type)
for bind in bindings:
- if bind.exchangeRef == ex.id:
+ if bind.exchangeRef == ex.getObjectId():
qname = "<unknown>"
queue = self.findById (queues, bind.queueRef)
if queue != None:
@@ -168,12 +128,8 @@
def QueueList (self, filter):
- self.ConnectToBroker ()
- mc = self.mclient
- mch = self.mchannel
- mc.syncWaitForStable (mch)
- queues = mc.syncGetObjects (mch, "queue")
- journals = mc.syncGetObjects (mch, "journal")
+ queues = self.qmf.getObjects(name="queue")
+ journals = self.qmf.getObjects(name="journal")
print " Store Size"
print "Durable AutoDel Excl Bindings (files x file pages) Queue Name"
print "==========================================================================================="
@@ -193,18 +149,14 @@
YN (q.exclusive), q.bindingCount, q.name)
def QueueListRecurse (self, filter):
- self.ConnectToBroker ()
- mc = self.mclient
- mch = self.mchannel
- mc.syncWaitForStable (mch)
- exchanges = mc.syncGetObjects (mch, "exchange")
- bindings = mc.syncGetObjects (mch, "binding")
- queues = mc.syncGetObjects (mch, "queue")
+ exchanges = self.qmf.getObjects(name="exchange")
+ bindings = self.qmf.getObjects(name="binding")
+ queues = self.qmf.getObjects(name="queue")
for queue in queues:
if self.match (queue.name, filter):
print "Queue '%s'" % queue.name
for bind in bindings:
- if bind.queueRef == queue.id:
+ if bind.queueRef == queue.getObjectId():
ename = "<unknown>"
ex = self.findById (exchanges, bind.exchangeRef)
if ex != None:
@@ -216,30 +168,19 @@
def AddExchange (self, args):
if len (args) < 2:
Usage ()
- self.ConnectToBroker ()
etype = args[0]
ename = args[1]
-
- try:
- self.session.exchange_declare (exchange=ename, type=etype, durable=_durable)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().exchange_declare (exchange=ename, type=etype, durable=_durable)
def DelExchange (self, args):
if len (args) < 1:
Usage ()
- self.ConnectToBroker ()
ename = args[0]
-
- try:
- self.session.exchange_delete (exchange=ename)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().exchange_delete (exchange=ename)
def AddQueue (self, args):
if len (args) < 1:
Usage ()
- self.ConnectToBroker ()
qname = args[0]
declArgs = {}
if _durable:
@@ -251,55 +192,37 @@
if _maxQueueCount:
declArgs[MAX_QUEUE_COUNT] = _maxQueueCount
- try:
- self.session.queue_declare (queue=qname, durable=_durable, arguments=declArgs)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().queue_declare (queue=qname, durable=_durable, arguments=declArgs)
def DelQueue (self, args):
if len (args) < 1:
Usage ()
- self.ConnectToBroker ()
qname = args[0]
-
- try:
- self.session.queue_delete (queue=qname)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().queue_delete (queue=qname)
def Bind (self, args):
if len (args) < 2:
Usage ()
- self.ConnectToBroker ()
ename = args[0]
qname = args[1]
key = ""
if len (args) > 2:
key = args[2]
-
- try:
- self.session.exchange_bind (queue=qname, exchange=ename, binding_key=key)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().exchange_bind (queue=qname, exchange=ename, binding_key=key)
def Unbind (self, args):
if len (args) < 2:
Usage ()
- self.ConnectToBroker ()
ename = args[0]
qname = args[1]
key = ""
if len (args) > 2:
key = args[2]
-
- try:
- self.session.exchange_unbind (queue=qname, exchange=ename, binding_key=key)
- except Closed, e:
- print "Failed:", e
+ self.broker.getAmqpSession().exchange_unbind (queue=qname, exchange=ename, binding_key=key)
def findById (self, items, id):
for item in items:
- if item.id == id:
+ if item.getObjectId() == id:
return item
return None
@@ -343,43 +266,48 @@
nargs = len (cargs)
bm = BrokerManager ()
-bm.SetBroker (Broker (_host))
-if nargs == 0:
- bm.Overview ()
-else:
- cmd = cargs[0]
- modifier = ""
- if nargs > 1:
- modifier = cargs[1]
- if cmd[0] == 'e':
- if _recursive:
- bm.ExchangeListRecurse (modifier)
- else:
- bm.ExchangeList (modifier)
- elif cmd[0] == 'q':
- if _recursive:
- bm.QueueListRecurse (modifier)
- else:
- bm.QueueList (modifier)
- elif cmd == "add":
- if modifier == "exchange":
- bm.AddExchange (cargs[2:])
- elif modifier == "queue":
- bm.AddQueue (cargs[2:])
- else:
- Usage ()
- elif cmd == "del":
- if modifier == "exchange":
- bm.DelExchange (cargs[2:])
- elif modifier == "queue":
- bm.DelQueue (cargs[2:])
+try:
+ bm.SetBroker(qmfconsole.BrokerURL(_host))
+ if nargs == 0:
+ bm.Overview ()
+ else:
+ cmd = cargs[0]
+ modifier = ""
+ if nargs > 1:
+ modifier = cargs[1]
+ if cmd[0] == 'e':
+ if _recursive:
+ bm.ExchangeListRecurse (modifier)
+ else:
+ bm.ExchangeList (modifier)
+ elif cmd[0] == 'q':
+ if _recursive:
+ bm.QueueListRecurse (modifier)
+ else:
+ bm.QueueList (modifier)
+ elif cmd == "add":
+ if modifier == "exchange":
+ bm.AddExchange (cargs[2:])
+ elif modifier == "queue":
+ bm.AddQueue (cargs[2:])
+ else:
+ Usage ()
+ elif cmd == "del":
+ if modifier == "exchange":
+ bm.DelExchange (cargs[2:])
+ elif modifier == "queue":
+ bm.DelQueue (cargs[2:])
+ else:
+ Usage ()
+ elif cmd == "bind":
+ bm.Bind (cargs[1:])
+ elif cmd == "unbind":
+ bm.Unbind (cargs[1:])
else:
Usage ()
- elif cmd == "bind":
- bm.Bind (cargs[1:])
- elif cmd == "unbind":
- bm.Unbind (cargs[1:])
- else:
- Usage ()
+except Exception,e:
+ print "Failed:", e.message
+ sys.exit(1)
+
bm.Disconnect()
Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=697237&r1=697236&r2=697237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Fri Sep 19 13:54:08 2008
@@ -22,13 +22,8 @@
import getopt
import sys
import socket
-import qpid
import os
-from qpid.management import managementClient
-from qpid.managementdata import Broker
-from qpid.peer import Closed
-from qpid.connection import Connection, ConnectionFailed
-from qpid.util import connect
+from qpid import qmfconsole
def Usage ():
print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
@@ -58,93 +53,57 @@
class RouteManager:
def __init__ (self, destBroker):
- self.dest = Broker (destBroker)
+ self.dest = qmfconsole.BrokerURL(destBroker)
self.src = None
-
- def ConnectToBroker (self):
- broker = self.dest
- if _verbose:
- print "Connecting to broker: %s:%d" % (broker.host, broker.port)
- try:
- self.sessionId = "%s.%d" % (os.uname()[1], os.getpid())
- self.conn = Connection (connect (broker.host, broker.port), \
- username=broker.username, password=broker.password)
- self.conn.start ()
- self.session = self.conn.session(self.sessionId)
- self.mclient = managementClient (self.conn.spec)
- self.mch = self.mclient.addChannel (self.session)
- self.mclient.syncWaitForStable (self.mch)
- except socket.error, e:
- print "Socket Error %s - %s" % (e[0], e[1])
- sys.exit (1)
- except Closed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit (1)
- except ConnectionFailed, e:
- print "Connect Failed %d - %s" % (e[0], e[1])
- sys.exit(1)
+ self.qmf = qmfconsole.Session()
+ self.broker = self.qmf.addBroker(destBroker)
def Disconnect (self):
- self.mclient.removeChannel (self.mch)
- self.session.close(timeout=10)
- self.conn.close(timeout=10)
+ self.qmf.delBroker(self.broker)
def getLink (self):
- links = self.mclient.syncGetObjects (self.mch, "link")
+ links = self.qmf.getObjects(name="link")
for link in links:
if "%s:%d" % (link.host, link.port) == self.src.name ():
return link
return None
def AddLink (self, srcBroker):
- self.src = Broker (srcBroker)
- mc = self.mclient
-
+ self.src = qmfconsole.BrokerURL(srcBroker)
if self.dest.name() == self.src.name():
print "Linking broker to itself is not permitted"
sys.exit(1)
- brokers = mc.syncGetObjects (self.mch, "broker")
+ brokers = self.qmf.getObjects(name="broker")
broker = brokers[0]
link = self.getLink()
if link != None:
- print "Link already exists"
- sys.exit(1)
+ raise Exception("Link already exists")
- connectArgs = {}
- connectArgs["host"] = self.src.host
- connectArgs["port"] = self.src.port
- connectArgs["useSsl"] = False
- connectArgs["durable"] = _durable
- if self.src.username == "anonymous":
- connectArgs["authMechanism"] = "ANONYMOUS"
+ if self.src.authName == "anonymous":
+ mech = "ANONYMOUS"
else:
- connectArgs["authMechanism"] = "PLAIN"
- connectArgs["username"] = self.src.username
- connectArgs["password"] = self.src.password
- res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
+ mech = "PLAIN"
+ res = broker.connect(self.src.host, self.src.port, False, _durable,
+ mech, self.src.authName, self.src.authPass)
if _verbose:
- print "Connect method returned:", res.status, res.statusText
- link = self.getLink ()
+ print "Connect method returned:", res.status, res.text
+ link = self.getLink()
def DelLink (self, srcBroker):
- self.src = Broker (srcBroker)
- mc = self.mclient
-
- brokers = mc.syncGetObjects (self.mch, "broker")
+ self.src = qmfconsole.BrokerURL(srcBroker)
+ brokers = self.qmf.getObjects(name="broker")
broker = brokers[0]
link = self.getLink()
if link == None:
- print "Link not found"
- sys.exit(1)
+ raise Exception("Link not found")
- res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+ res = link.close()
if _verbose:
- print "Close method returned:", res.status, res.statusText
+ print "Close method returned:", res.status, res.text
def ListLinks (self):
- mc = self.mclient
- links = mc.syncGetObjects (self.mch, "link")
+ links = self.qmf.getObjects(name="link")
if len(links) == 0:
print "No Links Found"
else:
@@ -152,145 +111,118 @@
print "Host Port Durable State Last Error"
print "==================================================================="
for link in links:
- print "%-16s%-8d %c %-18s%s" % (link.host, link.port, YN(link.durable), link.state, link.lastError)
+ print "%-16s%-8d %c %-18s%s" % \
+ (link.host, link.port, YN(link.durable), link.state, link.lastError)
def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes):
- self.src = Broker (srcBroker)
- mc = self.mclient
-
+ self.src = qmfconsole.BrokerURL(srcBroker)
if self.dest.name() == self.src.name():
- print "Linking broker to itself is not permitted"
- sys.exit(1)
+ raise Exception("Linking broker to itself is not permitted")
- brokers = mc.syncGetObjects (self.mch, "broker")
+ brokers = self.qmf.getObjects(name="broker")
broker = brokers[0]
- link = self.getLink ()
+ link = self.getLink()
if link == None:
if _verbose:
print "Inter-broker link not found, creating..."
- connectArgs = {}
- connectArgs["host"] = self.src.host
- connectArgs["port"] = self.src.port
- connectArgs["useSsl"] = False
- connectArgs["durable"] = _durable
- if self.src.username == "anonymous":
- connectArgs["authMechanism"] = "ANONYMOUS"
+ if self.src.authName == "anonymous":
+ mech = "ANONYMOUS"
else:
- connectArgs["authMechanism"] = "PLAIN"
- connectArgs["username"] = self.src.username
- connectArgs["password"] = self.src.password
- res = mc.syncCallMethod (self.mch, broker.id, broker.classKey, "connect", connectArgs)
+ mech = "PLAIN"
+ res = broker.connect(self.src.host, self.src.port, False, _durable,
+ mech, self.src.authName, self.src.authPass)
if _verbose:
- print "Connect method returned:", res.status, res.statusText
- link = self.getLink ()
+ print "Connect method returned:", res.status, res.text
+ link = self.getLink()
if link == None:
- print "Protocol Error - Missing link ID"
- sys.exit (1)
+ raise Exception("Protocol Error - Missing link ID")
- bridges = mc.syncGetObjects (self.mch, "bridge")
+ bridges = self.qmf.getObjects(name="bridge")
for bridge in bridges:
- if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.key == routingKey:
if not _quiet:
- print "Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey)
- sys.exit (1)
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
sys.exit (0)
if _verbose:
print "Creating inter-broker binding..."
- bridgeArgs = {}
- bridgeArgs["durable"] = _durable
- bridgeArgs["src"] = exchange
- bridgeArgs["dest"] = exchange
- bridgeArgs["key"] = routingKey
- bridgeArgs["tag"] = tag
- bridgeArgs["excludes"] = excludes
- bridgeArgs["srcIsQueue"] = 0
- bridgeArgs["srcIsLocal"] = 0
- res = mc.syncCallMethod (self.mch, link.id, link.classKey, "bridge", bridgeArgs)
+ res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, 0, 0)
if res.status == 4:
- print "Can't create a durable route on a non-durable link"
- sys.exit(1)
+ raise Exception("Can't create a durable route on a non-durable link")
if _verbose:
print "Bridge method returned:", res.status, res.statusText
def DelRoute (self, srcBroker, exchange, routingKey):
- self.src = Broker (srcBroker)
- mc = self.mclient
-
- link = self.getLink ()
+ self.src = qmfconsole.BrokerURL(srcBroker)
+ link = self.getLink()
if link == None:
if not _quiet:
- print "No link found from %s to %s" % (self.src.name(), self.dest.name())
- sys.exit (1)
+ raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name()))
sys.exit (0)
- bridges = mc.syncGetObjects (self.mch, "bridge")
+ bridges = self.qmf.getObjects(name="bridge")
for bridge in bridges:
- if bridge.linkRef == link.id and bridge.dest == exchange and bridge.key == routingKey:
+ if bridge.linkRef == link.getObjectId() and bridge.dest == exchange and bridge.key == routingKey:
if _verbose:
print "Closing bridge..."
- res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close")
+ res = bridge.close()
if res.status != 0:
- print "Error closing bridge: %d - %s" % (res.status, res.statusText)
- sys.exit (1)
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.statusText))
if len (bridges) == 1 and _dellink:
link = self.getLink ()
if link == None:
sys.exit (0)
if _verbose:
print "Last bridge on link, closing link..."
- res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+ res = link.close()
if res.status != 0:
- print "Error closing link: %d - %s" % (res.status, res.statusText)
- sys.exit (1)
+ raise Exception("Error closing link: %d - %s" % (res.status, res.statusText))
sys.exit (0)
if not _quiet:
- print "Route not found"
- sys.exit (1)
+ raise Exception("Route not found")
def ListRoutes (self):
- mc = self.mclient
- links = mc.syncGetObjects (self.mch, "link")
- bridges = mc.syncGetObjects (self.mch, "bridge")
+ links = self.qmf.getObjects(name="link")
+ bridges = self.qmf.getObjects(name="bridge")
for bridge in bridges:
myLink = None
for link in links:
- if bridge.linkRef == link.id:
+ if bridge.linkRef == link.getObjectId():
myLink = link
break
if myLink != None:
print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, bridge.key)
def ClearAllRoutes (self):
- mc = self.mclient
- links = mc.syncGetObjects (self.mch, "link")
- bridges = mc.syncGetObjects (self.mch, "bridge")
+ links = self.qmf.getObjects(name="link")
+ bridges = self.qmf.getObjects(name="bridge")
for bridge in bridges:
if _verbose:
myLink = None
for link in links:
- if bridge.linkRef == link.id:
+ if bridge.linkRef == link.getObjectId():
myLink = link
break
if myLink != None:
print "Deleting Bridge: %s:%d %s %s... " % (myLink.host, myLink.port, bridge.dest, bridge.key),
- res = mc.syncCallMethod (self.mch, bridge.id, bridge.classKey, "close")
+ res = bridge.close()
if res.status != 0:
print "Error: %d - %s" % (res.status, res.statusText)
elif _verbose:
print "Ok"
if _dellink:
- links = mc.syncGetObjects (self.mch, "link")
+ links = self.qmf.getObjects(name="link")
for link in links:
if _verbose:
print "Deleting Link: %s:%d... " % (link.host, link.port),
- res = mc.syncCallMethod (self.mch, link.id, link.classKey, "close")
+ res = link.close()
if res.status != 0:
print "Error: %d - %s" % (res.status, res.statusText)
elif _verbose:
@@ -331,41 +263,45 @@
group = cargs[0]
cmd = cargs[1]
-rm = RouteManager (destBroker)
-rm.ConnectToBroker ()
-if group == "link":
- if cmd == "add":
- if nargs != 4:
- Usage()
- rm.AddLink (cargs[3])
- elif cmd == "del":
- if nargs != 4:
- Usage()
- rm.DelLink (cargs[3])
- elif cmd == "list":
- rm.ListLinks ()
-
-elif group == "route":
- if cmd == "add":
- if nargs < 6 or nargs > 8:
- Usage ()
-
- tag = ""
- excludes = ""
- if nargs > 6: tag = cargs[6]
- if nargs > 7: excludes = cargs[7]
- rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes)
- elif cmd == "del":
- if nargs != 6:
- Usage ()
- else:
- rm.DelRoute (cargs[3], cargs[4], cargs[5])
- else:
- if cmd == "list":
- rm.ListRoutes ()
- elif cmd == "flush":
- rm.ClearAllRoutes ()
+try:
+ rm = RouteManager (destBroker)
+ if group == "link":
+ if cmd == "add":
+ if nargs != 4:
+ Usage()
+ rm.AddLink (cargs[3])
+ elif cmd == "del":
+ if nargs != 4:
+ Usage()
+ rm.DelLink (cargs[3])
+ elif cmd == "list":
+ rm.ListLinks ()
+
+ elif group == "route":
+ if cmd == "add":
+ if nargs < 6 or nargs > 8:
+ Usage ()
+
+ tag = ""
+ excludes = ""
+ if nargs > 6: tag = cargs[6]
+ if nargs > 7: excludes = cargs[7]
+ rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes)
+ elif cmd == "del":
+ if nargs != 6:
+ Usage ()
+ else:
+ rm.DelRoute (cargs[3], cargs[4], cargs[5])
else:
- Usage ()
+ if cmd == "list":
+ rm.ListRoutes ()
+ elif cmd == "flush":
+ rm.ClearAllRoutes ()
+ else:
+ Usage ()
+except Exception,e:
+ print "Failed:", e.message
+ sys.exit(1)
+
rm.Disconnect ()
Modified: incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py?rev=697237&r1=697236&r2=697237&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/qmfconsole.py Fri Sep 19 13:54:08 2008
@@ -23,6 +23,7 @@
import qpid
import struct
import socket
+import re
from qpid.peer import Closed
from qpid.connection import Connection, ConnectionFailed
from qpid.datatypes import uuid4
@@ -46,10 +47,14 @@
used to obtain details about the class."""
pass
- def newAgent(self, broker, agent):
+ def newAgent(self, agent):
""" Invoked when a QMF agent is discovered. """
pass
+ def delAgent(self, agent):
+ """ Invoked when a QMF agent disconnects. """
+ pass
+
def objectProps(self, broker, id, record):
""" Invoked when an object is updated. """
pass
@@ -70,6 +75,25 @@
""" """
pass
+class BrokerURL:
+ def __init__(self, text):
+ rex = re.compile(r"""
+ # [ <user> [ / <password> ] @] <host> [ :<port> ]
+ ^ (?: ([^/]*) (?: / ([^@]*) )? @)? ([^:]+) (?: :([0-9]+))?$""", re.X)
+ match = rex.match(text)
+ if not match: raise ValueError("'%s' is not a valid broker url" % (text))
+ user, password, host, port = match.groups()
+
+ self.host = socket.gethostbyname(host)
+ if port: self.port = int(port)
+ else: self.port = 5672
+ self.authName = user or "guest"
+ self.authPass = password or "guest"
+ self.authMech = "PLAIN"
+
+ def name(self):
+ return self.host + ":" + str(self.port)
+
class Session:
"""
An instance of the Session class represents a console session running
@@ -95,12 +119,17 @@
self.cv = Condition()
self.syncSequenceList = []
self.getResult = []
+ self.error = None
+
+ def __repr__(self):
+ return "QMF Console Session Manager (brokers connected: %d)" % len(self.brokers)
- def addBroker(self, host="localhost", port=5672,
- authMech="PLAIN", authName="guest", authPass="guest"):
+ def addBroker(self, target="localhost"):
""" Connect to a Qpid broker. Returns an object of type Broker. """
- broker = Broker(self, host, port, authMech, authName, authPass)
+ url = BrokerURL(target)
+ broker = Broker(self, url.host, url.port, url.authMech, url.authName, url.authPass)
self.brokers.append(broker)
+ self.getObjects(broker=broker, name="agent")
return broker
def delBroker(self, broker):
@@ -138,11 +167,22 @@
if (cname, hash) in self.packages[pname]:
return self.packages[pname][(cname, hash)]
- def getAgents(self):
+ def getAgents(self, broker=None):
""" Get a list of currently known agents """
- for broker in self.brokers:
- broker._waitForStable()
- pass
+ brokerList = []
+ if broker == None:
+ for b in self.brokers:
+ brokerList.append(b)
+ else:
+ brokerList.append(broker)
+
+ for b in brokerList:
+ b._waitForStable()
+ agentList = []
+ for b in brokerList:
+ for a in b.getAgents():
+ agentList.append(a)
+ return agentList
def getObjects(self, **kwargs):
""" Get a list of objects from QMF agents.
@@ -165,7 +205,8 @@
broker = <broker> - supply a broker as returned by addBroker
"""
if "broker" in kwargs:
- brokerList = [].append(kwargs["broker"])
+ brokerList = []
+ brokerList.append(kwargs["broker"])
else:
brokerList = self.brokers
for broker in brokerList:
@@ -176,7 +217,7 @@
agent = kwargs["agent"]
if agent.broker not in brokerList:
raise Exception("Supplied agent is not accessible through the supplied broker")
- agentList = append(agent)
+ agentList.append(agent)
else:
for broker in brokerList:
for agent in broker.getAgents():
@@ -209,7 +250,7 @@
starttime = time()
timeout = False
self.cv.acquire()
- while len(self.syncSequenceList) > 0:
+ while len(self.syncSequenceList) > 0 and self.error == None:
self.cv.wait(self.GET_WAIT_TIME)
if time() - starttime > self.GET_WAIT_TIME:
for pendingSeq in self.syncSequenceList:
@@ -218,6 +259,11 @@
timeout = True
self.cv.release()
+ if self.error:
+ errorText = self.error
+ self.error = None
+ raise Exception(errorText)
+
if len(self.getResult) == 0 and timeout:
raise RuntimeError("No agent responded within timeout period")
return self.getResult
@@ -351,6 +397,8 @@
self.cv.release()
schema = self.packages[pname][(cname, hash)]
object = Object(self, broker, schema, codec, prop, stat)
+ if pname == "org.apache.qpid.broker" and cname == "agent":
+ broker._updateAgent(object)
self.cv.acquire()
if seq in self.syncSequenceList:
@@ -365,6 +413,13 @@
if stat:
self.console.objectStats(broker, object.getObjectId(), object)
+ def _handleError(self, error):
+ self.error = error
+ self.cv.acquire()
+ self.syncSequenceList = []
+ self.cv.notify()
+ self.cv.release()
+
class Package:
""" """
def __init__(self, name):
@@ -407,23 +462,23 @@
return result
def getKey(self):
- """ """
+ """ Return the class-key for this class. """
return self.classKey
def getProperties(self):
- """ """
+ """ Return the list of properties for the class. """
return self.properties
def getStatistics(self):
- """ """
+ """ Return the list of statistics for the class. """
return self.statistics
def getMethods(self):
- """ """
+ """ Return the list of methods for the class. """
return self.methods
def getEvents(self):
- """ """
+ """ Return the list of events for the class. """
return self.events
class SchemaProperty:
@@ -448,6 +503,9 @@
elif key == "maxlen" : self.maxlen = value
elif key == "desc" : self.desc = str(value)
+ def __repr__(self):
+ return self.name
+
class SchemaStatistic:
""" """
def __init__(self, codec):
@@ -461,6 +519,9 @@
if key == "unit" : self.unit = str(value)
elif key == "desc" : self.desc = str(value)
+ def __repr__(self):
+ return self.name
+
class SchemaMethod:
""" """
def __init__(self, codec):
@@ -476,6 +537,19 @@
for idx in range(argCount):
self.arguments.append(SchemaArgument(codec, methodArg=True))
+ def __repr__(self):
+ result = self.name + "("
+ first = True
+ for arg in self.arguments:
+ if arg.dir.find("I") != -1:
+ if first:
+ first = False
+ else:
+ result += ", "
+ result += arg.name
+ result += ")"
+ return result
+
class SchemaEvent:
""" """
def __init__(self, codec):
@@ -491,6 +565,18 @@
for idx in range(argCount):
self.arguments.append(SchemaArgument(codec, methodArg=False))
+ def __repr__(self):
+ result = self.name + "("
+ first = True
+ for arg in self.arguments:
+ if first:
+ first = False
+ else:
+ result += ", "
+ result += arg.name
+ result += ")"
+ return result
+
class SchemaArgument:
""" """
def __init__(self, codec, methodArg):
@@ -538,12 +624,8 @@
return 0
def __repr__(self):
- return "%08x-%04x-%04x-%04x-%04x%08x" % ((self.first & 0xFFFFFFFF00000000) >> 32,
- (self.first & 0x00000000FFFF0000) >> 16,
- (self.first & 0x000000000000FFFF),
- (self.second & 0xFFFF000000000000) >> 48,
- (self.second & 0x0000FFFF00000000) >> 32,
- (self.second & 0x00000000FFFFFFFF))
+ return "%d-%d-%d-%d-%x" % (self.getFlags(), self.getSequence(),
+ self.getBroker(), self.getBank(), self.getObject())
def index(self):
return (self.first, self.second)
@@ -596,23 +678,27 @@
self.statistics.append((statistic, self._decodeValue(codec, statistic.type)))
def getObjectId(self):
- """ """
+ """ Return the object identifier for this object """
return self.objectId
def getClassKey(self):
- """ """
+ """ Return the class-key that references the schema describing this object. """
return self.schema.getKey()
def getSchema(self):
- """ """
+ """ Return the schema that describes this object. """
return self.schema
+ def getMethods(self):
+ """ Return a list of methods available for this object. """
+ return self.schema.getMethods()
+
def getTimestamps(self):
- """ """
+ """ Return the current, creation, and deletion times for this object. """
return self.currentTime, self.createTime, self.deleteTime
def getIndex(self):
- """ """
+ """ Return a string describing this object's primary key. """
result = ""
for property, value in self.properties:
if property.index:
@@ -634,6 +720,7 @@
for statistic, value in self.statistics:
if name == statistic.name:
return value
+ raise Exception("Type Object has no attribute '%s'" % name)
def _invoke(self, name, args, kwargs):
for method in self.schema.getMethods():
@@ -653,20 +740,27 @@
self._encodeValue(sendCodec, args[aIdx], arg.type)
aIdx += 1
smsg = self.broker._message(sendCodec.encoded, "agent." + str(self.objectId.getBank()))
- self.broker._send(smsg)
self.broker.cv.acquire()
self.broker.syncInFlight = True
+ self.broker.cv.release()
+
+ self.broker._send(smsg)
+
+ self.broker.cv.acquire()
starttime = time()
- while self.broker.syncInFlight:
+ while self.broker.syncInFlight and self.broker.error == None:
self.broker.cv.wait(self.broker.SYNC_TIME)
if time() - starttime > self.broker.SYNC_TIME:
self.broker.cv.release()
self.session.seqMgr._release(seq)
raise RuntimeError("Timed out waiting for method to respond")
self.broker.cv.release()
+ if self.broker.error != None:
+ errorText = self.broker.error
+ self.broker.error = None
+ raise Exception(errorText)
return self.broker.syncResult
- else:
- raise Exception("Invalid Method (software defect)")
+ raise Exception("Invalid Method (software defect) [%s]" % name)
def _parsePresenceMasks(self, codec, schema):
excludeList = []
@@ -747,18 +841,21 @@
class Broker:
""" """
- SYNC_TIME = 10
+ SYNC_TIME = 10
def __init__(self, session, host, port, authMech, authUser, authPass):
self.session = session
- self.agents = []
- self.agents.append(Agent(self, 0))
+ self.host = host
+ self.port = port
+ self.agents = {}
+ self.agents[0] = Agent(self, 0, "BrokerAgent")
self.topicBound = False
self.cv = Condition()
self.syncInFlight = False
self.syncRequest = 0
self.syncResult = None
self.reqsOutstanding = 1
+ self.error = None
self.brokerId = None
err = None
try:
@@ -767,7 +864,7 @@
self.conn.start()
self.replyName = "reply-%s" % self.amqpSessionId
self.amqpSession = self.conn.session(self.amqpSessionId)
- self.amqpSession.auto_sync = False
+ self.amqpSession.auto_sync = True
self.amqpSession.queue_declare(queue=self.replyName, exclusive=True, auto_delete=True)
self.amqpSession.exchange_bind(exchange="amq.direct",
queue=self.replyName, binding_key=self.replyName)
@@ -798,6 +895,7 @@
except ConnectionFailed, e:
err = "Connect Failed %d - %s" % (e[0], e[1])
+ self.active = True
if err != None:
raise Exception(err)
@@ -811,7 +909,36 @@
def getAgents(self):
""" Get the list of agents reachable via this broker """
- return self.agents
+ return self.agents.values()
+
+ def getAmqpSession(self):
+ """ Get the AMQP session object for this connected broker. """
+ return self.amqpSession
+
+ def isConnected(self):
+ return self.active
+
+ def __repr__(self):
+ if self.active:
+ if self.port == 5672:
+ port = ""
+ else:
+ port = ":%d" % self.port
+ return "Broker connected at: amqp://%s%s" % (self.host, port)
+ else:
+ return "Disconnected Broker"
+
+ def _updateAgent(self, obj):
+ if obj.deleteTime == 0:
+ if obj.objectIdBank not in self.agents:
+ agent = Agent(self, obj.objectIdBank, obj.label)
+ self.agents[obj.objectIdBank] = agent
+ if self.session.console != None:
+ self.session.console.newAgent(agent)
+ else:
+ agent = self.agents.pop(obj.objectIdBank, None)
+ if agent != None and self.session.console != None:
+ self.session.console.delAgent(agent)
def _setHeader(self, codec, opcode, seq=0):
""" Compose the header of a management message. """
@@ -848,10 +975,14 @@
self.amqpSession.message_transfer(destination=dest, message=msg)
def _shutdown(self):
- self.amqpSession.incoming("rdest").stop()
- if self.session.console != None:
- self.amqpSession.incoming("tdest").stop()
- self.amqpSession.close()
+ if self.active:
+ self.amqpSession.incoming("rdest").stop()
+ if self.session.console != None:
+ self.amqpSession.incoming("tdest").stop()
+ self.amqpSession.close()
+ self.active = False
+ else:
+ raise Exception("Broker already disconnected")
def _waitForStable(self):
self.cv.acquire()
@@ -877,7 +1008,8 @@
self.reqsOutstanding -= 1
if self.reqsOutstanding == 0 and not self.topicBound and self.session.console != None:
self.topicBound = True
- self.amqpSession.exchange_bind(exchange="qpid.management", queue=self.topicName, binding_key="mgmt.#")
+ self.amqpSession.exchange_bind(exchange="qpid.management",
+ queue=self.topicName, binding_key="mgmt.#")
if self.reqsOutstanding == 0 and self.syncInFlight:
self.syncInFlight = False
self.cv.notify()
@@ -901,13 +1033,23 @@
elif opcode == 'g': self.session._handleContentInd (self, codec, seq, prop=True, stat=True)
def _exceptionCb(self, data):
- pass
+ self.active = False
+ self.error = data
+ self.cv.acquire()
+ if self.syncInFlight:
+ self.cv.notify()
+ self.cv.release()
+ self.session._handleError(self.error)
class Agent:
""" """
- def __init__(self, broker, bank):
+ def __init__(self, broker, bank, label):
self.broker = broker
self.bank = bank
+ self.label = label
+
+ def __repr__(self):
+ return "Agent at bank %d (%s)" % (self.bank, self.label)
class Event:
""" """
@@ -941,11 +1083,32 @@
return data
-# TEST
+class DebugConsole(Console):
+ """ """
+ def newPackage(self, name):
+ print "newPackage:", name
+
+ def newClass(self, classKey):
+ print "newClass:", classKey
+
+ def newAgent(self, agent):
+ print "newAgent:", agent
+
+ def delAgent(self, agent):
+ print "delAgent:", agent
-#c = Console()
-#s = Session(c)
-#b = s.addBroker()
-#cl = s.getClasses("org.apache.qpid.broker")
-#sch = s.getSchema(cl[0])
+ def objectProps(self, broker, id, record):
+ print "objectProps:", record.getClassKey()
+
+ def objectStats(self, broker, id, record):
+ print "objectStats:", record.getClassKey()
+
+ def event(self, broker, event):
+ print "event:", event
+
+ def heartbeat(self, agent, timestamp):
+ print "heartbeat:", agent
+
+ def brokerInfo(self, broker):
+ print "brokerInfo:", broker