You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ns...@apache.org on 2008/03/28 21:53:49 UTC
svn commit: r642375 - in /incubator/qpid/trunk/qpid/python/commands: ./
qpid-config
Author: nsantos
Date: Fri Mar 28 13:53:44 2008
New Revision: 642375
URL: http://svn.apache.org/viewvc?rev=642375&view=rev
Log:
QPID-885: patch from Ted Ross
Added:
incubator/qpid/trunk/qpid/python/commands/
incubator/qpid/trunk/qpid/python/commands/qpid-config (with props)
Added: incubator/qpid/trunk/qpid/python/commands/qpid-config
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/commands/qpid-config?rev=642375&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/python/commands/qpid-config (added)
+++ incubator/qpid/trunk/qpid/python/commands/qpid-config Fri Mar 28 13:53:44 2008
@@ -0,0 +1,342 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import socket
+import qpid
+from threading import Condition
+from qpid.management import managementClient
+from qpid.peer import Closed
+from qpid.client import Client
+from time import sleep
+
+# Broker address used unless -a supplies another one.
+host = "localhost"
+# Switched on by -b: the exchange/queue listings then show their bindings.
+recursive = False
+# AMQP spec file loaded unless -s supplies another path; specpath is the
+# value actually handed to the spec loader.
+defspecpath = "/usr/share/amqp/amqp.0-10-preview.xml"
+specpath = defspecpath
+
+def Usage ():
+    """Print the command synopsis and the option summary, then exit with status 1."""
+    synopsis = (
+        "Usage: qpid-config [OPTIONS]",
+        " qpid-config [OPTIONS] exchanges [filter-string]",
+        " qpid-config [OPTIONS] queues [filter-string]",
+        " qpid-config [OPTIONS] add exchange <type> <name> [durable]",
+        " qpid-config [OPTIONS] del exchange <name>",
+        " qpid-config [OPTIONS] add queue <name> [durable]",
+        " qpid-config [OPTIONS] del queue <name>",
+        " qpid-config [OPTIONS] bind <exchange-name> <queue-name> [binding-key]",
+        " qpid-config [OPTIONS] unbind <exchange-name> <queue-name> [binding-key]",
+    )
+    options = (
+        "Options:",
+        " -b show bindings",
+        " -a <broker-addr> default: localhost",
+        " broker-addr is in the form: hostname | ip-address [:<port>]",
+        " ex: localhost, 10.1.1.7:10000, broker-host:10000",
+    )
+    for text in synopsis:
+        print text
+    print
+    for text in options:
+        print text
+    # The spec-file default is read from the module-level defspecpath.
+    print " -s <amqp-spec-file> default:", defspecpath
+    print
+    sys.exit (1)
+
+class Broker:
+    """Broker address parsed from a "host[:port]" string.
+
+    Attributes:
+      host -- the address string returned by socket.gethostbyname for the host part
+      port -- the port as an int (DEFAULT_PORT when the text names none)
+    """
+
+    # Port assumed when the address text does not carry one.
+    DEFAULT_PORT = 5672
+
+    def __init__ (self, text):
+        """Split text at its first colon into host and port, then resolve the host.
+
+        A missing or empty port ("host" or "host:") selects DEFAULT_PORT.
+        Raises ValueError when the port is not an integer; errors raised by
+        socket.gethostbyname for an unresolvable host are left to propagate.
+        """
+        colon = text.find (":")
+        if colon == -1:
+            host = text
+            porttext = ""
+        else:
+            host = text[:colon]
+            porttext = text[colon+1:].strip ()
+
+        if porttext == "":
+            self.port = self.DEFAULT_PORT
+        else:
+            try:
+                self.port = int (porttext)
+            except ValueError:
+                # Re-raise with the offending text so the caller can show it.
+                raise ValueError ("invalid port '%s' in broker address '%s'" % (porttext, text))
+        self.host = socket.gethostbyname (host)
+
+    def name (self):
+        """Return the resolved address as "host:port"."""
+        return "%s:%d" % (self.host, self.port)
+
+class BrokerManager:
+    """Runs the qpid-config commands against one broker.
+
+    SetBroker must be given a Broker before any command method is used;
+    every command method opens its connection through ConnectToBroker.
+    """
+
+    def __init__ (self):
+        self.dest = None
+        self.src = None
+        self.broker = None
+        # Filled in by ConnectToBroker.
+        self.spec = None
+        self.client = None
+        self.channel = None
+        self.mclient = None
+        self.mchannel = None
+
+    def SetBroker (self, broker):
+        """Store the Broker whose host and port ConnectToBroker will use."""
+        self.broker = broker
+
+    def ConnectToBroker (self):
+        """Load the spec at specpath, start a client as guest/guest, open
+        channel 1 and register it with a management client.
+
+        A socket.error is reported as "Connect Error: ..." and ends the
+        process with status 1.
+        """
+        try:
+            self.spec = qpid.spec.load (specpath)
+            self.client = Client (self.broker.host, self.broker.port, self.spec)
+            self.client.start ({"LOGIN":"guest","PASSWORD":"guest"})
+            self.channel = self.client.channel (1)
+            self.mclient = managementClient (self.spec)
+            self.mchannel = self.mclient.addChannel (self.channel)
+        except socket.error, e:
+            print "Connect Error:", e
+            # sys.exit, not the bare exit(): that name is injected by the
+            # site module for interactive sessions and is not meant for
+            # programs.
+            sys.exit (1)
+
+    def _getObjects (self, *classNames):
+        """Connect, call syncWaitForStable, then syncGetObjects once per class name.
+
+        Returns the results as a list in the order the names were given.
+        """
+        self.ConnectToBroker ()
+        self.mclient.syncWaitForStable (self.mchannel)
+        return [self.mclient.syncGetObjects (self.mchannel, cls) for cls in classNames]
+
+    def _invoke (self, operation, **kwargs):
+        """Connect and call the named channel method with kwargs.
+
+        A Closed exception is reported as "Failed: ..." and not re-raised.
+        """
+        self.ConnectToBroker ()
+        try:
+            getattr (self.channel, operation) (**kwargs)
+        except Closed, e:
+            print "Failed:", e
+
+    def Overview (self):
+        """Print exchange totals per type and queue totals split by durability."""
+        exchanges, queues = self._getObjects ("exchange", "queue")
+        print "Total Exchanges: %d" % len (exchanges)
+        etype = {}
+        for ex in exchanges:
+            etype[ex.type] = etype.get (ex.type, 0) + 1
+        for typ in etype:
+            print "%15s: %d" % (typ, etype[typ])
+
+        print
+        print " Total Queues: %d" % len (queues)
+        durable = len ([queue for queue in queues if queue.durable])
+        print " durable: %d" % durable
+        print " non-durable: %d" % (len (queues) - durable)
+
+    def ExchangeList (self, filter):
+        """Print one table row per exchange whose name matches filter."""
+        exchanges = self._getObjects ("exchange")[0]
+        print "Type Bindings Exchange Name"
+        print "============================================="
+        for ex in exchanges:
+            if self.match (ex.name, filter):
+                print "%-10s%5d %s" % (ex.type, ex.bindings, ex.name)
+
+    def ExchangeListRecurse (self, filter):
+        """Print each matching exchange followed by its bindings and bound queue names."""
+        exchanges, bindings, queues = self._getObjects ("exchange", "binding", "queue")
+        for ex in exchanges:
+            if not self.match (ex.name, filter):
+                continue
+            print "Exchange '%s' (%s)" % (ex.name, ex.type)
+            for bind in bindings:
+                if bind.exchangeRef != ex.id:
+                    continue
+                # A binding whose queue is not in the fetched list is still shown.
+                qname = "<unknown>"
+                queue = self.findById (queues, bind.queueRef)
+                if queue is not None:
+                    qname = queue.name
+                print " bind [%s] => %s" % (bind.bindingKey, qname)
+
+    def QueueList (self, filter):
+        """Print one table row per queue whose name matches filter."""
+        queues = self._getObjects ("queue")[0]
+        print "Durable AutoDel Excl Bindings Queue Name"
+        print "==============================================================="
+        for q in queues:
+            if self.match (q.name, filter):
+                print "%4c%9c%7c%10d %s" % (tf (q.durable), tf (q.autoDelete), tf (q.exclusive),
+                                            q.bindings, q.name)
+
+    def QueueListRecurse (self, filter):
+        """Print each matching queue followed by its bindings and exchange names."""
+        exchanges, bindings, queues = self._getObjects ("exchange", "binding", "queue")
+        for queue in queues:
+            if not self.match (queue.name, filter):
+                continue
+            print "Queue '%s'" % queue.name
+            for bind in bindings:
+                if bind.queueRef != queue.id:
+                    continue
+                ename = "<unknown>"
+                ex = self.findById (exchanges, bind.exchangeRef)
+                if ex is not None:
+                    ename = ex.name
+                    # Show an exchange with an empty name as '' so the line is readable.
+                    if ename == "":
+                        ename = "''"
+                print " bind [%s] => %s" % (bind.bindingKey, ename)
+
+    def AddExchange (self, args):
+        """Declare an exchange; args is [type, name] plus an optional "durable"."""
+        if len (args) < 2:
+            Usage ()
+        _durable = len (args) > 2 and args[2] == "durable"
+        self._invoke ("exchange_declare", exchange=args[1], type=args[0], durable=_durable)
+
+    def DelExchange (self, args):
+        """Delete the exchange named by args[0]."""
+        if len (args) < 1:
+            Usage ()
+        self._invoke ("exchange_delete", exchange=args[0])
+
+    def AddQueue (self, args):
+        """Declare a queue; args is [name] plus an optional "durable"."""
+        if len (args) < 1:
+            Usage ()
+        _durable = len (args) > 1 and args[1] == "durable"
+        self._invoke ("queue_declare", queue=args[0], durable=_durable)
+
+    def DelQueue (self, args):
+        """Delete the queue named by args[0]."""
+        if len (args) < 1:
+            Usage ()
+        self._invoke ("queue_delete", queue=args[0])
+
+    def Bind (self, args):
+        """Bind a queue to an exchange; args is [exchange, queue] plus an optional key."""
+        if len (args) < 2:
+            Usage ()
+        key = ""
+        if len (args) > 2:
+            key = args[2]
+        self._invoke ("queue_bind", queue=args[1], exchange=args[0], routing_key=key)
+
+    def Unbind (self, args):
+        """Remove a binding; args is [exchange, queue] plus an optional key."""
+        if len (args) < 2:
+            Usage ()
+        key = ""
+        if len (args) > 2:
+            key = args[2]
+        self._invoke ("queue_unbind", queue=args[1], exchange=args[0], routing_key=key)
+
+    def findById (self, items, id):
+        """Return the first item whose .id equals id, or None when there is none."""
+        for item in items:
+            if item.id == id:
+                return item
+        return None
+
+    def match (self, name, filter):
+        """Return True when filter is empty or occurs as a substring of name."""
+        return filter == "" or name.find (filter) != -1
+
+def tf (bool):
+    """Return 'Y' when the argument is true and 'N' otherwise."""
+    if not bool:
+        return 'N'
+    return 'Y'
+
+##
+## Main Program
+##
+
+# Parse the options; anything getopt rejects leads to the usage text.
+try:
+    (optlist, cargs) = getopt.getopt (sys.argv[1:], "s:a:b")
+except getopt.GetoptError:
+    # Catch only option errors: a bare except would also swallow
+    # KeyboardInterrupt and SystemExit.
+    Usage ()
+
+for (flag, value) in optlist:
+    if flag == "-s":
+        specpath = value
+    elif flag == "-b":
+        recursive = True
+    elif flag == "-a":
+        host = value
+
+nargs = len (cargs)
+bm = BrokerManager ()
+try:
+    bm.SetBroker (Broker (host))
+except (socket.error, ValueError), e:
+    # Broker resolves the host name and converts the port; report a bad
+    # -a value as a message instead of a traceback.
+    print "Invalid broker address '%s': %s" % (host, e)
+    sys.exit (1)
+
+if nargs == 0:
+    bm.Overview ()
+else:
+    cmd = cargs[0]
+    modifier = ""
+    if nargs > 1:
+        modifier = cargs[1]
+    # Only the first letter selects the listing commands, so "e", "ex" and
+    # "exchanges" are all accepted. Slicing (not indexing) keeps an
+    # empty-string argument from raising IndexError; it falls through to Usage.
+    if cmd[:1] == 'e':
+        if recursive:
+            bm.ExchangeListRecurse (modifier)
+        else:
+            bm.ExchangeList (modifier)
+    elif cmd[:1] == 'q':
+        if recursive:
+            bm.QueueListRecurse (modifier)
+        else:
+            bm.QueueList (modifier)
+    elif cmd == "add":
+        if modifier == "exchange":
+            bm.AddExchange (cargs[2:])
+        elif modifier == "queue":
+            bm.AddQueue (cargs[2:])
+        else:
+            Usage ()
+    elif cmd == "del":
+        if modifier == "exchange":
+            bm.DelExchange (cargs[2:])
+        elif modifier == "queue":
+            bm.DelQueue (cargs[2:])
+        else:
+            Usage ()
+    elif cmd == "bind":
+        bm.Bind (cargs[1:])
+    elif cmd == "unbind":
+        bm.Unbind (cargs[1:])
+    else:
+        Usage ()
+
Propchange: incubator/qpid/trunk/qpid/python/commands/qpid-config
------------------------------------------------------------------------------
svn:executable = *