You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/04/27 14:08:39 UTC
svn commit: r1331372 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/plugin/StatisticsBroker.java
test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
Author: dejanb
Date: Fri Apr 27 12:08:39 2012
New Revision: 1331372
URL: http://svn.apache.org/viewvc?rev=1331372&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3820 - statistics plugin - subscription statistics
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java?rev=1331372&r1=1331371&r2=1331372&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/plugin/StatisticsBroker.java Fri Apr 27 12:08:39 2012
@@ -22,6 +22,8 @@ import org.apache.activemq.broker.Broker
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DestinationStatistics;
import org.apache.activemq.broker.region.RegionBroker;
@@ -37,6 +39,9 @@ import org.apache.activemq.util.IdGenera
import org.apache.activemq.util.LongSequenceGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.management.ObjectName;
import java.io.File;
import java.net.URI;
import java.util.Set;
@@ -50,9 +55,11 @@ public class StatisticsBroker extends Br
private static Logger LOG = LoggerFactory.getLogger(StatisticsBroker.class);
static final String STATS_DESTINATION_PREFIX = "ActiveMQ.Statistics.Destination";
static final String STATS_BROKER_PREFIX = "ActiveMQ.Statistics.Broker";
+ static final String STATS_SUBSCRIPTION_PREFIX = "ActiveMQ.Statistics.Subscription";
private static final IdGenerator ID_GENERATOR = new IdGenerator();
private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
protected final ProducerId advisoryProducerId = new ProducerId();
+ protected BrokerViewMBean brokerView;
/**
*
@@ -80,6 +87,10 @@ public class StatisticsBroker extends Br
STATS_DESTINATION_PREFIX.length());
boolean brokerStats = physicalName.regionMatches(true, 0, STATS_BROKER_PREFIX, 0, STATS_BROKER_PREFIX
.length());
+ boolean subStats = physicalName.regionMatches(true, 0, STATS_SUBSCRIPTION_PREFIX, 0, STATS_SUBSCRIPTION_PREFIX
+ .length());
+ BrokerService brokerService = getBrokerService();
+ RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
if (destStats) {
String queueryName = physicalName.substring(STATS_DESTINATION_PREFIX.length(), physicalName.length());
ActiveMQDestination queryDest = ActiveMQDestination.createDestination(queueryName,msgDest.getDestinationType());
@@ -108,10 +119,11 @@ public class StatisticsBroker extends Br
sendStats(producerExchange.getConnectionContext(), statsMessage, replyTo);
}
}
+ } else if (subStats) {
+ sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getQueueSubscribers(), replyTo);
+ sendSubStats(producerExchange.getConnectionContext(), getBrokerView().getTopicSubscribers(), replyTo);
} else if (brokerStats) {
ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
- BrokerService brokerService = getBrokerService();
- RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
SystemUsage systemUsage = brokerService.getSystemUsage();
DestinationStatistics stats = regionBroker.getDestinationStatistics();
statsMessage.setString("brokerName", regionBroker.getBrokerName());
@@ -165,6 +177,15 @@ public class StatisticsBroker extends Br
}
}
+ /**
+  * Returns the broker's management view, creating it on first use.
+  * <p>
+  * The first call builds a {@link BrokerViewMBean} proxy for the broker's object name
+  * through the broker service's management context and stores it in {@code brokerView};
+  * later calls return the stored instance.
+  *
+  * @return the cached broker view proxy
+  * @throws Exception declared for failures while resolving the object name or creating the proxy
+  */
+ BrokerViewMBean getBrokerView() throws Exception {
+     // Fast path: a proxy was already created by an earlier call.
+     if (this.brokerView != null) {
+         return this.brokerView;
+     }
+     ObjectName brokerObjectName = getBrokerService().getBrokerObjectName();
+     Object proxy = getBrokerService().getManagementContext().newProxyInstance(brokerObjectName,
+             BrokerViewMBean.class, true);
+     this.brokerView = (BrokerViewMBean) proxy;
+     return this.brokerView;
+ }
+
public void start() throws Exception {
super.start();
LOG.info("Starting StatisticsBroker");
@@ -174,6 +195,34 @@ public class StatisticsBroker extends Br
super.stop();
}
+ /**
+  * Sends one statistics message per subscription to the given reply destination.
+  * <p>
+  * For every object name in {@code subscribers} a {@link SubscriptionViewMBean} proxy is
+  * obtained from the broker service's management context, turned into a map message by
+  * {@code prepareSubscriptionMessage} and passed to {@code sendStats}.
+  *
+  * @param context     connection context forwarded to {@code sendStats}
+  * @param subscribers object names of the subscriptions to report on
+  * @param replyTo     destination the statistics messages are sent to
+  * @throws Exception if creating a proxy, building a message or sending it fails
+  */
+ protected void sendSubStats(ConnectionContext context, ObjectName[] subscribers, ActiveMQDestination replyTo) throws Exception {
+     for (ObjectName subscriberName : subscribers) {
+         // The management context is looked up per subscription, exactly as many times as there are names.
+         Object proxy = getBrokerService().getManagementContext().newProxyInstance(subscriberName, SubscriptionViewMBean.class, true);
+         sendStats(context, prepareSubscriptionMessage((SubscriptionViewMBean) proxy), replyTo);
+     }
+ }
+
+ protected ActiveMQMapMessage prepareSubscriptionMessage(SubscriptionViewMBean subscriber) throws JMSException {
+ ActiveMQMapMessage statsMessage = new ActiveMQMapMessage();
+ statsMessage.setString("destinationName", subscriber.getDestinationName());
+ statsMessage.setString("clientId", subscriber.getClientId());
+ statsMessage.setString("connectionId", subscriber.getConnectionId());
+ statsMessage.setLong("sessionId", subscriber.getSessionId());
+ statsMessage.setString("selector", subscriber.getSelector());
+ statsMessage.setLong("enqueueCounter", subscriber.getEnqueueCounter());
+ statsMessage.setLong("dequeueCounter", subscriber.getDequeueCounter());
+ statsMessage.setLong("dispatchedCounter", subscriber.getDispatchedCounter());
+ statsMessage.setLong("dispatchedQueueSize", subscriber.getDispatchedQueueSize());
+ statsMessage.setInt("prefetchSize", subscriber.getPrefetchSize());
+ statsMessage.setInt("maximumPendingMessageLimit", subscriber.getMaximumPendingMessageLimit());
+ statsMessage.setBoolean("exclusive", subscriber.isExclusive());
+ statsMessage.setBoolean("retroactive", subscriber.isRetroactive());
+ statsMessage.setBoolean("slowConsumer", subscriber.isSlowConsumer());
+ return statsMessage;
+ }
+
protected void sendStats(ConnectionContext context, ActiveMQMapMessage msg, ActiveMQDestination replyTo)
throws Exception {
msg.setPersistent(false);
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java?rev=1331372&r1=1331371&r2=1331372&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/plugin/BrokerStatisticsPluginTest.java Fri Apr 27 12:08:39 2012
@@ -16,14 +16,14 @@
*/
package org.apache.activemq.plugin;
+import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.util.Enumeration;
+
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MapMessage;
@@ -32,7 +32,7 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
-import junit.framework.TestCase;
+import java.net.URI;
/**
* A BrokerStatisticsPluginTest
@@ -92,6 +92,32 @@ public class BrokerStatisticsPluginTest
}
+
+ /**
+  * Sends a request to the subscription statistics destination and checks that a
+  * non-empty map message is delivered to the temporary reply queue.
+  *
+  * @throws Exception if any JMS operation fails
+  */
+ public void testSubscriptionStats() throws Exception {
+     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+     Queue replyTo = session.createTemporaryQueue();
+     MessageConsumer consumer = session.createConsumer(replyTo);
+     Queue testQueue = session.createQueue("Test.Queue");
+     // Not read again below; presumably kept so a subscription on Test.Queue exists
+     // when the statistics request is handled -- TODO confirm.
+     MessageConsumer testConsumer = session.createConsumer(testQueue);
+     MessageProducer producer = session.createProducer(null);
+     Queue query = session.createQueue(StatisticsBroker.STATS_SUBSCRIPTION_PREFIX);
+     Message msg = session.createMessage();
+
+     producer.send(testQueue, msg);
+
+     msg.setJMSReplyTo(replyTo);
+     producer.send(query, msg);
+
+     // Bounded wait: an untimed receive() would block forever if no reply is produced,
+     // hanging the test run instead of letting assertNotNull report the failure.
+     MapMessage reply = (MapMessage) consumer.receive(10 * 1000);
+     assertNotNull(reply);
+     assertTrue(reply.getMapNames().hasMoreElements());
+ }
protected void setUp() throws Exception {
broker = createBroker();