You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/08/26 12:28:49 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5831 - revisit topic subscriptions
Repository: activemq
Updated Branches:
refs/heads/master acb8602ad -> ee54f0930
https://issues.apache.org/jira/browse/AMQ-5831 - revisit topic subscriptions
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ee54f093
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ee54f093
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ee54f093
Branch: refs/heads/master
Commit: ee54f09303f52d2753ce9ac8e64008e3e60c2eab
Parents: acb8602
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Wed Aug 26 12:28:19 2015 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Wed Aug 26 12:28:39 2015 +0200
----------------------------------------------------------------------
.../broker/region/DurableTopicSubscription.java | 1 +
.../broker/region/TopicSubscription.java | 65 +++++-------
.../apache/activemq/broker/jmx/MBeanTest.java | 100 +++++++++++++++++--
.../DurableSubscriptionOffline2Test.java | 4 +-
4 files changed, 119 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 0107c58..cf60fdf 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -322,6 +322,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
regionDestination.acknowledge(context, this, ack, node);
redeliveredMessages.remove(node.getMessageId());
node.decrementReferenceCount();
+ ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index b20d080..d3e683d 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -279,37 +279,16 @@ public class TopicSubscription extends AbstractSubscription {
if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new Synchronization() {
-
@Override
public void afterCommit() throws Exception {
- synchronized (TopicSubscription.this) {
- if (singleDestination && destination != null) {
- destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
- }
- }
- getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
- updateInflightMessageSizeOnAck(ack);
+ updateStatsOnAck(ack);
dispatchMatched();
}
});
} else {
- if (singleDestination && destination != null) {
- destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
- destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
- if (info.isNetworkSubscription()) {
- destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
- }
- }
- getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
- updateInflightMessageSizeOnAck(ack);
- }
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
- }
+ updateStatsOnAck(ack);
}
+ updatePrefetch(ack);
dispatchMatched();
return;
} else if (ack.isDeliveredAck()) {
@@ -318,19 +297,8 @@ public class TopicSubscription extends AbstractSubscription {
dispatchMatched();
return;
} else if (ack.isExpiredAck()) {
- if (singleDestination && destination != null) {
- destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
- destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
- destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
- }
- getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
- while (true) {
- int currentExtension = prefetchExtension.get();
- int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
- if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
- break;
- }
- }
+ updateStatsOnAck(ack);
+ updatePrefetch(ack);
dispatchMatched();
return;
} else if (ack.isRedeliveredAck()) {
@@ -393,10 +361,10 @@ public class TopicSubscription extends AbstractSubscription {
}
/**
- * Update the inflight statistics on message ack.
+ * Update the statistics on message ack.
* @param ack
*/
- private void updateInflightMessageSizeOnAck(final MessageAck ack) {
+ private void updateStatsOnAck(final MessageAck ack) {
synchronized(dispatchLock) {
boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>();
@@ -417,6 +385,25 @@ public class TopicSubscription extends AbstractSubscription {
for (final MessageReference node : removeList) {
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+ getSubscriptionStatistics().getDequeues().increment();
+ ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
+ ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
+ if (info.isNetworkSubscription()) {
+ ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
+ }
+ if (ack.isExpiredAck()) {
+ destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
+ }
+ }
+ }
+ }
+
+ private void updatePrefetch(MessageAck ack) {
+ while (true) {
+ int currentExtension = prefetchExtension.get();
+ int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
+ if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index acf706d..b9b8e2f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -20,21 +20,13 @@ import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URL;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
+import javax.jms.*;
import javax.management.MBeanServer;
import javax.management.MBeanServerInvocationHandler;
import javax.management.MalformedObjectNameException;
@@ -59,6 +51,7 @@ import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.memory.list.MessageList;
import org.apache.activemq.util.JMXSupport;
import org.apache.activemq.util.URISupport;
import org.apache.activemq.util.Wait;
@@ -1621,4 +1614,91 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
assertNotNull("Message: " + i, consumer.receive(5000));
}
}
+
+ public void testTopicView() throws Exception {
+ connection = connectionFactory.createConnection();
+ connection.setClientID("test");
+ Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ connection.start();
+
+ Topic singleTopic = session.createTopic("test.topic");
+ Topic wildcardTopic = session.createTopic("test.>");
+
+ TopicSubscriber durable1 = session.createDurableSubscriber(singleTopic, "single");
+ TopicSubscriber durable2 = session.createDurableSubscriber(wildcardTopic, "wildcard");
+
+ MessageConsumer consumer1 = session.createConsumer(singleTopic);
+ MessageConsumer consumer2 = session.createConsumer(wildcardTopic);
+
+ final ArrayList<Message> messages = new ArrayList<>();
+
+ MessageListener listener = new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ messages.add(message);
+ }
+ };
+
+ durable1.setMessageListener(listener);
+ durable2.setMessageListener(listener);
+ consumer1.setMessageListener(listener);
+ consumer2.setMessageListener(listener);
+
+ MessageProducer producer = session.createProducer(singleTopic);
+ producer.send(session.createTextMessage("test"));
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return messages.size() == 4;
+ }
+ });
+
+ ObjectName topicObjName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic");
+ final TopicViewMBean topicView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName, TopicViewMBean.class, true);
+
+ assertEquals(1, topicView.getEnqueueCount());
+ assertEquals(4, topicView.getDispatchCount());
+ assertEquals(4, topicView.getInFlightCount());
+ assertEquals(0, topicView.getDequeueCount());
+
+ ArrayList<SubscriptionViewMBean> subscriberViews = new ArrayList();
+ for (ObjectName name : topicView.getSubscriptions()) {
+ subscriberViews.add(MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true));
+ }
+
+ assertEquals(4, subscriberViews.size());
+
+ for (SubscriptionViewMBean subscriberView : subscriberViews) {
+ assertEquals(1, subscriberView.getEnqueueCounter());
+ assertEquals(1, subscriberView.getDispatchedCounter());
+ assertEquals(0, subscriberView.getDequeueCounter());
+ }
+
+ for (Message message : messages) {
+ try {
+ message.acknowledge();
+ } catch (JMSException ignore) {}
+ }
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return topicView.getDequeueCount() == 4;
+ }
+ });
+
+ assertEquals(1, topicView.getEnqueueCount());
+ assertEquals(4, topicView.getDispatchCount());
+ assertEquals(0, topicView.getInFlightCount());
+ assertEquals(4, topicView.getDequeueCount());
+
+ for (SubscriptionViewMBean subscriberView : subscriberViews) {
+ assertEquals(1, subscriberView.getEnqueueCounter());
+ assertEquals(1, subscriberView.getDispatchedCounter());
+ assertEquals(1, subscriberView.getDequeueCounter());
+ }
+
+
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
index 960d9ea..f288340 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
@@ -119,7 +119,7 @@ public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineT
ObjectName destinationName = broker.getAdminView().getTopics()[0];
TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
- assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+ assertEquals("topic view dequeue not updated", 5, topicView.getDequeueCount());
assertEquals("inflight", 5, topicView.getInFlightCount());
session.close();
@@ -138,7 +138,7 @@ public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineT
// destination view
assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
- assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+ assertEquals("topic view dequeue not updated", 5, topicView.getDequeueCount());
assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
// consume the rest