You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/11/05 18:16:47 UTC
svn commit: r711628 - /incubator/qpid/trunk/qpid/python/commands/qpid-route
Author: tross
Date: Wed Nov 5 09:16:43 2008
New Revision: 711628
URL: http://svn.apache.org/viewvc?rev=711628&view=rev
Log:
Added support for push-routes and queue-based routes.
Cleaned up the code a bit.
Modified:
incubator/qpid/trunk/qpid/python/commands/qpid-route
Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=711628&r1=711627&r2=711628&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Wed Nov 5 09:16:43 2008
@@ -25,80 +25,81 @@
import os
from qpid import qmfconsole
-def Usage ():
- print "Usage: qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
- print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>"
- print " qpid-route [OPTIONS] link list [<dest-broker>]"
- print " qpid-route [OPTIONS] link map [<broker>]"
+def Usage():
+ print "Usage: qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]"
+ print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>"
print
print " qpid-route [OPTIONS] route add <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]"
print " qpid-route [OPTIONS] route del <dest-broker> <src-broker> <exchange> <routing-key>"
+ print " qpid-route [OPTIONS] queue add <dest-broker> <src-broker> <exchange> <queue>"
+ print " qpid-route [OPTIONS] queue del <dest-broker> <src-broker> <exchange> <queue>"
print " qpid-route [OPTIONS] route list [<dest-broker>]"
print " qpid-route [OPTIONS] route flush [<dest-broker>]"
- print " qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]"
- print " qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>"
+ print " qpid-route [OPTIONS] route map [<broker>]"
+ print
+ print " qpid-route [OPTIONS] link add <dest-broker> <src-broker>"
+ print " qpid-route [OPTIONS] link del <dest-broker> <src-broker>"
+ print " qpid-route [OPTIONS] link list [<dest-broker>]"
print
print "Options:"
print " -v [ --verbose ] Verbose output"
print " -q [ --quiet ] Quiet output, don't print duplicate warnings"
print " -d [ --durable ] Added configuration shall be durable"
print " -e [ --del-empty-link ] Delete link after deleting last route on the link"
+ print " -s [ --src-local ] Make connection to source broker (push route)"
print " -t <transport> [ --transport <transport>]"
print " Specify transport to use for links, defaults to tcp"
print
print " dest-broker and src-broker are in the form: [username/password@] hostname | ip-address [:<port>]"
print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
print
- sys.exit (1)
+ sys.exit(1)
-_verbose = False
-_quiet = False
-_durable = False
-_dellink = False
+_verbose = False
+_quiet = False
+_durable = False
+_dellink = False
+_srclocal = False
_transport = "tcp"
class RouteManager:
- def __init__ (self, destBroker):
- self.dest = qmfconsole.BrokerURL(destBroker)
- self.src = None
+ def __init__(self, localBroker):
+ self.local = qmfconsole.BrokerURL(localBroker)
+ self.remote = None
self.qmf = qmfconsole.Session()
- self.broker = self.qmf.addBroker(destBroker)
+ self.broker = self.qmf.addBroker(localBroker)
- def Disconnect (self):
+ def disconnect(self):
self.qmf.delBroker(self.broker)
- def getLink (self):
+ def getLink(self):
links = self.qmf.getObjects(_class="link")
for link in links:
- if self.src.match(link.host, link.port):
+ if self.remote.match(link.host, link.port):
return link
return None
- def AddLink (self, srcBroker):
- self.src = qmfconsole.BrokerURL(srcBroker)
- if self.dest.match(self.src.host, self.src.port):
- print "Linking broker to itself is not permitted"
- sys.exit(1)
+ def addLink(self, remoteBroker):
+ self.remote = qmfconsole.BrokerURL(remoteBroker)
+ if self.local.match(self.remote.host, self.remote.port):
+ raise Exception("Linking broker to itself is not permitted")
brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
- if link != None:
- raise Exception("Link already exists")
-
- if self.src.authName == "anonymous":
- mech = "ANONYMOUS"
- else:
- mech = "PLAIN"
- res = broker.connect(self.src.host, self.src.port, _durable,
- mech, self.src.authName, self.src.authPass,
- _transport)
- if _verbose:
- print "Connect method returned:", res.status, res.text
- link = self.getLink()
+ if link == None:
+ if self.remote.authName == "anonymous":
+ mech = "ANONYMOUS"
+ else:
+ mech = "PLAIN"
+ res = broker.connect(self.remote.host, self.remote.port, _durable,
+ mech, self.remote.authName, self.remote.authPass,
+ _transport)
+ if _verbose:
+ print "Connect method returned:", res.status, res.text
- def DelLink (self, srcBroker):
- self.src = qmfconsole.BrokerURL(srcBroker)
+ def delLink(self, remoteBroker):
+ self.remote = qmfconsole.BrokerURL(remoteBroker)
brokers = self.qmf.getObjects(_class="broker")
broker = brokers[0]
link = self.getLink()
@@ -109,7 +110,7 @@
if _verbose:
print "Close method returned:", res.status, res.text
- def ListLinks (self):
+ def listLinks(self):
links = self.qmf.getObjects(_class="link")
if len(links) == 0:
print "No Links Found"
@@ -121,14 +122,14 @@
print "%-16s%-8d %c %-18s%s" % \
(link.host, link.port, YN(link.durable), link.state, link.lastError)
- def MapLinks(self):
+ def mapRoutes(self):
qmf = self.qmf
print
print "Finding Linked Brokers:"
brokerList = {}
- brokerList[self.dest.name()] = self.broker
- print " %s... Ok" % self.dest
+ brokerList[self.local.name()] = self.broker
+ print " %s... Ok" % self.local
added = True
while added:
@@ -155,8 +156,7 @@
fedExchanges.append(bridge.src)
if len(fedExchanges) == 0:
print " none found"
- else:
- print
+ print
for ex in fedExchanges:
print " Exchange %s:" % ex
@@ -180,71 +180,124 @@
bridges = qmf.getObjects(_class="bridge", dynamic=False)
if len(bridges) == 0:
print " none found"
- else:
- print
+ print
for bridge in bridges:
link = bridge._linkRef_
fromUrl = "%s:%s" % (link.host, link.port)
toUrl = bridge.getBroker().getUrl()
- print " %s(%s) <= %s(%s) key=%s" % (toUrl, bridge.dest, fromUrl, bridge.src, bridge.key)
+ leftType = "ex"
+ rightType = "ex"
+ if bridge.srcIsLocal:
+ arrow = "=>"
+ left = bridge.src
+ right = bridge.dest
+ if bridge.srcIsQueue:
+ leftType = "queue"
+ else:
+ arrow = "<="
+ left = bridge.dest
+ right = bridge.src
+ if bridge.srcIsQueue:
+ rightType = "queue"
+
+ if bridge.srcIsQueue:
+ print " %s(%s=%s) %s %s(%s=%s)" % \
+ (toUrl, leftType, left, arrow, fromUrl, rightType, right)
+ else:
+ print " %s(%s=%s) %s %s(%s=%s) key=%s" % \
+ (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
print
for broker in brokerList:
- if broker != self.dest.name():
+ if broker != self.local.name():
qmf.delBroker(brokerList[broker])
- def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes, dynamic=False):
- self.src = qmfconsole.BrokerURL(srcBroker)
- if self.dest.match(self.src.host, self.src.port):
- raise Exception("Linking broker to itself is not permitted")
-
- brokers = self.qmf.getObjects(_class="broker")
- broker = brokers[0]
+ def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
+ if dynamic and _srclocal:
+ raise Exception("--src-local is not permitted on dynamic routes")
+ self.addLink(remoteBroker)
link = self.getLink()
if link == None:
- if _verbose:
- print "Inter-broker link not found, creating..."
+ raise Exception("Link failed to create")
- if self.src.authName == "anonymous":
- mech = "ANONYMOUS"
- else:
- mech = "PLAIN"
- res = broker.connect(self.src.host, self.src.port, _durable,
- mech, self.src.authName, self.src.authPass,
- _transport)
- if _verbose:
- print "Connect method returned:", res.status, res.text
- link = self.getLink()
+ bridges = self.qmf.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
+ if not _quiet:
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
+ sys.exit(0)
+ if _verbose:
+ print "Creating inter-broker binding..."
+ res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic)
+ if res.status != 0:
+ raise Exception(res.text)
+ if _verbose:
+ print "Bridge method returned:", res.status, res.text
+
+ def addQueueRoute(self, remoteBroker, exchange, queue):
+ self.addLink(remoteBroker)
+ link = self.getLink()
if link == None:
- raise Exception("Protocol Error - Missing link ID")
+ raise Exception("Link failed to create")
bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
if bridge.linkRef == link.getObjectId() and \
- bridge.dest == exchange and bridge.key == routingKey:
+ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
if not _quiet:
- raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
- sys.exit (0)
+ raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
+ sys.exit(0)
if _verbose:
print "Creating inter-broker binding..."
- res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, dynamic)
+ res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False)
if res.status != 0:
raise Exception(res.text)
if _verbose:
print "Bridge method returned:", res.status, res.text
- def DelRoute (self, srcBroker, exchange, routingKey, dynamic=False):
- self.src = qmfconsole.BrokerURL(srcBroker)
+ def delQueueRoute(self, remoteBroker, exchange, queue):
+ self.remote = qmfconsole.BrokerURL(remoteBroker)
link = self.getLink()
if link == None:
if not _quiet:
- raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name()))
- sys.exit (0)
+ raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+ sys.exit(0)
+
+ bridges = self.qmf.getObjects(_class="bridge")
+ for bridge in bridges:
+ if bridge.linkRef == link.getObjectId() and \
+ bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
+ if _verbose:
+ print "Closing bridge..."
+ res = bridge.close()
+ if res.status != 0:
+ raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
+ if len(bridges) == 1 and _dellink:
+ link = self.getLink()
+ if link == None:
+ sys.exit(0)
+ if _verbose:
+ print "Last bridge on link, closing link..."
+ res = link.close()
+ if res.status != 0:
+ raise Exception("Error closing link: %d - %s" % (res.status, res.text))
+ sys.exit(0)
+ if not _quiet:
+ raise Exception("Route not found")
+
+ def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
+ self.remote = qmfconsole.BrokerURL(remoteBroker)
+ link = self.getLink()
+ if link == None:
+ if not _quiet:
+ raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+ sys.exit(0)
bridges = self.qmf.getObjects(_class="bridge")
for bridge in bridges:
@@ -255,20 +308,20 @@
res = bridge.close()
if res.status != 0:
raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
- if len (bridges) == 1 and _dellink:
- link = self.getLink ()
+ if len(bridges) == 1 and _dellink:
+ link = self.getLink()
if link == None:
- sys.exit (0)
+ sys.exit(0)
if _verbose:
print "Last bridge on link, closing link..."
res = link.close()
if res.status != 0:
raise Exception("Error closing link: %d - %s" % (res.status, res.text))
- sys.exit (0)
+ sys.exit(0)
if not _quiet:
raise Exception("Route not found")
- def ListRoutes (self):
+ def listRoutes(self):
links = self.qmf.getObjects(_class="link")
bridges = self.qmf.getObjects(_class="bridge")
@@ -283,9 +336,9 @@
keyText = "<dynamic>"
else:
keyText = bridge.key
- print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, keyText)
+ print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText)
- def ClearAllRoutes (self):
+ def clearAllRoutes(self):
links = self.qmf.getObjects(_class="link")
bridges = self.qmf.getObjects(_class="bridge")
@@ -347,10 +400,10 @@
##
try:
- longOpts = ("verbose", "quiet", "durable", "del-empty-link", "transport=")
- (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "vqdet:", longOpts)
+ longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=")
+ (optlist, cargs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
except:
- Usage ()
+ Usage()
for opt in optlist:
if opt[0] == "-v" or opt[0] == "--verbose":
@@ -361,77 +414,94 @@
_durable = True
if opt[0] == "-e" or opt[0] == "--del-empty-link":
_dellink = True
+ if opt[0] == "-s" or opt[0] == "--src-local":
+ _srclocal = True
if opt[0] == "-t" or opt[0] == "--transport":
_transport = opt[1]
-nargs = len (cargs)
+nargs = len(cargs)
if nargs < 2:
- Usage ()
+ Usage()
if nargs == 2:
- destBroker = "localhost"
+ localBroker = "localhost"
else:
- destBroker = cargs[2]
+ if _srclocal:
+ localBroker = cargs[3]
+ remoteBroker = cargs[2]
+ else:
+ localBroker = cargs[2]
+ if nargs > 3:
+ remoteBroker = cargs[3]
group = cargs[0]
cmd = cargs[1]
try:
- rm = RouteManager (destBroker)
+ rm = RouteManager(localBroker)
if group == "link":
if cmd == "add":
if nargs != 4:
Usage()
- rm.AddLink (cargs[3])
+ rm.addLink(remoteBroker)
elif cmd == "del":
if nargs != 4:
Usage()
- rm.DelLink (cargs[3])
+ rm.delLink(remoteBroker)
elif cmd == "list":
- rm.ListLinks()
- elif cmd == "map":
- rm.MapLinks()
+ rm.listLinks()
elif group == "dynamic":
if cmd == "add":
if nargs < 5 or nargs > 7:
- Usage ()
+ Usage()
tag = ""
excludes = ""
if nargs > 5: tag = cargs[5]
if nargs > 6: excludes = cargs[6]
- rm.AddRoute (cargs[3], cargs[4], "", tag, excludes, dynamic=True)
+ rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True)
elif cmd == "del":
if nargs != 5:
- Usage ()
+ Usage()
else:
- rm.DelRoute (cargs[3], cargs[4], "", dynamic=True)
+ rm.delRoute(remoteBroker, cargs[4], "", dynamic=True)
elif group == "route":
if cmd == "add":
if nargs < 6 or nargs > 8:
- Usage ()
+ Usage()
tag = ""
excludes = ""
if nargs > 6: tag = cargs[6]
- if nargs > 7: excludes = cargs[7]
- rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes, dynamic=False)
+ if nargs > 7: excludes = cargs[7]
+ rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False)
elif cmd == "del":
if nargs != 6:
- Usage ()
- else:
- rm.DelRoute (cargs[3], cargs[4], cargs[5], dynamic=False)
+ Usage()
+ rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False)
+ elif cmd == "map":
+ rm.mapRoutes()
else:
- if cmd == "list":
- rm.ListRoutes ()
+ if cmd == "list":
+ rm.listRoutes()
elif cmd == "flush":
- rm.ClearAllRoutes ()
+ rm.clearAllRoutes()
else:
- Usage ()
+ Usage()
+
+ elif group == "queue":
+ if nargs != 6:
+ Usage()
+ if cmd == "add":
+ rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
+ elif cmd == "del":
+ rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
+ else:
+ Usage()
except Exception,e:
print "Failed:", e.args[0]
sys.exit(1)
-rm.Disconnect ()
+rm.disconnect()