You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by tr...@apache.org on 2009/02/25 20:41:17 UTC

svn commit: r747897 - in /qpid/trunk/qpid/python: commands/qpid-cluster commands/qpid-stat qpid/disp.py

Author: tross
Date: Wed Feb 25 19:41:17 2009
New Revision: 747897

URL: http://svn.apache.org/viewvc?rev=747897&view=rev
Log:
Added a new utility for viewing broker stats.
Fixed a bug in qpid-cluster that causes failure when username/password are
included in the broker URL.

Added:
    qpid/trunk/qpid/python/commands/qpid-stat   (with props)
Modified:
    qpid/trunk/qpid/python/commands/qpid-cluster
    qpid/trunk/qpid/python/qpid/disp.py

Modified: qpid/trunk/qpid/python/commands/qpid-cluster
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-cluster?rev=747897&r1=747896&r2=747897&view=diff
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-cluster (original)
+++ qpid/trunk/qpid/python/commands/qpid-cluster Wed Feb 25 19:41:17 2009
@@ -56,6 +56,9 @@
 
 class IpAddr:
     def __init__(self, text):
+        if text.find("@") != -1:
+            tokens = text.split("@")
+            text = tokens[1]
         if text.find(":") != -1:
             tokens = text.split(":")
             text = tokens[0]

Added: qpid/trunk/qpid/python/commands/qpid-stat
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/commands/qpid-stat?rev=747897&view=auto
==============================================================================
--- qpid/trunk/qpid/python/commands/qpid-stat (added)
+++ qpid/trunk/qpid/python/commands/qpid-stat Wed Feb 25 19:41:17 2009
@@ -0,0 +1,385 @@
+#!/usr/bin/env python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import getopt
+import sys
+import locale
+import socket
+import re
+from qmf.console import Session, Console
+from qpid.disp import Display
+
+_host = "localhost"
+_top = False
+_types = ""
+pattern = re.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+:\\d+$")
+
+def Usage ():
+    print "Usage:  qpid-stat [OPTIONS] [broker-addr]"
+    print
+    print "             broker-addr is in the form:   [username/password@] hostname | ip-address [:<port>]"
+    print "             ex:  localhost, 10.1.1.7:10000, broker-host:10000, guest/guest@localhost"
+    print
+#    print "General Options:"
+#    print "     -n [--numeric]  Don't resolve names"
+#    print "     -t [--top]      Repeatedly display top items"
+#    print
+    print "Display Options:"
+    print
+    print "     -b   Show Brokers"
+    print "     -c   Show Connections"
+#    print "     -s   Show Sessions"
+#    print "     -e   Show Exchanges"
+#    print "     -q   Show Queues"
+    print
+    sys.exit (1)
+
+def num(value):
+    # Abbreviate a counter for display.
+    # A value below 2000 is printed at the current scale.
+    # Otherwise it is divided by 1000 and the next suffix is tried:
+    # first "k", then "m".
+    # Whatever remains after three divisions is printed with "g".
+    for suffix in ("", "k", "m"):
+        if value < 2000:
+            return str(value) + suffix
+        value /= 1000
+    return str(value) + "g"
+
+class IpAddr:
+    # Parses "[user/pass@]host[:port]"; resolves host to a 32-bit IPv4 value in self.addr.
+    def __init__(self, text):
+        # Split on the LAST "@" so that a password containing "@" is not taken for the host.
+        text = text.rsplit("@", 1)[-1]
+        if text.find(":") != -1:
+            tokens = text.split(":")
+            text = tokens[0]
+            self.port = int(tokens[1])
+        else:
+            self.port = 5672
+        self.dottedQuad = socket.gethostbyname(text)
+        nums = self.dottedQuad.split(".")
+        self.addr = (int(nums[0]) << 24) + (int(nums[1]) << 16) + (int(nums[2]) << 8) + int(nums[3])
+
+    def bestAddr(self, addrPortList):
+        # Return the entry whose address has the smallest XOR distance to ours (None if list is empty).
+        bestDiff = None
+        bestAddr = None
+        for addrPort in addrPortList:
+            diff = IpAddr(addrPort[0]).addr ^ self.addr
+            if bestDiff is None or diff < bestDiff:
+                bestDiff, bestAddr = diff, addrPort
+        return bestAddr
+
+class Broker(object):
+    # Snapshot of one broker's QMF objects (connections, sessions, exchanges, queues), keyed by object id.
+    def __init__(self, qmf, broker):
+        pkg = "org.apache.qpid.broker"
+        self.broker = broker
+        conns = qmf.getObjects(_class="connection", _package=pkg, _broker=broker)
+        bobj = qmf.getObjects(_class="broker", _package=pkg, _broker=broker)[0]
+        self.currentTime = bobj.getTimestamps()[0]
+        try:
+            self.uptime = bobj.uptime
+        except Exception:
+            # uptime could not be read from the broker object: fall back to 0
+            self.uptime = 0
+        self.connections = {}
+        self.sessions = {}
+        self.exchanges = {}
+        self.queues = {}
+        for conn in conns:
+            if pattern.match(conn.address):
+                self.connections[conn.getObjectId()] = conn
+        for sess in qmf.getObjects(_class="session", _package=pkg, _broker=broker):
+            if sess.connectionRef in self.connections:
+                self.sessions[sess.getObjectId()] = sess
+        for exchange in qmf.getObjects(_class="exchange", _package=pkg, _broker=broker):
+            self.exchanges[exchange.getObjectId()] = exchange
+        for queue in qmf.getObjects(_class="queue", _package=pkg, _broker=broker):
+            self.queues[queue.getObjectId()] = queue
+
+    def getName(self):
+        return self.broker.getUrl()
+
+    def getCurrentTime(self):
+        return self.currentTime
+
+    def getUptime(self):
+        return self.uptime
+
+class BrokerManager(Console):
+    def __init__(self):
+        self.brokerName = None
+        self.qmf        = None
+        self.broker     = None
+        self.brokers    = []
+        self.cluster    = None
+
+    def SetBroker(self, brokerUrl):
+        self.url = brokerUrl
+        self.qmf = Session()
+        self.broker = self.qmf.addBroker(brokerUrl)
+        agents = self.qmf.getAgents()
+        for a in agents:
+            if a.getAgentBank() == 0:
+                self.brokerAgent = a
+
+    def Disconnect(self):
+        if self.broker:
+            self.qmf.delBroker(self.broker)
+
+    def _getCluster(self):
+        packages = self.qmf.getPackages()
+        if "org.apache.qpid.cluster" not in packages:
+            return None
+
+        clusters = self.qmf.getObjects(_class="cluster", _agent=self.brokerAgent)
+        if len(clusters) == 0:
+            print "Clustering is installed but not enabled on the broker."
+            return None
+
+        self.cluster = clusters[0]
+
+    def _getHostList(self, urlList):
+        hosts = []
+        hostAddr = IpAddr(_host)
+        for url in urlList:
+            if url.find("amqp:") != 0:
+                raise Exception("Invalid URL 1")
+            url = url[5:]
+            addrs = str(url).split(",")
+            addrList = []
+            for addr in addrs:
+                tokens = addr.split(":")
+                if len(tokens) != 3:
+                    raise Exception("Invalid URL 2")
+                addrList.append((tokens[1], tokens[2]))
+
+            # Find the address in the list that is most likely to be in the same subnet as the address
+            # with which we made the original QMF connection.  This increases the probability that we will
+            # be able to reach the cluster member.
+
+            best = hostAddr.bestAddr(addrList)
+            bestUrl = best[0] + ":" + best[1]
+            hosts.append(bestUrl)
+        return hosts
+
+    def displaySubs(self, subs, indent, broker=None, conn=None, sess=None, exchange=None, queue=None):
+        if len(subs) == 0:
+            return
+        this = subs[0]
+        remaining = subs[1:]
+        newindent = indent + "  "
+        if this == 'b':
+            pass
+        elif this == 'c':
+            if broker:
+                for oid in broker.connections:
+                    iconn = broker.connections[oid]
+                    self.printConnSub(indent, broker.getName(), iconn)
+                    self.displaySubs(remaining, newindent, broker=broker, conn=iconn,
+                                     sess=sess, exchange=exchange, queue=queue)
+        elif this == 's':
+            pass
+        elif this == 'e':
+            pass
+        elif this == 'q':
+            pass
+        print
+
+    def displayBroker(self, subs):
+        disp = Display(prefix="  ")
+        heads = ('Broker', 'cluster', 'uptime', 'conn', 'sess', 'exch', 'queue')
+        rows = []
+        for broker in self.brokers:
+            if self.cluster:
+                ctext = "%s(%s)" % (self.cluster.clusterName, self.cluster.status)
+            else:
+                ctext = "<standalone>"
+            utext = ""
+            if broker.getUptime() > 0:
+                utext = disp.duration(broker.getUptime())
+            row = (broker.getName(), ctext, utext,
+                   str(len(broker.connections)), str(len(broker.sessions)),
+                   str(len(broker.exchanges)), str(len(broker.queues)))
+            rows.append(row)
+        disp.table("Brokers", heads, rows)
+
+    def displayConn(self, subs):
+        disp = Display(prefix="  ")
+        heads = []
+        if self.cluster:
+            heads.append('broker')
+        heads.append('client addr')
+        heads.append('client(pid)')
+        heads.append('auth')
+        heads.append('connected')
+        heads.append('idle')
+        heads.append('msgIn')
+        heads.append('msgOut')
+        rows = []
+        for broker in self.brokers:
+            for oid in broker.connections:
+                conn = broker.connections[oid]
+                row = []
+                if self.cluster:
+                    row.append(broker.getName())
+                row.append(conn.address)
+                procpid = ""
+                if conn.remoteProcessName:
+                    procpid += conn.remoteProcessName
+                if conn.remotePid:
+                    procpid += "(%d)" % conn.remotePid
+                row.append(procpid)
+                row.append(conn.authIdentity)
+                row.append(disp.duration(broker.getCurrentTime() - conn.getTimestamps()[1]))
+                idle = broker.getCurrentTime() - conn.getTimestamps()[0]
+                if idle < 10000000000:
+                    itext = ""
+                else:
+                    itext = disp.duration(idle)
+                row.append(itext)
+                row.append(num(conn.framesFromClient))
+                row.append(num(conn.framesToClient))
+                rows.append(row)
+        title = "Connections"
+        if self.cluster:
+            title += " for cluster '%s'" % self.cluster.clusterName
+        disp.table(title, heads, rows)
+
+    def displaySession(self, subs):
+        disp = Display(prefix="  ")
+
+    def displayExchange(self, subs):
+        disp = Display(prefix="  ")
+        heads = []
+        if self.cluster:
+            heads.append('broker')
+        heads.append("exchange")
+        heads.append("type")
+        heads.append("dur")
+        heads.append("bind")
+        heads.append("msgIn")
+        heads.append("msgOut")
+        heads.append("msgDrop")
+        heads.append("byteIn")
+        heads.append("byteOut")
+        heads.append("byteDrop")
+        rows = []
+        for broker in self.brokers:
+            for oid in broker.exchanges:
+                ex = broker.exchanges[oid]
+                row = []
+                if self.cluster:
+                    row.append(broker.getName())
+                if ex.durable:
+                    dur = "Y"
+                else:
+                    dur = ""
+                row.append(ex.name)
+                row.append(ex.type)
+                row.append(dur)
+                row.append(num(ex.bindingCount))
+                row.append(num(ex.msgReceives))
+                row.append(num(ex.msgRoutes))
+                row.append(num(ex.msgDrops))
+                row.append(num(ex.byteReceives))
+                row.append(num(ex.byteRoutes))
+                row.append(num(ex.byteDrops))
+                rows.append(row)
+        title = "Exchanges"
+        if self.cluster:
+            title += " for cluster '%s'" % self.cluster.clusterName
+        disp.table(title, heads, rows)
+
+    def displayMain(self, main, subs):
+        if   main == 'b': self.displayBroker(subs)
+        elif main == 'c': self.displayConn(subs)
+        elif main == 's': self.displaySession(subs)
+        elif main == 'e': self.displayExchange(subs)
+
+    def display(self):
+        self._getCluster()
+        if self.cluster:
+            memberList = self.cluster.members.split(";")
+            hostList = self._getHostList(memberList)
+            self.qmf.delBroker(self.broker)
+            self.broker = None
+            for host in hostList:
+                b = self.qmf.addBroker(host)
+                self.brokers.append(Broker(self.qmf, b))
+        else:
+            self.brokers.append(Broker(self.qmf, self.broker))
+
+        self.displayMain(_types[0], _types[1:])
+
+
+##
+## Main Program
+##
+
+try:
+    longOpts = ("top", "numeric")
+    (optlist, encArgs) = getopt.gnu_getopt(sys.argv[1:], "bc", longOpts)
+except getopt.GetoptError:
+    Usage()
+
+try:
+    encoding = locale.getpreferredencoding()
+    cargs = [a.decode(encoding) for a in encArgs]
+except Exception:
+    cargs = encArgs
+
+for opt in optlist:
+    if opt[0] == "-t" or opt[0] == "--top":
+        _top = True
+    elif opt[0] == "-n" or opt[0] == "--numeric":
+        _numeric = True
+    elif len(opt[0]) == 2:
+        char = opt[0][1]
+        if "bcseq".find(char) != -1:
+            _types += char
+        else:
+            Usage()
+    else:
+        Usage()
+
+if len(_types) == 0:
+    Usage()
+
+bm = BrokerManager()
+if len(cargs) == 1:
+    _host = cargs[0]
+
+# Nested try so Disconnect() still runs when the "Failed" path exits via sys.exit(1).
+try:
+    try:
+        bm.SetBroker(_host)
+        bm.display()
+    except KeyboardInterrupt:
+        print
+    except Exception,e:
+        print "Failed:", e.args
+        sys.exit(1)
+finally:
+    bm.Disconnect()

Propchange: qpid/trunk/qpid/python/commands/qpid-stat
------------------------------------------------------------------------------
    svn:executable = *

Modified: qpid/trunk/qpid/python/qpid/disp.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/disp.py?rev=747897&r1=747896&r2=747897&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/disp.py (original)
+++ qpid/trunk/qpid/python/qpid/disp.py Wed Feb 25 19:41:17 2009
@@ -24,13 +24,20 @@
 class Display:
   """ Display formatting for QPID Management CLI """
   
-  def __init__ (self):
-    self.tableSpacing    = 2
-    self.tablePrefix     = "    "
+  def __init__ (self, spacing=2, prefix="    "):
+    self.tableSpacing    = spacing
+    self.tablePrefix     = prefix
     self.timestampFormat = "%X"
 
   def table (self, title, heads, rows):
     """ Print a formatted table with autosized columns """
+
+    # Pad the rows to the number of heads
+    for row in rows:
+      diff = len(heads) - len(row)
+      for idx in range(diff):
+        row.append("")
+
     print title
     if len (rows) == 0:
       return
@@ -77,3 +84,19 @@
   def timestamp (self, nsec):
     """ Format a nanosecond-since-the-epoch timestamp for printing """
     return strftime (self.timestampFormat, gmtime (nsec / 1000000000))
+
+  def duration(self, nsec):
+    """ Format a duration given in nanoseconds as "[[[Nd ]Nh ]Nm ]Ns" (negatives clamp to 0s) """
+    sec = max(nsec, 0) // 1000000000
+    minutes = sec // 60
+    hour = minutes // 60
+    day = hour // 24
+    result = ""
+    if day > 0:
+      result = "%dd " % day
+    if hour > 0 or result != "":
+      result += "%dh " % (hour % 24)
+    if minutes > 0 or result != "":
+      result += "%dm " % (minutes % 60)
+    result += "%ds" % (sec % 60)
+    return result



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org