You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/01/16 14:56:29 UTC

svn commit: r612459 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq: advisory/AdvisoryBroker.java broker/BrokerFilter.java broker/BrokerService.java broker/region/Region.java broker/region/RegionBroker.java

Author: rajdavies
Date: Wed Jan 16 05:56:24 2008
New Revision: 612459

URL: http://svn.apache.org/viewvc?rev=612459&view=rev
Log:
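Set the correct consumer count on consumer advisories: report the number of consumers on the destination(s) the advisory is about, instead of the total number of consumers tracked by the advisory broker.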

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Wed Jan 16 05:56:24 2008
@@ -17,6 +17,7 @@
 package org.apache.activemq.advisory;
 
 import java.util.Iterator;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.activemq.broker.Broker;
@@ -83,7 +84,7 @@
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
             consumers.put(info.getConsumerId(), info);
-            fireConsumerAdvisory(context, topic, info);
+            fireConsumerAdvisory(context,info.getDestination(), topic, info);
         } else {
 
             // We need to replay all the previously collected state objects
@@ -114,7 +115,7 @@
                 for (Iterator<ProducerInfo> iter = producers.values().iterator(); iter.hasNext();) {
                     ProducerInfo value = iter.next();
                     ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(value.getDestination());
-                    fireProducerAdvisory(context, topic, value, info.getConsumerId());
+                    fireProducerAdvisory(context, value.getDestination(),topic, value, info.getConsumerId());
                 }
             }
 
@@ -123,7 +124,7 @@
                 for (Iterator<ConsumerInfo> iter = consumers.values().iterator(); iter.hasNext();) {
                     ConsumerInfo value = iter.next();
                     ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(value.getDestination());
-                    fireConsumerAdvisory(context, topic, value, info.getConsumerId());
+                    fireConsumerAdvisory(context,value.getDestination(), topic, value, info.getConsumerId());
                 }
             }
         }
@@ -219,7 +220,7 @@
         if (!AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getConsumerAdvisoryTopic(info.getDestination());
             consumers.remove(info.getConsumerId());
-            fireConsumerAdvisory(context, topic, info.createRemoveCommand());
+            fireConsumerAdvisory(context,info.getDestination(), topic, info.createRemoveCommand());
         }
     }
 
@@ -230,7 +231,7 @@
         if (info.getDestination() != null && !AdvisorySupport.isAdvisoryTopic(info.getDestination())) {
             ActiveMQTopic topic = AdvisorySupport.getProducerAdvisoryTopic(info.getDestination());
             producers.remove(info.getProducerId());
-            fireProducerAdvisory(context, topic, info.createRemoveCommand());
+            fireProducerAdvisory(context, info.getDestination(),topic, info.createRemoveCommand());
         }
     }
 
@@ -253,21 +254,28 @@
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 
-    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
-        fireConsumerAdvisory(context, topic, command, null);
+    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command) throws Exception {
+        fireConsumerAdvisory(context, consumerDestination,topic, command, null);
     }
 
-    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
+    protected void fireConsumerAdvisory(ConnectionContext context, ActiveMQDestination consumerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-        advisoryMessage.setIntProperty("consumerCount", consumers.size());
+        int count = 0;
+        Set<Destination>set = getDestinations(consumerDestination);
+        if (set != null) {
+            for (Destination dest:set) {
+                count += dest.getDestinationStatistics().getConsumers().getCount();
+            }
+        }
+        advisoryMessage.setIntProperty("consumerCount", count);
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);
     }
 
-    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command) throws Exception {
-        fireProducerAdvisory(context, topic, command, null);
+    protected void fireProducerAdvisory(ConnectionContext context,ActiveMQDestination producerDestination, ActiveMQTopic topic, Command command) throws Exception {
+        fireProducerAdvisory(context,producerDestination, topic, command, null);
     }
 
-    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
+    protected void fireProducerAdvisory(ConnectionContext context, ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, ConsumerId targetConsumerId) throws Exception {
         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
         advisoryMessage.setIntProperty("producerCount", producers.size());
         fireAdvisory(context, topic, command, targetConsumerId, advisoryMessage);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java Wed Jan 16 05:56:24 2008
@@ -65,7 +65,7 @@
         return next.getDestinationMap();
     }
 
-    public Set getDestinations(ActiveMQDestination destination) {
+    public Set <Destination>getDestinations(ActiveMQDestination destination) {
         return next.getDestinations(destination);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Jan 16 05:56:24 2008
@@ -725,16 +725,20 @@
         if (persistenceAdapter == null) {
             persistenceAdapter = createPersistenceAdapter();
             configureService(persistenceAdapter);
+            this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
         }
         return persistenceAdapter;
     }
 
     /**
      * Sets the persistence adaptor implementation to use for this broker
+     * @throws IOException 
      */
-    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
+    public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
         this.persistenceAdapter = persistenceAdapter;
         configureService(this.persistenceAdapter);
+        this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
+        
     }
 
     public TaskRunnerFactory getTaskRunnerFactory() {
@@ -1311,6 +1315,24 @@
                     throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e.getMessage(), e);
                 }
             }
+        }
+    }
+    
+    protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
+        MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+        if (mbeanServer != null) {
+
+          
+        }
+        return adaptor;
+    }
+
+    protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
+        if (isUseJmx()) {
+            MBeanServer mbeanServer = getManagementContext().getMBeanServer();
+            if (mbeanServer != null) {
+                
+            }       
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java Wed Jan 16 05:56:24 2008
@@ -131,6 +131,6 @@
      * 
      * @return a set of matching destination objects.
      */
-    Set getDestinations(ActiveMQDestination destination);
+    Set <Destination>getDestinations(ActiveMQDestination destination);
     
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=612459&r1=612458&r2=612459&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Wed Jan 16 05:56:24 2008
@@ -120,7 +120,7 @@
         return answer;
     }
 
-    public Set getDestinations(ActiveMQDestination destination) {
+    public Set <Destination> getDestinations(ActiveMQDestination destination) {
         switch (destination.getDestinationType()) {
         case ActiveMQDestination.QUEUE_TYPE:
             return queueRegion.getDestinations(destination);