You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by pv...@apache.org on 2006/09/27 09:58:28 UTC
svn commit: r450371 - in /incubator/activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/region/Topic.java
test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Author: pvillacorta
Date: Wed Sep 27 00:58:28 2006
New Revision: 450371
URL: http://svn.apache.org/viewvc?view=rev&rev=450371
Log:
AMQ-928 - Decrement the consumer count only when a subscription was actually removed (adds a null check).
- Add a test.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?view=diff&rev=450371&r1=450370&r2=450371
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Wed Sep 27 00:58:28 2006
@@ -93,11 +93,10 @@
public void addSubscription(ConnectionContext context, final Subscription sub) throws Exception {
sub.add(context, this);
+ destinationStatistics.getConsumers().increment();
if ( !sub.getConsumerInfo().isDurable() ) {
-
- destinationStatistics.getConsumers().increment();
-
+
// Do a retroactive recovery if needed.
if (sub.getConsumerInfo().isRetroactive()) {
@@ -139,8 +138,10 @@
public void deleteSubscription(ConnectionContext context, SubscriptionKey key) throws IOException {
if (store != null) {
store.deleteSubscription(key.clientId, key.subscriptionName);
- durableSubcribers.remove(key);
- destinationStatistics.getConsumers().decrement();
+ Object removed = durableSubcribers.remove(key);
+ if(removed != null) {
+ destinationStatistics.getConsumers().decrement();
+ }
}
}
Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?view=diff&rev=450371&r1=450370&r2=450371
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java Wed Sep 27 00:58:28 2006
@@ -19,6 +19,7 @@
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Topic;
import javax.jms.Connection;
import javax.jms.Message;
@@ -73,6 +74,7 @@
// messages on a queue
assertQueueBrowseWorks();
assertCreateAndDestroyDurableSubscriptions();
+ assertConsumerCounts();
}
public void testMoveMessagesBySelector() throws Exception {
@@ -203,6 +205,57 @@
// now lets try destroy it
broker.destroyDurableSubscriber(clientID, "subscriber1");
assertEquals("Durable subscriber count", 1, broker.getDurableTopicSubscribers().length);
+ }
+
+ protected void assertConsumerCounts() throws Exception {
+ ObjectName brokerName = assertRegisteredObjectName(domain + ":Type=Broker,BrokerName=localhost");
+ BrokerViewMBean broker = (BrokerViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, brokerName, BrokerViewMBean.class, true);
+
+ //create 2 topics
+ broker.addTopic(getDestinationString() + "1");
+ broker.addTopic(getDestinationString() + "2");
+
+ ObjectName topicObjName1 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination="+getDestinationString() + "1");
+ ObjectName topicObjName2 = assertRegisteredObjectName(domain + ":Type=Topic,BrokerName=localhost,Destination="+getDestinationString() + "2");
+ TopicViewMBean topic1 = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName1, TopicViewMBean.class, true);
+ TopicViewMBean topic2 = (TopicViewMBean) MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName2, TopicViewMBean.class, true);
+
+ assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
+ assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
+
+ String topicName = getDestinationString();
+ String selector = null;
+
+ //create 1 subscriber for each topic
+ broker.createDurableSubscriber(clientID, "topic1.subscriber1", topicName + "1", selector);
+ broker.createDurableSubscriber(clientID, "topic2.subscriber1", topicName + "2", selector);
+
+ assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
+ assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
+
+ //create 1 more subscriber for topic1
+ broker.createDurableSubscriber(clientID, "topic1.subscriber2", topicName + "1", selector);
+
+ assertEquals("topic1 Durable subscriber count", 2, topic1.getConsumerCount());
+ assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
+
+ //destroy topic1 subscriber
+ broker.destroyDurableSubscriber(clientID, "topic1.subscriber1");
+
+ assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
+ assertEquals("topic2 Durable subscriber count", 1, topic2.getConsumerCount());
+
+ // destroy topic2 subscriber
+ broker.destroyDurableSubscriber(clientID, "topic2.subscriber1");
+
+ assertEquals("topic1 Durable subscriber count", 1, topic1.getConsumerCount());
+ assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
+
+ //destroy remaining topic1 subscriber
+ broker.destroyDurableSubscriber(clientID, "topic1.subscriber2");
+
+ assertEquals("topic1 Durable subscriber count", 0, topic1.getConsumerCount());
+ assertEquals("topic2 Durable subscriber count", 0, topic2.getConsumerCount());
}
protected ObjectName assertRegisteredObjectName(String name) throws MalformedObjectNameException, NullPointerException {