You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2010/04/21 13:46:54 UTC
svn commit: r936267 - in /qpid/trunk/qpid/java: build.xml
management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java
Author: ritchiem
Date: Wed Apr 21 11:46:54 2010
New Revision: 936267
URL: http://svn.apache.org/viewvc?rev=936267&view=rev
Log:
QPID-2525: Updated Build.xml to build Management Examples
Updated QueueInformation to allow the specification of queues to monitor and the attributes to print.
Updated to only use a single connection rather than one per query.
Perform connection cleanup on shutdown
Modified:
qpid/trunk/qpid/java/build.xml
qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java
Modified: qpid/trunk/qpid/java/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/build.xml?rev=936267&r1=936266&r2=936267&view=diff
==============================================================================
--- qpid/trunk/qpid/java/build.xml (original)
+++ qpid/trunk/qpid/java/build.xml Wed Apr 21 11:46:54 2010
@@ -23,7 +23,7 @@
<import file="common.xml"/>
<property name="modules.core" value="junit-toolkit common management/common broker client tools"/>
- <property name="modules.examples" value="client/example"/>
+ <property name="modules.examples" value="client/example management/example"/>
<property name="modules.tests" value="systests perftests integrationtests testkit"/>
<property name="modules.management" value="management/client management/eclipse-plugin management/agent management/console"/>
<property name="modules.plugin" value="broker-plugins"/>
Modified: qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java?rev=936267&r1=936266&r2=936267&view=diff
==============================================================================
--- qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java (original)
+++ qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/QueueInformation.java Wed Apr 21 11:46:54 2010
@@ -20,32 +20,47 @@
*/
package org.apache.qpid.example.jmxexample;
-import java.io.IOException;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
+import javax.management.InstanceNotFoundException;
+import javax.management.IntrospectionException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanServerConnection;
+import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
+import javax.management.ReflectionException;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
-import javax.net.ssl.SSLException;
-
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Set;
/**
* Connects to a server and queries all info for Queues
* Includes _tmp queues thus covering queues underlying topic subscriptions
*/
-public class QueueInformation {
+public class QueueInformation
+{
+
+ private static long _previousTimePoint = 0l;
+ private static Map<String, Long> _previousRMC = new HashMap<String, Long>();
+ private static Map<String, Long> _previousMC = new HashMap<String, Long>();
+ private static MBeanServerConnection _mbsc;
+ private static final String DEFAULT_DATE_FORMAT = System.getProperty("qpid.dateFormat", "yyyy-MM-dd HH:mm:ss");
+ private static final SimpleDateFormat _formatter = new SimpleDateFormat(DEFAULT_DATE_FORMAT);
+
+ private static final String QUEUE_ARGS = "queues=";
+ private static Set<String> _queueNames;
+ private static final String ATTRIB_ARGS = "attributes=";
+ private static Set<String> _attribNames;
- private static long previousTimePoint = 0l;
- private static Map<String, Long> previousRMC = new HashMap<String, Long>();
- private static Map<String, Long> previousMC = new HashMap<String, Long>();
+ private static MBeanAttributeInfo[] _attribInfo;
+ private static String _vhost;
/**
* Params:
@@ -55,117 +70,262 @@ public class QueueInformation {
* 3: username, e.g. guest
* 4: pwd, e.g. guest
* 5: loop pause, no value indicates one-off, any other value is millisecs
+ * ..: attributes=<csv attribute list>, queues=<csv queue list>
+ * The queue list can use wildcards such as * and ?. Basically any value
+ * that JMX will accept in the query string for the name='' value of the queue.
*/
- public static void main(String[] args) throws Exception {
- if (args.length < 5) {
+ public static void main(String[] args) throws Exception
+ {
+ if (args.length < 5 || args.length > 8)
+ {
System.out.println("Usage: ");
- System.out.println("<host> <port> <vhost> <username> <pwd> <loop pause time in millisecs>");
+ System.out.println("<host> <port> <vhost> <username> <pwd> [<loop pause time in millisecs>] [queues=<queue list csv>] [attributes=<attribute list csv>]");
return;
}
String host = args[0];
int port = Integer.parseInt(args[1]);
- String vhost = args[2];
+ _vhost = args[2];
String usr = args[3];
String pwd = args[4];
- long pause = args.length == 6 ? Long.parseLong(args[5]) : -1l;
+ long pause = -1;
- getDetails(30000, host, port, vhost, usr, pwd, pause, true);
- previousTimePoint = System.currentTimeMillis();
-
- if (pause > 0) {
- while (true) {
- Thread.currentThread().sleep(pause);
- getDetails(30000, host, port, vhost, usr, pwd, pause, false);
- previousTimePoint = System.currentTimeMillis();
+ // The sixth argument is optional: either the loop pause or the first queues=/attributes= option
+ if (args.length > 5)
+ {
+ try
+ {
+ pause = Long.parseLong(args[5]);
+ }
+ catch (NumberFormatException nfe)
+ {
+ // Not a number: accept it only if it is a queues= or attributes= option, otherwise report it
+ if (!args[5].startsWith(QUEUE_ARGS) &&
+ !args[5].startsWith(ATTRIB_ARGS))
+ {
+ System.out.println("Unknown argument '" + args[5] + "'");
+ System.exit(1);
+ }
}
}
- }
- private static void getDetails(int timeout, String host, int port, String vhost, String username, String password, long pause, boolean printHeader) throws Exception {
- JMXConnector con = getJMXConnection(timeout, host, port, username, password);
- MBeanServerConnection mbsc = con.getMBeanServerConnection();
+ //Process remaining args
+ // Skip arg 5 if we have assigned pause a value
+ int arg = (pause == -1) ? 5 : 6;
+ for (; args.length > arg; arg++)
+ {
+ processCommandLine(args[arg]);
+ }
- // Gets all Queues names
- Set<ObjectName> names = mbsc.queryNames(new ObjectName("org.apache.qpid:type=VirtualHost.Queue,*"), null);
+ JMXConnector con = getJMXConnection(host, port, usr, pwd);
- // Print header
- if (names.size() > 0 && printHeader) {
- System.out.print("Time,");
+ _mbsc = con.getMBeanServerConnection();
+
+ Set<ObjectName> names = _mbsc.queryNames(new ObjectName("org.apache.qpid:type=VirtualHost.Queue,VirtualHost=" + _vhost + ",*"), null);
- MBeanAttributeInfo[] attInfo = ((MBeanInfo) mbsc.getMBeanInfo((ObjectName) names.toArray()[0])).getAttributes();
- for (int i = 0; attInfo.length > i; i++) {
- System.out.print(attInfo[i].getName() + ",");
+ // Print header
+ if (names.size() > 0)
+ {
+ System.out.print("Time");
+
+ MBeanAttributeInfo[] attributeList = getAttributeList(names.toArray(new ObjectName[1])[0]);
+
+ for (int i = 0; attributeList.length > i; i++)
+ {
+ System.out.print(", " + attributeList[i].getName());
}
// Include update rate calculations
- if (pause > 0) {
- System.out.print("Consumption rate,");
- System.out.print("Buildup rate");
+ if (pause > 0)
+ {
+ System.out.print(", Consumption rate");
+ System.out.print(", Receive rate");
}
System.out.print("\n");
}
+ else
+ {
+ System.out.println("No queues found on specified vhost unable to continue.");
+ System.exit(1);
+ }
- // Traverse objects and print data on a row basis
- for (ObjectName object : names) {
- MBeanAttributeInfo[] attInfo = ((MBeanInfo) mbsc.getMBeanInfo(object)).getAttributes();
+ try
+ {
+ do
+ {
+ // Rate columns are only printed when looping, matching the header written above
+ getDetails(pause > 0);
+ if (pause > 0)
+ {
+ _previousTimePoint = System.currentTimeMillis();
+ Thread.sleep(pause);
+ }
+ }
+ while (pause > 0);
+ }
+ finally
+ {
+ con.close();
+ }
+ }
+
+    /**
+     * Returns the attributes to print for each queue.
+     *
+     * The list is resolved on the first call, from the MBean info of the given
+     * object, and cached in _attribInfo; later calls return the cached array
+     * without consulting 'name'. When attribute names were requested on the
+     * command line only those attributes are kept, otherwise all are returned.
+     *
+     * @param name the queue MBean whose attribute metadata is read on first use
+     * @return the attribute metadata to report
+     */
+    private static MBeanAttributeInfo[] getAttributeList(ObjectName name)
+            throws InstanceNotFoundException, IntrospectionException, ReflectionException, IOException
+    {
+        // Already resolved on an earlier call
+        if (_attribInfo != null)
+        {
+            return _attribInfo;
+        }
+
+        MBeanAttributeInfo[] available = _mbsc.getMBeanInfo(name).getAttributes();
+
+        // No attribute filter requested: report everything the MBean exposes
+        if (_attribNames == null || _attribNames.size() == 0)
+        {
+            _attribInfo = available;
+            return _attribInfo;
+        }
+
+        LinkedList<MBeanAttributeInfo> selected = new LinkedList<MBeanAttributeInfo>();
+        for (int i = 0; i < available.length; i++)
+        {
+            MBeanAttributeInfo candidate = available[i];
+            if (_attribNames.contains(candidate.getName()))
+            {
+                selected.add(candidate);
+            }
+        }
+
+        _attribInfo = selected.toArray(new MBeanAttributeInfo[selected.size()]);
+        return _attribInfo;
+    }
+
+    /**
+     * Handles one optional command line argument of the form
+     * queues=<csv> or attributes=<csv>, storing the parsed names in
+     * _queueNames or _attribNames respectively.
+     *
+     * Prints a message and exits with status 1 when the argument has an
+     * unknown prefix or its list contains no non-empty entries.
+     *
+     * @param arg the raw command line argument
+     */
+    private static void processCommandLine(String arg)
+    {
+        if (arg.startsWith(QUEUE_ARGS))
+        {
+            _queueNames = parseCsvValues(arg);
+
+            if (_queueNames.isEmpty())
+            {
+                System.out.println("No Queues specified on queue argument: '" + arg + "'");
+                System.exit(1);
+            }
+        }
+        else if (arg.startsWith(ATTRIB_ARGS))
+        {
+            _attribNames = parseCsvValues(arg);
+
+            if (_attribNames.isEmpty())
+            {
+                System.out.println("No Attributes specified on attribute argument: '" + arg + "'");
+                System.exit(1);
+            }
+        }
+        else
+        {
+            System.out.println("Unknown argument '" + arg + "'");
+            System.exit(1);
+        }
+    }
+
+    /** Collects the non-empty, comma separated entries that follow the first '=' in arg. */
+    private static Set<String> parseCsvValues(String arg)
+    {
+        Set<String> values = new HashSet<String>();
+        String csv = arg.substring(arg.indexOf("=") + 1);
+
+        for (String value : csv.split(","))
+        {
+            if (value.length() > 0)
+            {
+                values.add(value);
+            }
+        }
+        return values;
+    }
+
+
+ private static void getDetails(boolean printRates) throws Exception
+ {
+ for (ObjectName object : getMatchingObjects())
+ {
+ try
+ {
+
+ // A requested name normally matches one queue, but patterns such as tmp_* may match several; each match gets its own row
+ // Row format is
+ // <time>[, <attribute value>]*[, <consumption rate>, <buildup rate>] (rates only when printRates is set)
- if (object.getCanonicalKeyPropertyListString().indexOf("VirtualHost=" + vhost) >= 0) {
String name = object.getKeyProperty("name");
- // Include the "ping" queue
- if (name.equals("ping")) {
- System.out.print(Calendar.getInstance().getTime().toString().substring(11, 19) + ",");
- for (int i = 0; attInfo.length > i; i++) {
- System.out.print(mbsc.getAttribute(object, attInfo[i].getName()) + ",");
+ Date todaysDate = new java.util.Date();
+
+ System.out.print(_formatter.format(todaysDate));
+
+ MBeanAttributeInfo[] attributeList = getAttributeList(object);
+
+ for (int i = 0; attributeList.length > i; i++)
+ {
+ System.out.print(", " + _mbsc.getAttribute(object, attributeList[i].getName()));
+ }
+
+ // Append the consumption and buildup rates, derived from the counts stored for this queue on the previous pass
+ if (printRates)
+ {
+ double timeDelta = (System.currentTimeMillis() - _previousTimePoint) / 1000.0f;
+
+ long rmc2 = (Long) _mbsc.getAttribute(object, "ReceivedMessageCount");
+ long mc2 = (Integer) _mbsc.getAttribute(object, "MessageCount");
+
+ long rmc1 = 0l;
+ if (_previousRMC.get(name) != null)
+ {
+ rmc1 = _previousRMC.get(name);
+ }
+ long mc1 = 0l;
+ if (_previousMC.get(name) != null)
+ {
+ mc1 = _previousMC.get(name);
}
- System.out.print("\n");
- // Include the "tmp_*" queues
- } else if (name.indexOf("tmp") >= 0) {
- System.out.print(Calendar.getInstance().getTime().toString().substring(11, 19) + ",");
- for (int i = 0; attInfo.length > i; i++) {
- System.out.print(mbsc.getAttribute(object, attInfo[i].getName()) + ",");
+ // No (or a zero) previous ReceivedMessageCount: use the current value so the received delta is 0
+ if (rmc1 == 0)
+ {
+ rmc1 = rmc2;
}
- // Output consumption rate calc
- if (pause > 0) {
- double timeDelta = (System.currentTimeMillis() - previousTimePoint) / 1000.0f;
-
- long rmc2 = (Long) mbsc.getAttribute(object, "ReceivedMessageCount");
- long mc2 = (Integer) mbsc.getAttribute(object, "MessageCount");
-
- long rmc1 = 0l;
- if (previousRMC.get(name) != null) {
- rmc1 = previousRMC.get(name);
- }
- long mc1 = 0l;
- if (previousMC.get(name) != null) {
- mc1 = previousMC.get(name);
- }
-
- if (rmc1 > 0) {
- double consumptionRate = ((rmc2 - rmc1) - (mc2 - mc1)) / timeDelta;
- System.out.print(consumptionRate);
-
- System.out.print(",");
-
- double buildupRate = (mc2 - mc1) / timeDelta;
- System.out.print(buildupRate);
- } else {
- System.out.print("null");
- }
+ double consumptionRate = ((rmc2 - rmc1) - (mc2 - mc1)) / timeDelta;
+ System.out.print(", ");
- previousRMC.put(name, rmc2);
- previousMC.put(name, mc2);
- }
+ System.out.print(String.format("%.2f", consumptionRate));
+
+ System.out.print(", ");
+ double buildupRate = (mc2 - mc1) / timeDelta;
+ System.out.print(String.format("%.2f", buildupRate));
- System.out.print("\n");
+ _previousRMC.put(name, rmc2);
+ _previousMC.put(name, mc2);
}
}
- }
+ catch (InstanceNotFoundException e)
+ {
+ System.out.print(" ..queue has been removed.");
+ }
+ catch (Exception e)
+ {
+ System.out.print(" ..error :" + e.getMessage());
+ }
+ finally
+ {
+ System.out.print("\n");
+ }
+
+ }// for ObjectName
}
- private static JMXConnector getJMXConnection(int timeout, String host, int port, String username, String password) throws SSLException, IOException, Exception {
+ private static JMXConnector getJMXConnection(String host, int port, String username, String password) throws IOException
+ {
//Open JMX connection
JMXServiceURL jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
Map<String, Object> env = new HashMap<String, Object>();
@@ -173,5 +333,30 @@ public class QueueInformation {
JMXConnector con = JMXConnectorFactory.connect(jmxUrl, env);
return con;
}
+
+    /**
+     * Queries the broker for the queue MBeans on the configured virtual host
+     * whose name matches any of the requested queue names.
+     *
+     * When no queue names were supplied, _queueNames is initialised to the
+     * single wildcard "*" so that every queue on the virtual host is matched.
+     *
+     * @return the distinct matching object names; empty when nothing matches
+     */
+    public static ObjectName[] getMatchingObjects() throws IOException, MalformedObjectNameException
+    {
+        // No queues= argument was given: fall back to matching every queue
+        if (_queueNames == null)
+        {
+            Set<String> everyQueue = new HashSet<String>();
+            everyQueue.add("*");
+            _queueNames = everyQueue;
+        }
+
+        String namePrefix = "org.apache.qpid:type=VirtualHost.Queue,VirtualHost=" + _vhost + ",name=";
+        Set<ObjectName> found = new HashSet<ObjectName>();
+
+        for (String queue : _queueNames)
+        {
+            ObjectName pattern = new ObjectName(namePrefix + queue + ",*");
+            // Adding an empty result is a no-op, so no emptiness check is needed
+            found.addAll(_mbsc.queryNames(pattern, null));
+        }
+
+        return found.toArray(new ObjectName[found.size()]);
+    }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org