You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/16 14:56:29 UTC
svn commit: r612459 - in
/activemq/trunk/activemq-core/src/main/java/org/apache/activemq:
advisory/AdvisoryBroker.java broker/BrokerFilter.java
broker/BrokerService.java broker/region/Region.java
broker/region/RegionBroker.java
Author: rajdavies
Date: Wed Jan 16 05:56:24 2008
New Revision: 612459
URL: http://svn.apache.org/viewvc?rev=612459&view=rev
Log:
set correct consumer count on consumer advisories
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed Jan 16 05:56:24 2008
@@ -17,6 +17,7 @@
package org.apache.activemq.advisory;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.activemq.broker.Broker;
@@ -83,7 +84,7 @@
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.put(info.getConsumerId(), info);
- fireConsumerAdvisory(context, topic, info);
+ fireConsumerAdvisory(context,info.getDestination(), topic, info);
} else {
// We need to replay all the previously collected state objects
@@ -114,7 +115,7 @@
for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
ProducerInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
- fireProducerAdvisory(context, topic, value, info.getConsumerId());
+ fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId());
}
}
@@ -123,7 +124,7 @@
for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) {
ConsumerInfo value = iter.next();
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
- fireConsumerAdvisory(context, topic, value, info.getConsumerId());
+ fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
}
}
}
@@ -219,7 +220,7 @@
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
consumers.remove(info.getConsumerId());
- fireConsumerAdvisory(context, topic, info.createRemoveCommand());
+ fireConsumerAdvisory(context,info.getDestination(), topic, info.createRemoveCommand());
}
}
@@ -230,7 +231,7 @@
if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
producers.remove(info.getProducerId());
- fireProducerAdvisory(context, topic, info.createRemoveCommand());
+ fireProducerAdvisory(context, info.getDestination(),topic, info.createRemoveCommand());
}
}
@@ -253,21 +254,28 @@
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}
- protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
- fireConsumerAdvisory(context, topic, command, null);
+ protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception {
+ fireConsumerAdvisory(context, consumerDestination,topic, command, null);
}
- protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
+ protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
- advisoryMessage.setIntProperty("consumerCount", consumers.size());
+ int count = 0;
+ Set<Destination>set = getDestinations(consumerDestination);
+ if (set != null) {
+ for (Destination dest:set) {
+ count += dest.getDestinationStatistics().getConsumers().getCount();
+ }
+ }
+ advisoryMessage.setIntProperty("consumerCount", count);
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
}
- protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
- fireProducerAdvisory(context, topic, command, null);
+ protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
+ fireProducerAdvisory(context,producerDestination, topic, command, null);
}
- protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
+ protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
ActiveMQMessage advisoryMessage = new ActiveMQMessage();
advisoryMessage.setIntProperty("producerCount", producers.size());
fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Wed Jan 16 05:56:24 2008
@@ -65,7 +65,7 @@
return next.getDestinationMap();
}
- public Set getDestinations(ActiveMQDestination destination) {
+ public Set <Destination>getDestinations(ActiveMQDestination destination) {
return next.getDestinations(destination);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jan 16 05:56:24 2008
@@ -725,16 +725,20 @@
if (persistenceAdapter == null) {
persistenceAdapter = createPersistenceAdapter();
configureService(persistenceAdapter);
+ this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
}
return persistenceAdapter;
}
/**
* Sets the persistence adaptor implementation to use for this broker
+ * @throws IOException
*/
- public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
+ public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
this.persistenceAdapter = persistenceAdapter;
configureService(this.persistenceAdapter);
+ this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
+
}
public TaskRunnerFactory getTaskRunnerFactory() {
@@ -1311,6 +1315,24 @@
throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
}
}
+ }
+ }
+
+ protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
+ MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+ if (mbeanServer != null) {
+
+
+ }
+ return adaptor;
+ }
+
+ protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
+ if (isUseJmx()) {
+ MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+ if (mbeanServer != null) {
+
+ }
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java Wed Jan 16 05:56:24 2008
@@ -131,6 +131,6 @@
*
* @return a set of matching destination objects.
*/
- Set getDestinations(ActiveMQDestination destination);
+ Set <Destination>getDestinations(ActiveMQDestination destination);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed Jan 16 05:56:24 2008
@@ -120,7 +120,7 @@
return answer;
}
- public Set getDestinations(ActiveMQDestination destination) {
+ public Set <Destination> getDestinations(ActiveMQDestination destination) {
switch (destination.getDestinationType()) {
case ActiveMQDestination.QUEUE_TYPE:
return queueRegion.getDestinations(destination);