You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2010/02/18 12:06:40 UTC

svn commit: r911326 - in /qpid/trunk/qpid/java/management/example: ./ src/ src/main/ src/main/java/ src/main/java/org/ src/main/java/org/apache/ src/main/java/org/apache/qpid/ src/main/java/org/apache/qpid/example/ src/main/java/org/apache/qpid/example...

Author: robbie
Date: Thu Feb 18 11:06:39 2010
New Revision: 911326

URL: http://svn.apache.org/viewvc?rev=911326&view=rev
Log:
Add some initial examples of using jmx to add new queues and delete messages from tmp_* queues. Create new management/examples submodule to hold them

Added:
    qpid/trunk/qpid/java/management/example/
    qpid/trunk/qpid/java/management/example/build.xml
    qpid/trunk/qpid/java/management/example/src/
    qpid/trunk/qpid/java/management/example/src/main/
    qpid/trunk/qpid/java/management/example/src/main/java/
    qpid/trunk/qpid/java/management/example/src/main/java/org/
    qpid/trunk/qpid/java/management/example/src/main/java/org/apache/
    qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/
    qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/
    qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/
    qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/AddQueue.java
    qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/DeleteMessagesFromTopOfTmp.java

Added: qpid/trunk/qpid/java/management/example/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/example/build.xml?rev=911326&view=auto
==============================================================================
--- qpid/trunk/qpid/java/management/example/build.xml (added)
+++ qpid/trunk/qpid/java/management/example/build.xml Thu Feb 18 11:06:39 2010
@@ -0,0 +1,27 @@
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements.  See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership.  The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License.  You may obtain a copy of the License at
+ - 
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ - 
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied.  See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<project name="Management Examples" default="build">
+
+    <property name="module.depends" value="management/common"/>
+
+    <import file="../../module.xml"/>
+
+</project>

Added: qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/AddQueue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/AddQueue.java?rev=911326&view=auto
==============================================================================
--- qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/AddQueue.java (added)
+++ qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/AddQueue.java Thu Feb 18 11:06:39 2010
@@ -0,0 +1,149 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.    
+ * 
+ */
+package org.apache.qpid.example.jmxexample;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+import org.apache.qpid.management.common.mbeans.ManagedBroker;
+import org.apache.qpid.management.common.mbeans.ManagedExchange;
+
+public class AddQueue
+{
+
+    public static void main(String[] args)
+    {
+        //Example: add 'newqueue' to the 'test' virtualhost and bind to the 'amq.direct' exchange
+        //TODO: take these parameters as arguments
+        
+        addQueue("test", "amq.direct", "newqueue");
+    }
+    
+    private static JMXConnector getJMXConnection() throws Exception
+    {
+        //TODO: Take these parameters as main+method arguments
+        String host = "localhost";
+        int port = 8999;
+        String username = "admin";
+        String password = "admin";
+        
+        Map<String, Object> env = new HashMap<String, Object>();
+        JMXServiceURL jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
+
+        //Add user credential's to environment map for RMIConnector startup. 
+        env.put(JMXConnector.CREDENTIALS, new String[] {username,password});
+        
+        return JMXConnectorFactory.connect(jmxUrl, env);
+    }
+    
+    public static boolean addQueue(String virHost, String exchName, String queueName) {
+
+        JMXConnector jmxc = null;
+        try 
+        {
+            jmxc = getJMXConnection();
+            
+            MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
+
+            ObjectName hostManagerObjectName = new ObjectName( 
+                    "org.apache.qpid:" + 
+                    "type=VirtualHost.VirtualHostManager," + 
+                    "VirtualHost=" + virHost + ",*"); 
+
+            Set<ObjectName> vhostManagers = mbsc.queryNames(hostManagerObjectName, null);
+            
+            if(vhostManagers.size() == 0)
+            {
+                //The vhostManager MBean wasnt found, cant procede
+                return false;
+            }
+            
+            ManagedBroker vhostManager = (ManagedBroker) MBeanServerInvocationHandler.newProxyInstance(
+                                              mbsc, (ObjectName) vhostManagers.toArray()[0], ManagedBroker.class, false);
+                        
+            ObjectName customExchangeObjectName = new ObjectName(
+                    "org.apache.qpid:" +
+                    "type=VirtualHost.Exchange," +
+                    "VirtualHost=" + virHost + "," +
+                    "name=" + exchName + "," + 
+                    "ExchangeType=direct,*");
+
+            Set<ObjectName> exchanges = mbsc.queryNames(customExchangeObjectName, null);
+            
+            if(exchanges.size() == 0)
+            {
+                //The exchange doesnt exist, cant procede.
+                return false;
+            }
+
+            //create the MBean proxy
+            ManagedExchange managedExchange = (ManagedExchange) MBeanServerInvocationHandler.newProxyInstance(
+                        mbsc, (ObjectName) exchanges.toArray()[0], ManagedExchange.class, false);
+              
+            try
+            {
+                //create the new durable queue and bind it.
+                vhostManager.createNewQueue(queueName, null, true);
+                managedExchange.createNewBinding(queueName,queueName);
+            }
+            catch (Exception e)
+            {
+                System.out.println("Could not add queue due to exception :" + e.getMessage());
+                e.printStackTrace();
+                return false;
+            }
+
+            return true;
+
+        }
+        catch (Exception e)
+        {
+            System.out.println("Could not add queue due to error :" + e.getMessage());
+            e.printStackTrace();
+        } 
+        finally
+        {
+            if(jmxc != null)
+            {
+                try
+                {
+                    jmxc.close();
+                }
+                catch (IOException e)
+                {
+                    //ignore
+                }
+            }
+        }
+                
+        return false;
+        
+    }
+
+}

Added: qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/DeleteMessagesFromTopOfTmp.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/DeleteMessagesFromTopOfTmp.java?rev=911326&view=auto
==============================================================================
--- qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/DeleteMessagesFromTopOfTmp.java (added)
+++ qpid/trunk/qpid/java/management/example/src/main/java/org/apache/qpid/example/jmxexample/DeleteMessagesFromTopOfTmp.java Thu Feb 18 11:06:39 2010
@@ -0,0 +1,119 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.example.jmxexample;
+
+import java.util.Set;
+
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+import javax.management.remote.JMXConnector;
+import javax.management.remote.JMXConnectorFactory;
+import javax.management.remote.JMXServiceURL;
+
+/**
+ * Connects to a server and queries all info for tmp_* named queues, determines 
+ * their message count, and if this is above a given threshold deletes the 
+ * specified number of messages from the front of the queue
+ */
+public class DeleteMessagesFromTopOfTmp
+{
+    /**
+     * Params:
+     * 0: host, e.g. myserver.mydomain.com
+     * 1: port, e.g. 8999
+     * 2: Number of messages to delete, e.g. 1000
+     * 3: Threshold MessageCount on queue required before deletion will be undertaken e.g. 5000
+     */
+    public static void main(String[] args) throws Exception
+    {
+        if (args.length < 4)
+        {
+            System.out.println("Usage: ");
+            System.out.println("<host> <port> <numMsgsToDel> <minRequiredQueueMsgCount>");
+            return;
+        }
+
+        String host = args[0];
+        int port = Integer.parseInt(args[1]);
+        int numToDel = Integer.parseInt(args[2]);
+        int numRequired = Integer.parseInt(args[3]);
+
+        deleteFromTop(host, port, numToDel, numRequired);
+    }
+
+    private static void deleteFromTop(String host, int port, 
+                                      int numMsgsToDel, int minRequiredQueueMsgCount) throws Exception
+    {
+        JMXConnector con = getJMXConnection(host, port);
+        MBeanServerConnection mbsc = con.getMBeanServerConnection();
+
+        // Gets all tmp_* queue MBean ObjectNames
+        Set<ObjectName> names = mbsc.queryNames(
+                                new ObjectName("org.apache.qpid:type=VirtualHost.Queue,name=tmp_*,*"), null);
+
+        // Traverse objects and delete specified number of message if the min msg count is breached
+        for (ObjectName queueObjectName : names)
+        {
+            String queueName = queueObjectName.getKeyProperty("name");
+            System.out.println("Checking message count on queue: " + queueName);
+
+            long mc = (Integer) mbsc.getAttribute(queueObjectName, "MessageCount");
+
+            if(mc >= minRequiredQueueMsgCount)
+            {
+                System.out.println("MessageCount (" + mc + ") is above the specified threshold ("
+                                   + minRequiredQueueMsgCount + ")");
+                System.out.println("Deleting first " + numMsgsToDel + " messages on queue: " + queueName);
+
+                int i;
+                for(i=0; i<numMsgsToDel; i++)
+                {
+                    try
+                    {
+                        mbsc.invoke(queueObjectName,"deleteMessageFromTop",null,null);
+                    }
+                    catch(Exception e)
+                    {
+                        System.out.println("Exception whilst deleting message" + i +" from queue: " +e);
+                        break;
+                    }
+                }
+            }
+            else
+            {
+                System.out.println("MessageCount (" + mc + ") is below the specified threshold ("
+                                   + minRequiredQueueMsgCount + ")");
+                System.out.println("Not deleting any messages on queue: " + queueName);
+            }
+        }
+    }
+
+    private static JMXConnector getJMXConnection(String host, int port) throws Exception
+    {
+        //Open JMX connection
+        JMXServiceURL jmxUrl = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
+        JMXConnector con = JMXConnectorFactory.connect(jmxUrl);
+        return con;
+    }
+}
+
+
+



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org