You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/02/25 20:41:17 UTC
svn commit: r747897 - in /qpid/trunk/qpid/python: commands/qpid-cluster
commands/qpid-stat qpid/disp.py
Author: tross
Date: Wed Feb 25 19:41:17 2009
New Revision: 747897
URL: http://svn.apache.org/viewvc?rev=747897&view=rev
Log:
Added a new utility for viewing broker stats.
Fixed a bug in qpid-cluster that causes failure when username/password are
included in the broker URL.
Added:
qpid/trunk/qpid/python/commands/qpid-stat (with props)
Modified:
qpid/trunk/qpid/python/commands/qpid-cluster
qpid/trunk/qpid/python/qpid/disp.py
Modified: qpid/trunk/qpid/python/commands/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=747897&r1=747896&r2=747897&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-cluster (original)
+++ qpid/trunk/qpid/python/commands/qpid-cluster Wed Feb 25 19:41:17 2009
@@ -56,6 +56,9 @@
class IpAddr:
def __init__(self, text):
+ if text.find("@") != -1:
+ tokens = text.split("@")
+ text = tokens[1]
if text.find(":") != -1:
tokens = text.split(":")
text = tokens[0]
Added: qpid/trunk/qpid/python/commands/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-stat?rev=747897&view=auto
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-stat (added)
+++ qpid/trunk/qpid/python/commands/qpid-stat Wed Feb 25 19:41:17 2009
@@ -0,0 +1,385 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session, Console
+from qpid.disp import Display
+
+_host = "localhost"
+_top = False
+_types = ""
+pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
+
+def Usage ():
+ print "Usage: qpid-stat [OPTIONS] [broker-addr]"
+ print
+ print " broker-addr is in the form: [username/password@] hostname | ip-address [:<port>]"
+ print " ex: localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+ print
+# print "General Options:"
+# print " -n [--numeric] Don't resolve names"
+# print " -t [--top] Repeatedly display top items"
+# print
+ print "Display Options:"
+ print
+ print " -b Show Brokers"
+ print " -c Show Connections"
+# print " -s Show Sessions"
+# print " -e Show Exchanges"
+# print " -q Show Queues"
+ print
+ sys.exit (1)
+
+def num(value):
+ if value < 2000:
+ return str(value)
+ value /= 1000
+ if value < 2000:
+ return str(value) + "k"
+ value /= 1000
+ if value < 2000:
+ return str(value) + "m"
+ value /= 1000
+ return str(value) + "g"
+
+class IpAddr:
+ def __init__(self, text):
+ if text.find("@") != -1:
+ tokens = text.split("@")
+ text = tokens[1]
+ if text.find(":") != -1:
+ tokens = text.split(":")
+ text = tokens[0]
+ self.port = int(tokens[1])
+ else:
+ self.port = 5672
+ self.dottedQuad = socket.gethostbyname(text)
+ nums = self.dottedQuad.split(".")
+ self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+ def bestAddr(self, addrPortList):
+ bestDiff = 0xFFFFFFFF
+ bestAddr = None
+ for addrPort in addrPortList:
+ diff = IpAddr(addrPort[0]).addr ^ self.addr
+ if diff < bestDiff:
+ bestDiff = diff
+ bestAddr = addrPort
+ return bestAddr
+
+class Broker(object):
+ def __init__(self, qmf, broker):
+ self.broker = broker
+ list = qmf.getObjects(_class="connection", _package="org.apache.qpid.broker", _broker=broker)
+ bobj = qmf.getObjects(_class="broker", _package="org.apache.qpid.broker", _broker=broker)[0]
+ self.currentTime = bobj.getTimestamps()[0]
+ try:
+ self.uptime = bobj.uptime
+ except:
+ self.uptime = 0
+ self.connections = {}
+ self.sessions = {}
+ self.exchanges = {}
+ self.queues = {}
+ for conn in list:
+ if pattern.match(conn.address):
+ self.connections[conn.getObjectId()] = conn
+ list = qmf.getObjects(_class="session", _package="org.apache.qpid.broker", _broker=broker)
+ for sess in list:
+ if sess.connectionRef in self.connections:
+ self.sessions[sess.getObjectId()] = sess
+ list = qmf.getObjects(_class="exchange", _package="org.apache.qpid.broker", _broker=broker)
+ for exchange in list:
+ self.exchanges[exchange.getObjectId()] = exchange
+ list = qmf.getObjects(_class="queue", _package="org.apache.qpid.broker", _broker=broker)
+ for queue in list:
+ self.queues[queue.getObjectId()] = queue
+
+ def getName(self):
+ return self.broker.getUrl()
+
+ def getCurrentTime(self):
+ return self.currentTime
+
+ def getUptime(self):
+ return self.uptime
+
+class BrokerManager(Console):
+ def __init__(self):
+ self.brokerName = None
+ self.qmf = None
+ self.broker = None
+ self.brokers = []
+ self.cluster = None
+
+ def SetBroker(self, brokerUrl):
+ self.url = brokerUrl
+ self.qmf = Session()
+ self.broker = self.qmf.addBroker(brokerUrl)
+ agents = self.qmf.getAgents()
+ for a in agents:
+ if a.getAgentBank() == 0:
+ self.brokerAgent = a
+
+ def Disconnect(self):
+ if self.broker:
+ self.qmf.delBroker(self.broker)
+
+ def _getCluster(self):
+ packages = self.qmf.getPackages()
+ if "org.apache.qpid.cluster" not in packages:
+ return None
+
+ clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+ if len(clusters) == 0:
+ print "Clustering is installed but not enabled on the broker."
+ return None
+
+ self.cluster = clusters[0]
+
+ def _getHostList(self, urlList):
+ hosts = []
+ hostAddr = IpAddr(_host)
+ for url in urlList:
+ if url.find("amqp:") != 0:
+ raise Exception("Invalid URL 1")
+ url = url[5:]
+ addrs = str(url).split(",")
+ addrList = []
+ for addr in addrs:
+ tokens = addr.split(":")
+ if len(tokens) != 3:
+ raise Exception("Invalid URL 2")
+ addrList.append((tokens[1], tokens[2]))
+
+ # Find the address in the list that is most likely to be in the same subnet as the address
+ # with which we made the original QMF connection. This increases the probability that we will
+ # be able to reach the cluster member.
+
+ best = hostAddr.bestAddr(addrList)
+ bestUrl = best[0] + ":" + best[1]
+ hosts.append(bestUrl)
+ return hosts
+
+ def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
+ if len(subs) == 0:
+ return
+ this = subs[0]
+ remaining = subs[1:]
+ newindent = indent + " "
+ if this == 'b':
+ pass
+ elif this == 'c':
+ if broker:
+ for oid in broker.connections:
+ iconn = broker.connections[oid]
+ self.printConnSub(indent, broker.getName(), iconn)
+ self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
+ sess=sess, exchange=exchange, queue=queue)
+ elif this == 's':
+ pass
+ elif this == 'e':
+ pass
+ elif this == 'q':
+ pass
+ print
+
+ def displayBroker(self, subs):
+ disp = Display(prefix=" ")
+ heads = ('Broker', 'cluster', 'uptime', 'conn', 'sess', 'exch', 'queue')
+ rows = []
+ for broker in self.brokers:
+ if self.cluster:
+ ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
+ else:
+ ctext = "<standalone>"
+ utext = ""
+ if broker.getUptime() > 0:
+ utext = disp.duration(broker.getUptime())
+ row = (broker.getName(), ctext, utext,
+ str(len(broker.connections)), str(len(broker.sessions)),
+ str(len(broker.exchanges)), str(len(broker.queues)))
+ rows.append(row)
+ disp.table("Brokers", heads, rows)
+
+ def displayConn(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append('broker')
+ heads.append('client addr')
+ heads.append('client(pid)')
+ heads.append('auth')
+ heads.append('connected')
+ heads.append('idle')
+ heads.append('msgIn')
+ heads.append('msgOut')
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.connections:
+ conn = broker.connections[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ row.append(conn.address)
+ procpid = ""
+ if conn.remoteProcessName:
+ procpid += conn.remoteProcessName
+ if conn.remotePid:
+ procpid += "(%d)" % conn.remotePid
+ row.append(procpid)
+ row.append(conn.authIdentity)
+ row.append(disp.duration(broker.getCurrentTime() - conn.getTimestamps()[1]))
+ idle = broker.getCurrentTime() - conn.getTimestamps()[0]
+ if idle < 10000000000:
+ itext = ""
+ else:
+ itext = disp.duration(idle)
+ row.append(itext)
+ row.append(num(conn.framesFromClient))
+ row.append(num(conn.framesToClient))
+ rows.append(row)
+ title = "Connections"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ disp.table(title, heads, rows)
+
+ def displaySession(self, subs):
+ disp = Display(prefix=" ")
+
+ def displayExchange(self, subs):
+ disp = Display(prefix=" ")
+ heads = []
+ if self.cluster:
+ heads.append('broker')
+ heads.append("exchange")
+ heads.append("type")
+ heads.append("dur")
+ heads.append("bind")
+ heads.append("msgIn")
+ heads.append("msgOut")
+ heads.append("msgDrop")
+ heads.append("byteIn")
+ heads.append("byteOut")
+ heads.append("byteDrop")
+ rows = []
+ for broker in self.brokers:
+ for oid in broker.exchanges:
+ ex = broker.exchanges[oid]
+ row = []
+ if self.cluster:
+ row.append(broker.getName())
+ if ex.durable:
+ dur = "Y"
+ else:
+ dur = ""
+ row.append(ex.name)
+ row.append(ex.type)
+ row.append(dur)
+ row.append(num(ex.bindingCount))
+ row.append(num(ex.msgReceives))
+ row.append(num(ex.msgRoutes))
+ row.append(num(ex.msgDrops))
+ row.append(num(ex.byteReceives))
+ row.append(num(ex.byteRoutes))
+ row.append(num(ex.byteDrops))
+ rows.append(row)
+ title = "Exchanges"
+ if self.cluster:
+ title += " for cluster '%s'" % self.cluster.clusterName
+ disp.table(title, heads, rows)
+
+ def displayMain(self, main, subs):
+ if main == 'b': self.displayBroker(subs)
+ elif main == 'c': self.displayConn(subs)
+ elif main == 's': self.displaySession(subs)
+ elif main == 'e': self.displayExchange(subs)
+
+ def display(self):
+ self._getCluster()
+ if self.cluster:
+ memberList = self.cluster.members.split(";")
+ hostList = self._getHostList(memberList)
+ self.qmf.delBroker(self.broker)
+ self.broker = None
+ for host in hostList:
+ b = self.qmf.addBroker(host)
+ self.brokers.append(Broker(self.qmf, b))
+ else:
+ self.brokers.append(Broker(self.qmf, self.broker))
+
+ self.displayMain(_types[0], _types[1:])
+
+
+##
+## Main Program
+##
+
+try:
+ longOpts = ("top", "numeric")
+ (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bc", longOpts)
+except:
+ Usage()
+
+try:
+ encoding = locale.getpreferredencoding()
+ cargs = [a.decode(encoding) for a in encArgs]
+except:
+ cargs = encArgs
+
+for opt in optlist:
+ if opt[0] == "-t" or opt[0] == "--top":
+ _top = True
+ elif opt[0] == "-n" or opt[0] == "--numeric":
+ _numeric = True
+ elif len(opt[0]) == 2:
+ char = opt[0][1]
+ if "bcseq".find(char) != -1:
+ _types += char
+ else:
+ Usage()
+ else:
+ Usage()
+
+if len(_types) == 0:
+ Usage()
+
+nargs = len(cargs)
+bm = BrokerManager()
+
+if nargs == 1:
+ _host = cargs[0]
+
+try:
+ bm.SetBroker(_host)
+ bm.display()
+except KeyboardInterrupt:
+ print
+except Exception,e:
+ print "Failed:", e.args
+ sys.exit(1)
+
+bm.Disconnect()
Propchange: qpid/trunk/qpid/python/commands/qpid-stat
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/trunk/qpid/python/qpid/disp.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/disp.py?rev=747897&r1=747896&r2=747897&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/disp.py (original)
+++ qpid/trunk/qpid/python/qpid/disp.py Wed Feb 25 19:41:17 2009
@@ -24,13 +24,20 @@
class Display:
""" Display formatting for QPID Management CLI """
- def __init__ (self):
- self.tableSpacing = 2
- self.tablePrefix = " "
+ def __init__ (self, spacing=2, prefix=" "):
+ self.tableSpacing = spacing
+ self.tablePrefix = prefix
self.timestampFormat = "%X"
def table (self, title, heads, rows):
""" Print a formatted table with autosized columns """
+
+ # Pad the rows to the number of heads
+ for row in rows:
+ diff = len(heads) - len(row)
+ for idx in range(diff):
+ row.append("")
+
print title
if len (rows) == 0:
return
@@ -77,3 +84,19 @@
def timestamp (self, nsec):
""" Format a nanosecond-since-the-epoch timestamp for printing """
return strftime (self.timestampFormat, gmtime (nsec / 1000000000))
+
+ def duration(self, nsec):
+ if nsec < 0: nsec = 0
+ sec = nsec / 1000000000
+ min = sec / 60
+ hour = min / 60
+ day = hour / 24
+ result = ""
+ if day > 0:
+ result = "%dd " % day
+ if hour > 0 or result != "":
+ result += "%dh " % (hour % 24)
+ if min > 0 or result != "":
+ result += "%dm " % (min % 60)
+ result += "%ds" % (sec % 60)
+ return result
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org