You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by pv...@apache.org on 2006/09/27 09:58:28 UTC

svn commit: r450371 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/region/Topic.java test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Author: pvillacorta
Date: Wed Sep 27 00:58:28 2006
New Revision: 450371

URL: http://svn.apache.org/viewvc?view=rev&rev=450371
Log:
AMQ-928 - decrement consumer count only when a subscription was actually removed (add null check)
        - increment consumer count for every added subscription, durable or not
        - add test

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=450371&r1=450370&r2=450371
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Sep 27 00:58:28 2006
@@ -93,11 +93,10 @@
     public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
         
         sub.add(context, this);
+        destinationStatistics.getConsumers().increment();
 
         if ( !sub.getConsumerInfo().isDurable() ) {
-            
-            destinationStatistics.getConsumers().increment();
-            
+
             // Do a retroactive recovery if needed.
             if (sub.getConsumerInfo().isRetroactive()) {
                 
@@ -139,8 +138,10 @@
     public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
         if (store != null) {
             store.deleteSubscription(key.clientId, key.subscriptionName);
-            durableSubcribers.remove(key);
-            destinationStatistics.getConsumers().decrement();
+            Object removed = durableSubcribers.remove(key);
+            if(removed != null) {
+                destinationStatistics.getConsumers().decrement();
+            }
         }
     }
     

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?view=diff&rev=450371&r1=450370&r2=450371
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Wed Sep 27 00:58:28 2006
@@ -19,6 +19,7 @@
 
 import org.apache.activemq.EmbeddedBrokerTestSupport;
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Topic;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -73,6 +74,7 @@
         // messages on a queue
         assertQueueBrowseWorks();
         assertCreateAndDestroyDurableSubscriptions();
+        assertConsumerCounts();
     }
     
     public void testMoveMessagesBySelector() throws Exception {
@@ -203,6 +205,57 @@
         // now lets try destroy it
         broker.destroyDurableSubscriber(clientID, "subscriber1");
         assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length);
+    }
+
+    protected void assertConsumerCounts() throws Exception {
+        ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+        BrokerViewMBean broker = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+
+        //create 2 topics
+        broker.addTopic(getDestinationString() + "1");
+        broker.addTopic(getDestinationString() + "2");
+
+        ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination="+getDestinationString() + "1");
+        ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination="+getDestinationString() + "2");
+        TopicViewMBean topic1 = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
+        TopicViewMBean topic2 = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
+
+        assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
+        assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
+
+        String topicName = getDestinationString();
+        String selector = null;
+
+        //create 1 subscriber for each topic
+        broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector);
+        broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector);
+
+        assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
+        assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
+
+        //create 1 more subscriber for topic1
+        broker.createDurableSubscriber(clientID, "topic1.subscriber2", topicName + "1", selector);
+
+        assertEquals("topic1 Durable subscriber count", 2, topic1.getConsumerCount());
+        assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
+
+        //destroy topic1 subscriber
+        broker.destroyDurableSubscriber(clientID, "topic1.subscriber1");
+
+        assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
+        assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
+
+        // destroy topic2 subscriber
+        broker.destroyDurableSubscriber(clientID, "topic2.subscriber1");
+
+        assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
+        assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
+
+        //destroy remaining topic1 subscriber
+        broker.destroyDurableSubscriber(clientID, "topic1.subscriber2");
+
+        assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
+        assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
     }
 
     protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {