You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/04/27 14:08:39 UTC

svn commit: r1331372 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/plugin/StatisticsBroker.java test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java

Author: dejanb
Date: Fri Apr 27 12:08:39 2012
New Revision: 1331372

URL: http://svn.apache.org/viewvc?rev=1331372&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3820 - statistics plugin - subscription statistics

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java?rev=1331372&r1=1331371&r2=1331372&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java Fri Apr 27 12:08:39 2012
@@ -22,6 +22,8 @@ import org.apache.activemq.broker.Broker
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.DestinationStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
@@ -37,6 +39,9 @@ import org.apache.activemq.util.IdGenera
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.management.ObjectName;
 import java.io.File;
 import java.net.URI;
 import java.util.Set;
@@ -50,9 +55,11 @@ public class StatisticsBroker extends Br
     private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
     static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
     static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
+    static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
     private static final IdGenerator ID_GENERATOR = new IdGenerator();
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
     protected final ProducerId advisoryProducerId = new ProducerId();
+    protected BrokerViewMBean brokerView;
 
     /**
      * 
@@ -80,6 +87,10 @@ public class StatisticsBroker extends Br
                     STATS_DESTINATION_PREFIX.length());
             boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
                     .length());
+            boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
+                    .length());
+            BrokerService brokerService = getBrokerService();
+            RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
             if (destStats) {
                 String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
                 ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
@@ -108,10 +119,11 @@ public class StatisticsBroker extends Br
                         sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
                     }
                 }
+            } else if (subStats) {
+                sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
+                sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
             } else if (brokerStats) {
                 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
-                BrokerService brokerService = getBrokerService();
-                RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
                 SystemUsage systemUsage = brokerService.getSystemUsage();
                 DestinationStatistics stats = regionBroker.getDestinationStatistics();
                 statsMessage.setString("brokerName", regionBroker.getBrokerName());
@@ -165,6 +177,15 @@ public class StatisticsBroker extends Br
         }
     }
 
+    BrokerViewMBean getBrokerView() throws Exception {
+        // Build the JMX proxy for the broker MBean only on first use; later calls return the cached field.
+        if (brokerView == null) {
+            ObjectName brokerObjectName = getBrokerService().getBrokerObjectName();
+            brokerView = (BrokerViewMBean) getBrokerService().getManagementContext().newProxyInstance(brokerObjectName, BrokerViewMBean.class, true);
+        }
+        return brokerView;
+    }
+
     public void start() throws Exception {
         super.start();
         LOG.info("Starting StatisticsBroker");
@@ -174,6 +195,34 @@ public class StatisticsBroker extends Br
         super.stop();
     }
 
+    protected void sendSubStats(ConnectionContext context, ObjectName[] subscribers, ActiveMQDestination replyTo) throws Exception {
+        // One reply per subscription: proxy its MBean, copy its values into a map message, send it to replyTo.
+        for (ObjectName subscriptionName : subscribers) {
+            SubscriptionViewMBean view = (SubscriptionViewMBean) getBrokerService().getManagementContext()
+                    .newProxyInstance(subscriptionName, SubscriptionViewMBean.class, true);
+            sendStats(context, prepareSubscriptionMessage(view), replyTo);
+        }
+    }
+
+    protected ActiveMQMapMessage prepareSubscriptionMessage(SubscriptionViewMBean subscriber) throws JMSException {
+        ActiveMQMapMessage stats = new ActiveMQMapMessage(); // one map entry per value read from the subscription view
+        stats.setString("destinationName", subscriber.getDestinationName());
+        stats.setString("clientId", subscriber.getClientId());
+        stats.setString("connectionId", subscriber.getConnectionId());
+        stats.setLong("sessionId", subscriber.getSessionId());
+        stats.setString("selector", subscriber.getSelector());
+        stats.setLong("enqueueCounter", subscriber.getEnqueueCounter());
+        stats.setLong("dequeueCounter", subscriber.getDequeueCounter());
+        stats.setLong("dispatchedCounter", subscriber.getDispatchedCounter());
+        stats.setLong("dispatchedQueueSize", subscriber.getDispatchedQueueSize());
+        stats.setInt("prefetchSize", subscriber.getPrefetchSize());
+        stats.setInt("maximumPendingMessageLimit", subscriber.getMaximumPendingMessageLimit());
+        stats.setBoolean("exclusive", subscriber.isExclusive());
+        stats.setBoolean("retroactive", subscriber.isRetroactive());
+        stats.setBoolean("slowConsumer", subscriber.isSlowConsumer());
+        return stats;
+    }
+
     protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
             throws Exception {
         msg.setPersistent(false);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java?rev=1331372&r1=1331371&r2=1331372&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java Fri Apr 27 12:08:39 2012
@@ -16,14 +16,14 @@
  */
 package org.apache.activemq.plugin;
 
+import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.util.Enumeration;
+
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.MapMessage;
@@ -32,7 +32,7 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-import junit.framework.TestCase;
+import java.net.URI;
 
 /**
  * A BrokerStatisticsPluginTest
@@ -92,6 +92,32 @@ public class BrokerStatisticsPluginTest 
         
         
     }
+
+    /**
+     * Sends a query to the subscription statistics destination and expects a
+     * non-empty map message to arrive on the temporary reply queue.
+     */
+    public void testSubscriptionStats() throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue replyTo = session.createTemporaryQueue();
+        MessageConsumer consumer = session.createConsumer(replyTo);
+        Queue testQueue = session.createQueue("Test.Queue");
+        // Otherwise unused; presumably kept so the broker has a queue subscription to report on.
+        MessageConsumer testConsumer = session.createConsumer(testQueue);
+        MessageProducer producer = session.createProducer(null);
+        Queue query = session.createQueue(StatisticsBroker.STATS_SUBSCRIPTION_PREFIX);
+        Message msg = session.createMessage();
+
+        producer.send(testQueue, msg);
+
+        msg.setJMSReplyTo(replyTo);
+        producer.send(query, msg);
+
+        // Bounded wait: an untimed receive() would hang the test forever when no reply is produced.
+        MapMessage reply = (MapMessage) consumer.receive(10 * 1000);
+        assertNotNull("no subscription statistics reply received", reply);
+        assertTrue(reply.getMapNames().hasMoreElements());
+    }
     
     protected void setUp() throws Exception {
         broker = createBroker();