You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2008/11/05 18:16:47 UTC

svn commit: r711628 - /incubator/qpid/trunk/qpid/python/commands/qpid-route

Author: tross
Date: Wed Nov  5 09:16:43 2008
New Revision: 711628

URL: http://svn.apache.org/viewvc?rev=711628&view=rev
Log:
Added support for push-routes and queue-based routes.
Cleaned up the code a bit.

Modified:
    incubator/qpid/trunk/qpid/python/commands/qpid-route

Modified: incubator/qpid/trunk/qpid/python/commands/qpid-route
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-route?rev=711628&r1=711627&r2=711628&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-route (original)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-route Wed Nov  5 09:16:43 2008
@@ -25,80 +25,81 @@
 import os
 from qpid import qmfconsole
 
-def Usage ():
-    print "Usage:  qpid-route [OPTIONS] link add  <dest-broker> <src-broker>"
-    print "        qpid-route [OPTIONS] link del  <dest-broker> <src-broker>"
-    print "        qpid-route [OPTIONS] link list [<dest-broker>]"
-    print "        qpid-route [OPTIONS] link map  [<broker>]"
+def Usage():
+    print "Usage:  qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]"
+    print "        qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>"
     print
     print "        qpid-route [OPTIONS] route add   <dest-broker> <src-broker> <exchange> <routing-key> [tag] [exclude-list]"
     print "        qpid-route [OPTIONS] route del   <dest-broker> <src-broker> <exchange> <routing-key>"
+    print "        qpid-route [OPTIONS] queue add   <dest-broker> <src-broker> <exchange> <queue>"
+    print "        qpid-route [OPTIONS] queue del   <dest-broker> <src-broker> <exchange> <queue>"
     print "        qpid-route [OPTIONS] route list  [<dest-broker>]"
     print "        qpid-route [OPTIONS] route flush [<dest-broker>]"
-    print "        qpid-route [OPTIONS] dynamic add <dest-broker> <src-broker> <exchange> [tag] [exclude-list]"
-    print "        qpid-route [OPTIONS] dynamic del <dest-broker> <src-broker> <exchange>"
+    print "        qpid-route [OPTIONS] route map   [<broker>]"
+    print
+    print "        qpid-route [OPTIONS] link add  <dest-broker> <src-broker>"
+    print "        qpid-route [OPTIONS] link del  <dest-broker> <src-broker>"
+    print "        qpid-route [OPTIONS] link list [<dest-broker>]"
     print
     print "Options:"
     print "    -v [ --verbose ]         Verbose output"
     print "    -q [ --quiet ]           Quiet output, don't print duplicate warnings"
     print "    -d [ --durable ]         Added configuration shall be durable"
     print "    -e [ --del-empty-link ]  Delete link after deleting last route on the link"
+    print "    -s [ --src-local ]       Make connection to source broker (push route)"
     print "    -t <transport> [ --transport <transport>]"
     print "                             Specify transport to use for links, defaults to tcp"
     print
     print "  dest-broker and src-broker are in the form:  [username/password@] hostname | ip-address [:<port>]"
     print "  ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
     print
-    sys.exit (1)
+    sys.exit(1)
 
-_verbose  = False
-_quiet    = False
-_durable  = False
-_dellink  = False
+_verbose   = False
+_quiet     = False
+_durable   = False
+_dellink   = False
+_srclocal  = False
 _transport = "tcp"
 
 class RouteManager:
-    def __init__ (self, destBroker):
-        self.dest = qmfconsole.BrokerURL(destBroker)
-        self.src  = None
+    def __init__(self, localBroker):
+        self.local = qmfconsole.BrokerURL(localBroker)
+        self.remote  = None
         self.qmf = qmfconsole.Session()
-        self.broker = self.qmf.addBroker(destBroker)
+        self.broker = self.qmf.addBroker(localBroker)
 
-    def Disconnect (self):
+    def disconnect(self):
         self.qmf.delBroker(self.broker)
 
-    def getLink (self):
+    def getLink(self):
         links = self.qmf.getObjects(_class="link")
         for link in links:
-            if self.src.match(link.host, link.port):
+            if self.remote.match(link.host, link.port):
                 return link
         return None
 
-    def AddLink (self, srcBroker):
-        self.src = qmfconsole.BrokerURL(srcBroker)
-        if self.dest.match(self.src.host, self.src.port):
-            print "Linking broker to itself is not permitted"
-            sys.exit(1)
+    def addLink(self, remoteBroker):
+        self.remote = qmfconsole.BrokerURL(remoteBroker)
+        if self.local.match(self.remote.host, self.remote.port):
+            raise Exception("Linking broker to itself is not permitted")
 
         brokers = self.qmf.getObjects(_class="broker")
         broker = brokers[0]
         link = self.getLink()
-        if link != None:
-            raise Exception("Link already exists")
-
-        if self.src.authName == "anonymous":
-            mech = "ANONYMOUS"
-        else:
-            mech = "PLAIN"
-        res = broker.connect(self.src.host, self.src.port, _durable,
-                             mech, self.src.authName, self.src.authPass,
-                             _transport)
-        if _verbose:
-            print "Connect method returned:", res.status, res.text
-        link = self.getLink()
+        if link == None:
+            if self.remote.authName == "anonymous":
+                mech = "ANONYMOUS"
+            else:
+                mech = "PLAIN"
+            res = broker.connect(self.remote.host, self.remote.port, _durable,
+                                 mech, self.remote.authName, self.remote.authPass,
+                                 _transport)
+            if _verbose:
+                print "Connect method returned:", res.status, res.text
 
-    def DelLink (self, srcBroker):
-        self.src = qmfconsole.BrokerURL(srcBroker)
+    def delLink(self, remoteBroker):
+        self.remote = qmfconsole.BrokerURL(remoteBroker)
         brokers = self.qmf.getObjects(_class="broker")
         broker = brokers[0]
         link = self.getLink()
@@ -109,7 +110,7 @@
         if _verbose:
             print "Close method returned:", res.status, res.text
 
-    def ListLinks (self):
+    def listLinks(self):
         links = self.qmf.getObjects(_class="link")
         if len(links) == 0:
             print "No Links Found"
@@ -121,14 +122,14 @@
                 print "%-16s%-8d   %c     %-18s%s" % \
                 (link.host, link.port, YN(link.durable), link.state, link.lastError)
 
-    def MapLinks(self):
+    def mapRoutes(self):
         qmf = self.qmf
         print
         print "Finding Linked Brokers:"
 
         brokerList = {}
-        brokerList[self.dest.name()] = self.broker
-        print "    %s... Ok" % self.dest
+        brokerList[self.local.name()] = self.broker
+        print "    %s... Ok" % self.local
 
         added = True
         while added:
@@ -155,8 +156,7 @@
                 fedExchanges.append(bridge.src)
         if len(fedExchanges) == 0:
             print "  none found"
-        else:
-            print
+        print
 
         for ex in fedExchanges:
             print "  Exchange %s:" % ex
@@ -180,71 +180,124 @@
         bridges = qmf.getObjects(_class="bridge", dynamic=False)
         if len(bridges) == 0:
             print "  none found"
-        else:
-            print
+        print
 
         for bridge in bridges:
             link = bridge._linkRef_
             fromUrl = "%s:%s" % (link.host, link.port)
             toUrl = bridge.getBroker().getUrl()
-            print "  %s(%s) <= %s(%s) key=%s" % (toUrl, bridge.dest, fromUrl, bridge.src, bridge.key)
+            leftType = "ex"
+            rightType = "ex"
+            if bridge.srcIsLocal:
+                arrow = "=>"
+                left = bridge.src
+                right = bridge.dest
+                if bridge.srcIsQueue:
+                    leftType = "queue"
+            else:
+                arrow = "<="
+                left = bridge.dest
+                right = bridge.src
+                if bridge.srcIsQueue:
+                    rightType = "queue"
+
+            if bridge.srcIsQueue:
+                print "  %s(%s=%s) %s %s(%s=%s)" % \
+                    (toUrl, leftType, left, arrow, fromUrl, rightType, right)
+            else:
+                print "  %s(%s=%s) %s %s(%s=%s) key=%s" % \
+                    (toUrl, leftType, left, arrow, fromUrl, rightType, right, bridge.key)
         print
 
         for broker in brokerList:
-            if broker != self.dest.name():
+            if broker != self.local.name():
                 qmf.delBroker(brokerList[broker])
 
 
-    def AddRoute (self, srcBroker, exchange, routingKey, tag, excludes, dynamic=False):
-        self.src = qmfconsole.BrokerURL(srcBroker)
-        if self.dest.match(self.src.host, self.src.port):
-            raise Exception("Linking broker to itself is not permitted")
-
-        brokers = self.qmf.getObjects(_class="broker")
-        broker = brokers[0]
+    def addRoute(self, remoteBroker, exchange, routingKey, tag, excludes, dynamic=False):
+        if dynamic and _srclocal:
+            raise Exception("--src-local is not permitted on dynamic routes")
 
+        self.addLink(remoteBroker)
         link = self.getLink()
         if link == None:
-            if _verbose:
-                print "Inter-broker link not found, creating..."
+            raise Exception("Link failed to create")
 
-            if self.src.authName == "anonymous":
-                mech = "ANONYMOUS"
-            else:
-                mech = "PLAIN"
-            res = broker.connect(self.src.host, self.src.port, _durable,
-                                 mech, self.src.authName, self.src.authPass,
-                                 _transport)
-            if _verbose:
-                print "Connect method returned:", res.status, res.text
-            link = self.getLink()
+        bridges = self.qmf.getObjects(_class="bridge")
+        for bridge in bridges:
+            if bridge.linkRef == link.getObjectId() and \
+                    bridge.dest == exchange and bridge.key == routingKey and not bridge.srcIsQueue:
+                if not _quiet:
+                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
+                sys.exit(0)
 
+        if _verbose:
+            print "Creating inter-broker binding..."
+        res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, _srclocal, dynamic)
+        if res.status != 0:
+            raise Exception(res.text)
+        if _verbose:
+            print "Bridge method returned:", res.status, res.text
+
+    def addQueueRoute(self, remoteBroker, exchange, queue):
+        self.addLink(remoteBroker)
+        link = self.getLink()
         if link == None:
-            raise Exception("Protocol Error - Missing link ID")
+            raise Exception("Link failed to create")
 
         bridges = self.qmf.getObjects(_class="bridge")
         for bridge in bridges:
             if bridge.linkRef == link.getObjectId() and \
-                    bridge.dest == exchange and bridge.key == routingKey:
+                    bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
                 if not _quiet:
-                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, routingKey))
-                sys.exit (0)
+                    raise Exception("Duplicate Route - ignoring: %s(%s)" % (exchange, queue))
+                sys.exit(0)
 
         if _verbose:
             print "Creating inter-broker binding..."
-        res = link.bridge(_durable, exchange, exchange, routingKey, tag, excludes, False, False, dynamic)
+        res = link.bridge(_durable, queue, exchange, "", "", "", True, _srclocal, False)
         if res.status != 0:
             raise Exception(res.text)
         if _verbose:
             print "Bridge method returned:", res.status, res.text
 
-    def DelRoute (self, srcBroker, exchange, routingKey, dynamic=False):
-        self.src = qmfconsole.BrokerURL(srcBroker)
+    def delQueueRoute(self, remoteBroker, exchange, queue):
+        self.remote = qmfconsole.BrokerURL(remoteBroker)
         link = self.getLink()
         if link == None:
             if not _quiet:
-                raise Exception("No link found from %s to %s" % (self.src.name(), self.dest.name()))
-            sys.exit (0)
+                raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+            sys.exit(0)
+
+        bridges = self.qmf.getObjects(_class="bridge")
+        for bridge in bridges:
+            if bridge.linkRef == link.getObjectId() and \
+                    bridge.dest == exchange and bridge.src == queue and bridge.srcIsQueue:
+                if _verbose:
+                    print "Closing bridge..."
+                res = bridge.close()
+                if res.status != 0:
+                    raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
+                if len(bridges) == 1 and _dellink:
+                    link = self.getLink()
+                    if link == None:
+                        sys.exit(0)
+                    if _verbose:
+                        print "Last bridge on link, closing link..."
+                    res = link.close()
+                    if res.status != 0:
+                        raise Exception("Error closing link: %d - %s" % (res.status, res.text))
+                sys.exit(0)
+        if not _quiet:
+            raise Exception("Route not found")
+
+    def delRoute(self, remoteBroker, exchange, routingKey, dynamic=False):
+        self.remote = qmfconsole.BrokerURL(remoteBroker)
+        link = self.getLink()
+        if link == None:
+            if not _quiet:
+                raise Exception("No link found from %s to %s" % (self.remote.name(), self.local.name()))
+            sys.exit(0)
 
         bridges = self.qmf.getObjects(_class="bridge")
         for bridge in bridges:
@@ -255,20 +308,20 @@
                 res = bridge.close()
                 if res.status != 0:
                     raise Exception("Error closing bridge: %d - %s" % (res.status, res.text))
-                if len (bridges) == 1 and _dellink:
-                    link = self.getLink ()
+                if len(bridges) == 1 and _dellink:
+                    link = self.getLink()
                     if link == None:
-                        sys.exit (0)
+                        sys.exit(0)
                     if _verbose:
                         print "Last bridge on link, closing link..."
                     res = link.close()
                     if res.status != 0:
                         raise Exception("Error closing link: %d - %s" % (res.status, res.text))
-                sys.exit (0)
+                sys.exit(0)
         if not _quiet:
             raise Exception("Route not found")
 
-    def ListRoutes (self):
+    def listRoutes(self):
         links   = self.qmf.getObjects(_class="link")
         bridges = self.qmf.getObjects(_class="bridge")
 
@@ -283,9 +336,9 @@
                     keyText = "<dynamic>"
                 else:
                     keyText = bridge.key
-                print "%s %s:%d %s %s" % (self.dest.name(), myLink.host, myLink.port, bridge.dest, keyText)
+                print "%s %s:%d %s %s" % (self.local.name(), myLink.host, myLink.port, bridge.dest, keyText)
 
-    def ClearAllRoutes (self):
+    def clearAllRoutes(self):
         links   = self.qmf.getObjects(_class="link")
         bridges = self.qmf.getObjects(_class="bridge")
 
@@ -347,10 +400,10 @@
 ##
 
 try:
-    longOpts = ("verbose", "quiet", "durable", "del-empty-link", "transport=")
-    (optlist, cargs) = getopt.gnu_getopt (sys.argv[1:], "vqdet:", longOpts)
+    longOpts = ("verbose", "quiet", "durable", "del-empty-link", "src-local", "transport=")
+    (optlist, cargs) = getopt.gnu_getopt(sys.argv[1:], "vqdest:", longOpts)
 except:
-    Usage ()
+    Usage()
 
 for opt in optlist:
     if opt[0] == "-v" or opt[0] == "--verbose":
@@ -361,77 +414,94 @@
         _durable = True
     if opt[0] == "-e" or opt[0] == "--del-empty-link":
         _dellink = True
+    if opt[0] == "-s" or opt[0] == "--src-local":
+        _srclocal = True
     if opt[0] == "-t" or opt[0] == "--transport":
         _transport = opt[1]
 
-nargs = len (cargs)
+nargs = len(cargs)
 if nargs < 2:
-    Usage ()
+    Usage()
 if nargs == 2:
-    destBroker = "localhost"
+    localBroker = "localhost"
 else:
-    destBroker = cargs[2]
+    if _srclocal:
+        localBroker = cargs[3]
+        remoteBroker = cargs[2]
+    else:
+        localBroker = cargs[2]
+        if nargs > 3:
+            remoteBroker = cargs[3]
 
 group = cargs[0]
 cmd   = cargs[1]
 
 try:
-    rm = RouteManager (destBroker)
+    rm = RouteManager(localBroker)
     if group == "link":
         if cmd == "add":
             if nargs != 4:
                 Usage()
-            rm.AddLink (cargs[3])
+            rm.addLink(remoteBroker)
         elif cmd == "del":
             if nargs != 4:
                 Usage()
-            rm.DelLink (cargs[3])
+            rm.delLink(remoteBroker)
         elif cmd == "list":
-            rm.ListLinks()
-        elif cmd == "map":
-            rm.MapLinks()
+            rm.listLinks()
 
     elif group == "dynamic":
         if cmd == "add":
             if nargs < 5 or nargs > 7:
-                Usage ()
+                Usage()
 
             tag = ""
             excludes = ""
             if nargs > 5: tag = cargs[5]     
             if nargs > 6: excludes = cargs[6]     
-            rm.AddRoute (cargs[3], cargs[4], "", tag, excludes, dynamic=True)
+            rm.addRoute(remoteBroker, cargs[4], "", tag, excludes, dynamic=True)
         elif cmd == "del":
             if nargs != 5:
-                Usage ()
+                Usage()
             else:
-                rm.DelRoute (cargs[3], cargs[4], "", dynamic=True)
+                rm.delRoute(remoteBroker, cargs[4], "", dynamic=True)
 
     elif group == "route":
         if cmd == "add":
             if nargs < 6 or nargs > 8:
-                Usage ()
+                Usage()
 
             tag = ""
             excludes = ""
             if nargs > 6: tag = cargs[6]     
-            if nargs > 7: excludes = cargs[7]     
-            rm.AddRoute (cargs[3], cargs[4], cargs[5], tag, excludes, dynamic=False)
+            if nargs > 7: excludes = cargs[7]
+            rm.addRoute(remoteBroker, cargs[4], cargs[5], tag, excludes, dynamic=False)
         elif cmd == "del":
             if nargs != 6:
-                Usage ()
-            else:
-                rm.DelRoute (cargs[3], cargs[4], cargs[5], dynamic=False)
+                Usage()
+            rm.delRoute(remoteBroker, cargs[4], cargs[5], dynamic=False)
+        elif cmd == "map":
+            rm.mapRoutes()
         else:
-            if   cmd == "list":
-                rm.ListRoutes ()
+            if cmd == "list":
+                rm.listRoutes()
             elif cmd == "flush":
-                rm.ClearAllRoutes ()
+                rm.clearAllRoutes()
             else:
-                Usage ()
+                Usage()
+
+    elif group == "queue":
+        if nargs != 6:
+            Usage()
+        if cmd == "add":
+            rm.addQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
+        elif cmd == "del":
+            rm.delQueueRoute(remoteBroker, exchange=cargs[4], queue=cargs[5])
+        else:
+            Usage()
 
 except Exception,e:
     print "Failed:", e.args[0]
     sys.exit(1)
 
-rm.Disconnect ()
+rm.disconnect()