You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/13 01:38:41 UTC
svn commit: r1349585 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/plugin/StatisticsBroker.java
test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
Author: tabish
Date: Tue Jun 12 23:38:40 2012
New Revision: 1349585
URL: http://svn.apache.org/viewvc?rev=1349585&view=rev
Log:
New feature and test for: https://issues.apache.org/jira/browse/AMQ-3878
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java?rev=1349585&r1=1349584&r2=1349585&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java Tue Jun 12 23:38:40 2012
@@ -16,6 +16,13 @@
*/
package org.apache.activemq.plugin;
+import java.io.File;
+import java.net.URI;
+import java.util.Set;
+
+import javax.jms.JMSException;
+import javax.management.ObjectName;
+
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
@@ -39,22 +46,17 @@ import org.apache.activemq.util.IdGenera
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import javax.jms.JMSException;
-import javax.management.ObjectName;
-import java.io.File;
-import java.net.URI;
-import java.util.Set;
/**
* A StatisticsBroker You can retrieve a Map Message for a Destination - or
* Broker containing statistics as key-value pairs The message must contain a
* replyTo Destination - else its ignored
- *
+ *
*/
public class StatisticsBroker extends BrokerFilter {
private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
+ static final String STATS_BROKER_RESET_HEADER = "ActiveMQ.Statistics.Broker.Reset";
static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
@@ -62,9 +64,9 @@ public class StatisticsBroker extends Br
protected BrokerViewMBean brokerView;
/**
- *
+ *
* Constructor
- *
+ *
* @param next
*/
public StatisticsBroker(Broker next) {
@@ -74,7 +76,7 @@ public class StatisticsBroker extends Br
/**
* Sets the persistence mode
- *
+ *
* @see org.apache.activemq.broker.BrokerFilter#send(org.apache.activemq.broker.ProducerBrokerExchange,
* org.apache.activemq.command.Message)
*/
@@ -123,6 +125,11 @@ public class StatisticsBroker extends Br
sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
} else if (brokerStats) {
+
+ if (messageSend.getProperties().containsKey(STATS_BROKER_RESET_HEADER)) {
+ getBrokerView().resetStatistics();
+ }
+
ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
SystemUsage systemUsage = brokerService.getSystemUsage();
DestinationStatistics stats = regionBroker.getDestinationStatistics();
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java?rev=1349585&r1=1349584&r2=1349585&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java Tue Jun 12 23:38:40 2012
@@ -16,13 +16,7 @@
*/
package org.apache.activemq.plugin;
-import junit.framework.TestCase;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.broker.BrokerFactory;
-import org.apache.activemq.broker.BrokerPlugin;
-import org.apache.activemq.broker.BrokerService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.net.URI;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -32,7 +26,15 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import java.net.URI;
+
+import junit.framework.TestCase;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A BrokerStatisticsPluginTest
@@ -41,10 +43,10 @@ import java.net.URI;
*/
public class BrokerStatisticsPluginTest extends TestCase{
private static final Logger LOG = LoggerFactory.getLogger(BrokerStatisticsPluginTest.class);
-
+
private Connection connection;
private BrokerService broker;
-
+
public void testBrokerStats() throws Exception{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue();
@@ -63,10 +65,36 @@ public class BrokerStatisticsPluginTest
System.err.println(name+"="+reply.getObject(name));
}
*/
-
-
}
-
+
+ public void testBrokerStatsReset() throws Exception{
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue replyTo = session.createTemporaryQueue();
+ MessageConsumer consumer = session.createConsumer(replyTo);
+ Queue testQueue = session.createQueue("Test.Queue");
+ Queue query = session.createQueue(StatisticsBroker.STATS_BROKER_PREFIX);
+ MessageProducer producer = session.createProducer(null);
+
+ producer.send(testQueue, session.createMessage());
+
+ Message msg = session.createMessage();
+ msg.setJMSReplyTo(replyTo);
+ producer.send(query, msg);
+ MapMessage reply = (MapMessage) consumer.receive(10*1000);
+ assertNotNull(reply);
+ assertTrue(reply.getMapNames().hasMoreElements());
+ assertTrue(reply.getLong("enqueueCount") >= 1);
+
+ msg = session.createMessage();
+ msg.setBooleanProperty(StatisticsBroker.STATS_BROKER_RESET_HEADER, true);
+ msg.setJMSReplyTo(replyTo);
+ producer.send(query, msg);
+ reply = (MapMessage) consumer.receive(10*1000);
+ assertNotNull(reply);
+ assertTrue(reply.getMapNames().hasMoreElements());
+ assertEquals(0, reply.getLong("enqueueCount"));
+ }
+
public void testDestinationStats() throws Exception{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue();
@@ -75,9 +103,9 @@ public class BrokerStatisticsPluginTest
MessageProducer producer = session.createProducer(null);
Queue query = session.createQueue(StatisticsBroker.STATS_DESTINATION_PREFIX + testQueue.getQueueName());
Message msg = session.createMessage();
-
+
producer.send(testQueue,msg);
-
+
msg.setJMSReplyTo(replyTo);
producer.send(query,msg);
MapMessage reply = (MapMessage) consumer.receive();
@@ -89,10 +117,9 @@ public class BrokerStatisticsPluginTest
System.err.println(name+"="+reply.getObject(name));
}
*/
-
-
}
+ @SuppressWarnings("unused")
public void testSubscriptionStats() throws Exception{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue replyTo = session.createTemporaryQueue();
@@ -115,17 +142,15 @@ public class BrokerStatisticsPluginTest
String name = e.nextElement().toString();
System.err.println(name+"="+reply.getObject(name));
}*/
-
-
}
-
+
protected void setUp() throws Exception {
broker = createBroker();
ConnectionFactory factory = new ActiveMQConnectionFactory(broker.getTransportConnectorURIsAsMap().get("tcp"));
connection = factory.createConnection();
connection.start();
}
-
+
protected void tearDown() throws Exception{
if (this.connection != null) {
this.connection.close();
@@ -134,9 +159,8 @@ public class BrokerStatisticsPluginTest
this.broker.stop();
}
}
-
+
protected BrokerService createBroker() throws Exception {
- //return createBroker("org/apache/activemq/plugin/statistics-plugin-broker.xml");
BrokerService answer = new BrokerService();
BrokerPlugin[] plugins = new BrokerPlugin[1];
plugins[0] = new StatisticsBrokerPlugin();
@@ -146,7 +170,7 @@ public class BrokerStatisticsPluginTest
answer.start();
return answer;
}
-
+
protected BrokerService createBroker(String uri) throws Exception {
LOG.info("Loading broker configuration from the classpath with URI: " + uri);
return BrokerFactory.createBroker(new URI("xbean:" + uri));