You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/13 01:38:41 UTC

svn commit: r1349585 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/plugin/StatisticsBroker.java test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java

Author: tabish
Date: Tue Jun 12 23:38:40 2012
New Revision: 1349585

URL: http://svn.apache.org/viewvc?rev=1349585&view=rev
Log:
New feature and test for: https://issues.apache.org/jira/browse/AMQ-3878

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java?rev=1349585&r1=1349584&r2=1349585&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java Tue Jun 12 23:38:40 2012
@@ -16,6 +16,13 @@
  */
 package org.apache.activemq.plugin;
 
+import java.io.File;
+import java.net.URI;
+import java.util.Set;
+
+import javax.jms.JMSException;
+import javax.management.ObjectName;
+
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
@@ -39,22 +46,17 @@ import org.apache.activemq.util.IdGenera
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-import javax.management.ObjectName;
-import java.io.File;
-import java.net.URI;
-import java.util.Set;
 /**
  * A StatisticsBroker You can retrieve a Map Message for a Destination - or
  * Broker containing statistics as key-value pairs The message must contain a
  * replyTo Destination - else its ignored
- * 
+ *
  */
 public class StatisticsBroker extends BrokerFilter {
     private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
     static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
     static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
+    static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset";
     static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
     private static final IdGenerator ID_GENERATOR = new IdGenerator();
     private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@@ -62,9 +64,9 @@ public class StatisticsBroker extends Br
     protected BrokerViewMBean brokerView;
 
     /**
-     * 
+     *
      * Constructor
-     * 
+     *
      * @param next
      */
     public StatisticsBroker(Broker next) {
@@ -74,7 +76,7 @@ public class StatisticsBroker extends Br
 
     /**
      * Sets the persistence mode
-     * 
+     *
      * @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
      *      org.apache.activemq.command.Message)
      */
@@ -123,6 +125,11 @@ public class StatisticsBroker extends Br
                 sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
                 sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
             } else if (brokerStats) {
+
+                if (messageSend.getProperties().containsKey(STATS_BROKER_RESET_HEADER)) {
+                    getBrokerView().resetStatistics();
+                }
+
                 ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
                 SystemUsage systemUsage = brokerService.getSystemUsage();
                 DestinationStatistics stats = regionBroker.getDestinationStatistics();

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java?rev=1349585&r1=1349584&r2=1349585&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java Tue Jun 12 23:38:40 2012
@@ -16,13 +16,7 @@
  */
 package org.apache.activemq.plugin;
 
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.net.URI;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -32,7 +26,15 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
-import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A BrokerStatisticsPluginTest
@@ -41,10 +43,10 @@ import java.net.URI;
  */
 public class BrokerStatisticsPluginTest extends TestCase{
     private static final Logger LOG = LoggerFactory.getLogger(BrokerStatisticsPluginTest.class);
-    
+
     private Connection connection;
     private BrokerService broker;
-    
+
     public void testBrokerStats() throws Exception{
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue replyTo = session.createTemporaryQueue();
@@ -63,10 +65,36 @@ public class BrokerStatisticsPluginTest 
             System.err.println(name+"="+reply.getObject(name));
         }
         */
-        
-        
     }
-    
+    /** Verifies that a broker statistics query carrying the reset header reports an enqueueCount of zero. */
+    public void testBrokerStatsReset() throws Exception{
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue replyTo = session.createTemporaryQueue();
+        MessageConsumer consumer = session.createConsumer(replyTo);
+        Queue testQueue = session.createQueue("Test.Queue");
+        Queue query = session.createQueue(StatisticsBroker.STATS_BROKER_PREFIX);
+        MessageProducer producer = session.createProducer(null);
+        // Enqueue one message first so the broker has a non-zero enqueue count to report.
+        producer.send(testQueue, session.createMessage());
+        // Plain statistics query (no reset header): the reply should reflect the send above.
+        Message msg = session.createMessage();
+        msg.setJMSReplyTo(replyTo);
+        producer.send(query, msg);
+        MapMessage reply = (MapMessage) consumer.receive(10*1000); // wait at most 10 seconds for the reply
+        assertNotNull(reply);
+        assertTrue(reply.getMapNames().hasMoreElements());
+        assertTrue(reply.getLong("enqueueCount") >= 1);
+        // Same query with the reset header set; StatisticsBroker resets the broker statistics before building the reply.
+        msg = session.createMessage();
+        msg.setBooleanProperty(StatisticsBroker.STATS_BROKER_RESET_HEADER, true);
+        msg.setJMSReplyTo(replyTo);
+        producer.send(query, msg);
+        reply = (MapMessage) consumer.receive(10*1000);
+        assertNotNull(reply);
+        assertTrue(reply.getMapNames().hasMoreElements());
+        assertEquals(0, reply.getLong("enqueueCount"));
+    }
+
     public void testDestinationStats() throws Exception{
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue replyTo = session.createTemporaryQueue();
@@ -75,9 +103,9 @@ public class BrokerStatisticsPluginTest 
         MessageProducer producer = session.createProducer(null);
         Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + testQueue.getQueueName());
         Message msg = session.createMessage();
-        
+
         producer.send(testQueue,msg);
-        
+
         msg.setJMSReplyTo(replyTo);
         producer.send(query,msg);
         MapMessage reply = (MapMessage) consumer.receive();
@@ -89,10 +117,9 @@ public class BrokerStatisticsPluginTest 
             System.err.println(name+"="+reply.getObject(name));
         }
         */
-        
-        
     }
 
+    @SuppressWarnings("unused")
     public void testSubscriptionStats() throws Exception{
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue replyTo = session.createTemporaryQueue();
@@ -115,17 +142,15 @@ public class BrokerStatisticsPluginTest 
             String name = e.nextElement().toString();
             System.err.println(name+"="+reply.getObject(name));
         }*/
-
-
     }
-    
+
     protected void setUp() throws Exception {
         broker = createBroker();
         ConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectorURIsAsMap().get("tcp"));
         connection = factory.createConnection();
         connection.start();
     }
-    
+
     protected void tearDown() throws Exception{
         if (this.connection != null) {
             this.connection.close();
@@ -134,9 +159,8 @@ public class BrokerStatisticsPluginTest 
             this.broker.stop();
         }
     }
-    
+
     protected BrokerService createBroker() throws Exception {
-        //return createBroker("org/apache/activemq/plugin/statistics-plugin-broker.xml");
         BrokerService answer = new BrokerService();
         BrokerPlugin[] plugins = new BrokerPlugin[1];
         plugins[0] = new StatisticsBrokerPlugin();
@@ -146,7 +170,7 @@ public class BrokerStatisticsPluginTest 
         answer.start();
         return answer;
     }
-    
+
     protected BrokerService createBroker(String uri) throws Exception {
         LOG.info("Loading broker configuration from the classpath with URI: " + uri);
         return BrokerFactory.createBroker(new URI("xbean:" + uri));