You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/04/21 13:46:54 UTC

svn commit: r936267 - in /qpid/trunk/qpid/java: build.xml management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java

Author: ritchiem
Date: Wed Apr 21 11:46:54 2010
New Revision: 936267

URL: http://svn.apache.org/viewvc?rev=936267&view=rev
Log:
QPID-2525: Updated Build.xml to build Management Examples
Updated QueueInformation to allow the specification of queues to monitor and the attributes to print.
Updated to only use a single connection rather than one per query.
Perform connection cleanup on shutdown

Modified:
    qpid/trunk/qpid/java/build.xml
    qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java

Modified: qpid/trunk/qpid/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/build.xml?rev=936267&r1=936266&r2=936267&view=diff
==============================================================================
--- qpid/trunk/qpid/java/build.xml (original)
+++ qpid/trunk/qpid/java/build.xml Wed Apr 21 11:46:54 2010
@@ -23,7 +23,7 @@
   <import file="common.xml"/>
 
   <property name="modules.core"       value="junit-toolkit common management/common broker client tools"/>
-  <property name="modules.examples"   value="client/example"/>
+  <property name="modules.examples"   value="client/example management/example"/>
   <property name="modules.tests"      value="systests perftests integrationtests testkit"/>
   <property name="modules.management" value="management/client management/eclipse-plugin management/agent management/console"/>
   <property name="modules.plugin"     value="broker-plugins"/>

Modified: qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java?rev=936267&r1=936266&r2=936267&view=diff
==============================================================================
--- qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java (original)
+++ qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java Wed Apr 21 11:46:54 2010
@@ -20,32 +20,47 @@
  */
 package org.apache.qpid.example.jmxexample;
 
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
 import javax.management.MBeanAttributeInfo;
 import javax.management.MBeanInfo;
 import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import javax.management.ReflectionException;
 import javax.management.remote.JMXConnector;
 import javax.management.remote.JMXConnectorFactory;
 import javax.management.remote.JMXServiceURL;
-import javax.net.ssl.SSLException;
-
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * Connects to a server and queries all info for Queues
  * Includes _tmp queues thus covering queues underlying topic subscriptions
  */
-public class QueueInformation {
+public class QueueInformation
+{
+
+    private static long _previousTimePoint = 0l;
+    private static Map<String, Long> _previousRMC = new HashMap<String, Long>();
+    private static Map<String, Long> _previousMC = new HashMap<String, Long>();
+    private static MBeanServerConnection _mbsc;
+    private static final String DEFAULT_DATE_FORMAT = System.getProperty("qpid.dateFormat", "yyyy-MM-dd HH:mm:ss");
+    private static final SimpleDateFormat _formatter = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
+
+    private static final String QUEUE_ARGS = "queues=";
+    private static Set<String> _queueNames;
+    private static final String ATTRIB_ARGS = "attributes=";
+    private static Set<String> _attribNames;
 
-    private static long previousTimePoint = 0l;
-    private static Map<String, Long> previousRMC = new HashMap<String, Long>();
-    private static Map<String, Long> previousMC = new HashMap<String, Long>();
+    private static MBeanAttributeInfo[] _attribInfo;
+    private static String _vhost;
 
     /**
      * Params:
@@ -55,117 +70,262 @@ public class QueueInformation {
      * 3: username, e.g. guest
      * 4: pwd, e.g. guest
      * 5: loop pause, no value indicates one-off, any other value is millisecs
+     * ..: attributes=<csv attribute list> , queues=<csv queue list>
+     * The queue list can use wildcards such as * and ?. Basically any value
+     * that JMX will accept in the query string for the name='' value of the queue.
      */
-    public static void main(String[] args) throws Exception {
-        if (args.length < 5) {
+    public static void main(String[] args) throws Exception
+    {
+        if (args.length < 5 || args.length > 8)
+        {
             System.out.println("Usage: ");
-            System.out.println("<host> <port> <vhost> <username> <pwd> <loop pause time in millisecs>");
+            System.out.println("<host> <port> <vhost> <username> <pwd> [<loop pause time in millisecs>] [queues=<queue list csv>] [attributes=<attribute list csv>]");
             return;
         }
         String host = args[0];
         int port = Integer.parseInt(args[1]);
-        String vhost = args[2];
+        _vhost = args[2];
         String usr = args[3];
         String pwd = args[4];
-        long pause = args.length == 6 ? Long.parseLong(args[5]) : -1l;
+        long pause = -1;
 
-        getDetails(30000, host, port, vhost, usr, pwd, pause, true);
-        previousTimePoint = System.currentTimeMillis();
-
-        if (pause > 0) {
-            while (true) {
-                Thread.currentThread().sleep(pause);
-                getDetails(30000, host, port, vhost, usr, pwd, pause, false);
-                previousTimePoint = System.currentTimeMillis();
+        if (args.length > 5)
+        {
+            try
+            {
+                pause = Long.parseLong(args[5]);
+            }
+            catch (NumberFormatException nfe)
+            {
+                // Not a number: only a queues= or attributes= argument is valid here
+                if (!args[5].startsWith(QUEUE_ARGS) &&
+                    !args[5].startsWith(ATTRIB_ARGS))
+                {
+                    System.out.println("Unknown argument '" + args[5] + "'");
+                    System.exit(1);
+                }
             }
         }
-    }
 
-    private static void getDetails(int timeout, String host, int port, String vhost, String username, String password, long pause, boolean printHeader) throws Exception {
-        JMXConnector con = getJMXConnection(timeout, host, port, username, password);
-        MBeanServerConnection mbsc = con.getMBeanServerConnection();
+        //Process remaining args        
+        // Skip arg 5 if we have assigned pause a value
+        int arg = (pause == -1) ? 5 : 6;
+        for (; args.length > arg; arg++)
+        {
+            processCommandLine(args[arg]);
+        }
 
-        // Gets all Queues names
-        Set<ObjectName> names = mbsc.queryNames(new ObjectName("org.apache.qpid:type=VirtualHost.Queue,*"), null);
+        JMXConnector con = getJMXConnection(host, port, usr, pwd);
 
-        // Print header
-        if (names.size() > 0 && printHeader) {
-            System.out.print("Time,");
+        _mbsc = con.getMBeanServerConnection();
+
+        Set<ObjectName> names = _mbsc.queryNames(new ObjectName("org.apache.qpid:type=VirtualHost.Queue,VirtualHost=" + _vhost + ",*"), null);
 
-            MBeanAttributeInfo[] attInfo = ((MBeanInfo) mbsc.getMBeanInfo((ObjectName) names.toArray()[0])).getAttributes();
-            for (int i = 0; attInfo.length > i; i++) {
-                System.out.print(attInfo[i].getName() + ",");
+        // Print header
+        if (names.size() > 0)
+        {
+            System.out.print("Time");
+
+            MBeanAttributeInfo[] attributeList = getAttributeList(names.toArray(new ObjectName[1])[0]);
+
+            for (int i = 0; attributeList.length > i; i++)
+            {
+                System.out.print(", " + attributeList[i].getName());
             }
 
             // Include update rate calculations
-            if (pause > 0) {
-                System.out.print("Consumption rate,");
-                System.out.print("Buildup rate");
+            if (pause > 0)
+            {
+                System.out.print(", Consumption rate");
+                System.out.print(", Receive rate");
             }
             System.out.print("\n");
         }
+        else
+        {
+            System.out.println("No queues found on specified vhost unable to continue.");
+            System.exit(1);
+        }
 
-        // Traverse objects and print data on a row basis
-        for (ObjectName object : names) {
-            MBeanAttributeInfo[] attInfo = ((MBeanInfo) mbsc.getMBeanInfo(object)).getAttributes();
+        try
+        {
+            do
+            {
+                getDetails(pause > -1);
+                if (pause > 0)
+                {
+                    _previousTimePoint = System.currentTimeMillis();
+                    Thread.currentThread().sleep(pause);
+                }
+            }
+            while (pause > 0);
+        }
+        finally
+        {
+            con.close();
+        }
+    }
+
+    private static MBeanAttributeInfo[] getAttributeList(ObjectName name)
+            throws InstanceNotFoundException, IntrospectionException, ReflectionException, IOException
+    {
+        if (_attribInfo == null)
+        {
+            MBeanAttributeInfo[] allAttribs = ((MBeanInfo) _mbsc.getMBeanInfo((ObjectName) name)).getAttributes();
+
+            if (_attribNames != null && _attribNames.size() != 0)
+            {
+                LinkedList<MBeanAttributeInfo> tmpList = new LinkedList<MBeanAttributeInfo>();
+
+                for (MBeanAttributeInfo attribute : allAttribs)
+                {
+                    if (_attribNames.contains(attribute.getName()))
+                    {
+                        tmpList.add(attribute);
+                    }
+                }
+
+                _attribInfo = tmpList.toArray(new MBeanAttributeInfo[tmpList.size()]);
+            }
+            else
+            {
+                _attribInfo = allAttribs;
+            }
+        }
+        return _attribInfo;
+    }
+
+    private static void processCommandLine(String arg)
+    {
+        if (arg.startsWith(QUEUE_ARGS))
+        {
+            String[] queues = arg.substring(arg.indexOf("=") + 1).split(",");
+
+            _queueNames = new HashSet<String>();
+
+            for (String queue : queues)
+            {
+                if (queue.length() > 0)
+                {
+                    _queueNames.add(queue);
+                }
+            }
+
+            if (_queueNames.size() == 0)
+            {
+                System.out.println("No Queues specified on queue argument: '" + arg + "'");
+                System.exit(1);
+            }
+
+        }
+        else if (arg.startsWith(ATTRIB_ARGS))
+        {
+            String[] attribs = arg.substring(arg.indexOf("=") + 1).split(",");
+
+            _attribNames = new HashSet<String>();
+
+            for (String attrib : attribs)
+            {
+                if (attrib.length() > 0)
+                {
+                    _attribNames.add(attrib);
+                }
+            }
+
+            if (_attribNames.size() == 0)
+            {
+                System.out.println("No Attributes specified on attribute argument: '" + arg + "'");
+                System.exit(1);
+            }
+        }
+        else
+        {
+            System.out.println("Unknown argument '" + arg + "'");
+            System.exit(1);
+        }
+
+    }
+
+    private static void getDetails(boolean printRates) throws Exception
+    {
+        for (ObjectName object : getMatchingObjects())
+        {
+            try
+            {
+
+                // There should normally be only one but allow queue Names such as tmp_*
+                // Line format is
+                // <time>[, <attribute value>]*[, <consumption rate>, <buildup rate>]
 
-            if (object.getCanonicalKeyPropertyListString().indexOf("VirtualHost=" + vhost) >= 0) {
                 String name = object.getKeyProperty("name");
 
-                // Include the "ping" queue
-                if (name.equals("ping")) {
-                    System.out.print(Calendar.getInstance().getTime().toString().substring(11, 19) + ",");
-                    for (int i = 0; attInfo.length > i; i++) {
-                        System.out.print(mbsc.getAttribute(object, attInfo[i].getName()) + ",");
+                Date todaysDate = new java.util.Date();
+
+                System.out.print(_formatter.format(todaysDate));
+
+                MBeanAttributeInfo[] attributeList = getAttributeList(object);
+
+                for (int i = 0; attributeList.length > i; i++)
+                {
+                    System.out.print(", " + _mbsc.getAttribute(object, attributeList[i].getName()));
+                }
+
+                // Output consumption rate calc
+                if (printRates)
+                {
+                    double timeDelta = (System.currentTimeMillis() - _previousTimePoint) / 1000.0f;
+
+                    long rmc2 = (Long) _mbsc.getAttribute(object, "ReceivedMessageCount");
+                    long mc2 = (Integer) _mbsc.getAttribute(object, "MessageCount");
+
+                    long rmc1 = 0l;
+                    if (_previousRMC.get(name) != null)
+                    {
+                        rmc1 = _previousRMC.get(name);
+                    }
+                    long mc1 = 0l;
+                    if (_previousMC.get(name) != null)
+                    {
+                        mc1 = _previousMC.get(name);
                     }
-                    System.out.print("\n");
 
-                    // Include the "tmp_*" queues
-                } else if (name.indexOf("tmp") >= 0) {
-                    System.out.print(Calendar.getInstance().getTime().toString().substring(11, 19) + ",");
-                    for (int i = 0; attInfo.length > i; i++) {
-                        System.out.print(mbsc.getAttribute(object, attInfo[i].getName()) + ",");
+                    // If we don't have a previous value then ensure we print 0
+                    if (rmc1 == 0)
+                    {
+                        rmc1 = rmc2;
                     }
 
-                    // Output consumption rate calc
-                    if (pause > 0) {
-                        double timeDelta = (System.currentTimeMillis() - previousTimePoint) / 1000.0f;
-
-                        long rmc2 = (Long) mbsc.getAttribute(object, "ReceivedMessageCount");
-                        long mc2 = (Integer) mbsc.getAttribute(object, "MessageCount");
-
-                        long rmc1 = 0l;
-                        if (previousRMC.get(name) != null) {
-                            rmc1 = previousRMC.get(name);
-                        }
-                        long mc1 = 0l;
-                        if (previousMC.get(name) != null) {
-                            mc1 = previousMC.get(name);
-                        }
-
-                        if (rmc1 > 0) {
-                            double consumptionRate = ((rmc2 - rmc1) - (mc2 - mc1)) / timeDelta;
-                            System.out.print(consumptionRate);
-
-                            System.out.print(",");
-
-                            double buildupRate = (mc2 - mc1) / timeDelta;
-                            System.out.print(buildupRate);
-                        } else {
-                            System.out.print("null");
-                        }
+                    double consumptionRate = ((rmc2 - rmc1) - (mc2 - mc1)) / timeDelta;
+                    System.out.print(", ");
 
-                        previousRMC.put(name, rmc2);
-                        previousMC.put(name, mc2);
-                    }
+                    System.out.print(String.format("%.2f", consumptionRate));
+
+                    System.out.print(", ");
+                    double buildupRate = (mc2 - mc1) / timeDelta;
+                    System.out.print(String.format("%.2f", buildupRate));
 
-                    System.out.print("\n");
+                    _previousRMC.put(name, rmc2);
+                    _previousMC.put(name, mc2);
                 }
             }
-        }
+            catch (InstanceNotFoundException e)
+            {
+                System.out.print(" ..queue has been removed.");
+            }
+            catch (Exception e)
+            {
+                System.out.print(" ..error :" + e.getMessage());
+            }
+            finally
+            {
+                System.out.print("\n");
+            }
+
+        }// for ObjectName
     }
 
-    private static JMXConnector getJMXConnection(int timeout, String host, int port, String username, String password) throws SSLException, IOException, Exception {
+    private static JMXConnector getJMXConnection(String host, int port, String username, String password) throws IOException
+    {
         //Open JMX connection
         JMXServiceURL jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
         Map<String, Object> env = new HashMap<String, Object>();
@@ -173,5 +333,30 @@ public class QueueInformation {
         JMXConnector con = JMXConnectorFactory.connect(jmxUrl, env);
         return con;
     }
+
+    public static ObjectName[] getMatchingObjects() throws IOException, MalformedObjectNameException
+    {
+
+        // Gets all Queues names
+        if (_queueNames == null)
+        {
+            _queueNames = new HashSet<String>();
+            _queueNames.add("*");
+        }
+
+        Set<ObjectName> requestedObjects = new HashSet<ObjectName>();
+
+        for (String queue : _queueNames)
+        {
+            Set<ObjectName> matchingObjects = _mbsc.queryNames(new ObjectName("org.apache.qpid:type=VirtualHost.Queue,VirtualHost=" + _vhost + ",name=" + queue + ",*"), null);
+
+            if (!matchingObjects.isEmpty())
+            {
+                requestedObjects.addAll(matchingObjects);
+            }
+        }
+
+        return requestedObjects.toArray(new ObjectName[requestedObjects.size()]);
+    }
 }
 



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org