You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ns...@apache.org on 2008/03/28 21:53:49 UTC

svn commit: r642375 - in /incubator/qpid/trunk/qpid/python/commands: ./ qpid-config

Author: nsantos
Date: Fri Mar 28 13:53:44 2008
New Revision: 642375

URL: http://svn.apache.org/viewvc?rev=642375&view=rev
Log:
QPID-885: patch from Ted Ross

Added:
    incubator/qpid/trunk/qpid/python/commands/
    incubator/qpid/trunk/qpid/python/commands/qpid-config   (with props)

Added: incubator/qpid/trunk/qpid/python/commands/qpid-config
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-config?rev=642375&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-config (added)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-config Fri Mar 28 13:53:44 2008
@@ -0,0 +1,342 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import socket
+import qpid
+from threading       import Condition
+from qpid.management import managementClient
+from qpid.peer       import Closed
+from qpid.client     import Client
+from time            import sleep
+
+defspecpath  = "/usr/share/amqp/amqp.0-10-preview.xml"
+specpath     = defspecpath
+recursive    = False
+host         = "localhost"
+
+def Usage ():
+    print "Usage:  qpid-config [OPTIONS]"
+    print "        qpid-config [OPTIONS] exchanges [filter-string]"
+    print "        qpid-config [OPTIONS] queues    [filter-string]"
+    print "        qpid-config [OPTIONS] add exchange <type> <name> [durable]"
+    print "        qpid-config [OPTIONS] del exchange <name>"
+    print "        qpid-config [OPTIONS] add queue <name> [durable]"
+    print "        qpid-config [OPTIONS] del queue <name>"
+    print "        qpid-config [OPTIONS] bind   <exchange-name> <queue-name> [binding-key]"
+    print "        qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]"
+    print
+    print "Options:"
+    print "    -b                   show bindings"
+    print "    -a <broker-addr>     default: localhost"
+    print "         broker-addr is in the form:   hostname | ip-address [:<port>]"
+    print "         ex:  localhost, 10.1.1.7:10000, broker-host:10000"
+    print "    -s <amqp-spec-file>  default:", defspecpath
+    print
+    sys.exit (1)
+
+class Broker:
+    def __init__ (self, text):
+        colon = text.find (":")
+        if colon == -1:
+            host = text
+            self.port = 5672
+        else:
+            host = text[:colon]
+            self.port = int (text[colon+1:])
+        self.host = socket.gethostbyname (host)
+
+    def name (self):
+        return self.host + ":" + str (self.port)
+
+class BrokerManager:
+    def __init__ (self):
+        self.dest   = None
+        self.src    = None
+        self.broker = None
+
+    def SetBroker (self, broker):
+        self.broker = broker
+
+    def ConnectToBroker (self):
+        try:
+            self.spec     = qpid.spec.load (specpath)
+            self.client   = Client (self.broker.host, self.broker.port, self.spec)
+            self.client.start ({"LOGIN":"guest","PASSWORD":"guest"})
+            self.channel  = self.client.channel (1)
+            self.mclient  = managementClient (self.spec)
+            self.mchannel = self.mclient.addChannel (self.channel)
+        except socket.error, e:
+            print "Connect Error:", e
+            exit (1)
+
+    def Overview (self):
+        self.ConnectToBroker ()
+        mc  = self.mclient
+        mch = self.mchannel
+        mc.syncWaitForStable (mch)
+        exchanges = mc.syncGetObjects (mch, "exchange")
+        queues    = mc.syncGetObjects (mch, "queue")
+        print "Total Exchanges: %d" % len (exchanges)
+        etype = {}
+        for ex in exchanges:
+            if ex.type not in etype:
+                etype[ex.type] = 1
+            else:
+                etype[ex.type] = etype[ex.type] + 1
+        for typ in etype:
+            print "%15s: %d" % (typ, etype[typ])
+
+        print
+        print "   Total Queues: %d" % len (queues)
+        durable = 0
+        for queue in queues:
+            if queue.durable:
+                durable = durable + 1
+        print "        durable: %d" % durable
+        print "    non-durable: %d" % (len (queues) - durable)
+
+    def ExchangeList (self, filter):
+        self.ConnectToBroker ()
+        mc  = self.mclient
+        mch = self.mchannel
+        mc.syncWaitForStable (mch)
+        exchanges = mc.syncGetObjects (mch, "exchange")
+        print "Type      Bindings  Exchange Name"
+        print "============================================="
+        for ex in exchanges:
+            if self.match (ex.name, filter):
+                print "%-10s%5d     %s" % (ex.type, ex.bindings, ex.name)
+
+    def ExchangeListRecurse (self, filter):
+        self.ConnectToBroker ()
+        mc  = self.mclient
+        mch = self.mchannel
+        mc.syncWaitForStable (mch)
+        exchanges = mc.syncGetObjects (mch, "exchange")
+        bindings  = mc.syncGetObjects (mch, "binding")
+        queues    = mc.syncGetObjects (mch, "queue")
+        for ex in exchanges:
+            if self.match (ex.name, filter):
+                print "Exchange '%s' (%s)" % (ex.name, ex.type)
+                for bind in bindings:
+                    if bind.exchangeRef == ex.id:
+                        qname = "<unknown>"
+                        queue = self.findById (queues, bind.queueRef)
+                        if queue != None:
+                            qname = queue.name
+                        print "    bind [%s] => %s" % (bind.bindingKey, qname)
+            
+
+    def QueueList (self, filter):
+        self.ConnectToBroker ()
+        mc  = self.mclient
+        mch = self.mchannel
+        mc.syncWaitForStable (mch)
+        queues = mc.syncGetObjects (mch, "queue")
+        print "Durable  AutoDel  Excl  Bindings  Queue Name"
+        print "==============================================================="
+        for q in queues:
+            if self.match (q.name, filter):
+                print "%4c%9c%7c%10d    %s" % (tf (q.durable), tf (q.autoDelete), tf (q.exclusive),
+                                               q.bindings, q.name)
+
+    def QueueListRecurse (self, filter):
+        self.ConnectToBroker ()
+        mc  = self.mclient
+        mch = self.mchannel
+        mc.syncWaitForStable (mch)
+        exchanges = mc.syncGetObjects (mch, "exchange")
+        bindings  = mc.syncGetObjects (mch, "binding")
+        queues    = mc.syncGetObjects (mch, "queue")
+        for queue in queues:
+            if self.match (queue.name, filter):
+                print "Queue '%s'" % queue.name
+                for bind in bindings:
+                    if bind.queueRef == queue.id:
+                        ename = "<unknown>"
+                        ex    = self.findById (exchanges, bind.exchangeRef)
+                        if ex != None:
+                            ename = ex.name
+                            if ename == "":
+                                ename = "''"
+                        print "    bind [%s] => %s" % (bind.bindingKey, ename)
+
+    def AddExchange (self, args):
+        if len (args) < 2:
+            Usage ()
+        self.ConnectToBroker ()
+        etype = args[0]
+        ename = args[1]
+        _durable = False
+        if len (args) > 2 and args[2] == "durable":
+            _durable = True
+
+        try:
+            self.channel.exchange_declare (exchange=ename, type=etype, durable=_durable)
+        except Closed, e:
+            print "Failed:", e
+
+    def DelExchange (self, args):
+        if len (args) < 1:
+            Usage ()
+        self.ConnectToBroker ()
+        ename = args[0]
+
+        try:
+            self.channel.exchange_delete (exchange=ename)
+        except Closed, e:
+            print "Failed:", e
+
+    def AddQueue (self, args):
+        if len (args) < 1:
+            Usage ()
+        self.ConnectToBroker ()
+        qname = args[0]
+        _durable = False
+        if len (args) > 1 and args[1] == "durable":
+            _durable = True
+
+        try:
+            self.channel.queue_declare (queue=qname, durable=_durable)
+        except Closed, e:
+            print "Failed:", e
+
+    def DelQueue (self, args):
+        if len (args) < 1:
+            Usage ()
+        self.ConnectToBroker ()
+        qname = args[0]
+
+        try:
+            self.channel.queue_delete (queue=qname)
+        except Closed, e:
+            print "Failed:", e
+
+    def Bind (self, args):
+        if len (args) < 2:
+            Usage ()
+        self.ConnectToBroker ()
+        ename = args[0]
+        qname = args[1]
+        key   = ""
+        if len (args) > 2:
+            key = args[2]
+
+        try:
+            self.channel.queue_bind (queue=qname, exchange=ename, routing_key=key)
+        except Closed, e:
+            print "Failed:", e
+
+    def Unbind (self, args):
+        if len (args) < 2:
+            Usage ()
+        self.ConnectToBroker ()
+        ename = args[0]
+        qname = args[1]
+        key   = ""
+        if len (args) > 2:
+            key = args[2]
+
+        try:
+            self.channel.queue_unbind (queue=qname, exchange=ename, routing_key=key)
+        except Closed, e:
+            print "Failed:", e
+
+    def findById (self, items, id):
+        for item in items:
+            if item.id == id:
+                return item
+        return None
+
+    def match (self, name, filter):
+        if filter == "":
+            return True
+        if name.find (filter) == -1:
+            return False
+        return True
+
+def tf (bool):
+    if bool:
+        return 'Y'
+    return 'N'
+
+##
+## Main Program
+##
+
+try:
+    (optlist, cargs) = getopt.getopt (sys.argv[1:], "s:a:b")
+except:
+    Usage ()
+
+for opt in optlist:
+    if opt[0] == "-s":
+        specpath = opt[1]
+    if opt[0] == "-b":
+        recursive = True
+    if opt[0] == "-a":
+        host = opt[1]
+
+nargs = len (cargs)
+bm  = BrokerManager ()
+bm.SetBroker (Broker (host))
+
+if nargs == 0:
+    bm.Overview ()
+else:
+    cmd = cargs[0]
+    modifier = ""
+    if nargs > 1:
+        modifier = cargs[1]
+    if cmd[0] == 'e':
+        if recursive:
+            bm.ExchangeListRecurse (modifier)
+        else:
+            bm.ExchangeList (modifier)
+    elif cmd[0] == 'q':
+        if recursive:
+            bm.QueueListRecurse (modifier)
+        else:
+            bm.QueueList (modifier)
+    elif cmd == "add":
+        if modifier == "exchange":
+            bm.AddExchange (cargs[2:])
+        elif modifier == "queue":
+            bm.AddQueue (cargs[2:])
+        else:
+            Usage ()
+    elif cmd == "del":
+        if modifier == "exchange":
+            bm.DelExchange (cargs[2:])
+        elif modifier == "queue":
+            bm.DelQueue (cargs[2:])
+        else:
+            Usage ()
+    elif cmd == "bind":
+        bm.Bind (cargs[1:])
+    elif cmd == "unbind":
+        bm.Unbind (cargs[1:])
+    else:
+        Usage ()
+

Propchange: incubator/qpid/trunk/qpid/python/commands/qpid-config
------------------------------------------------------------------------------
    svn:executable = *