You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2015/08/26 12:28:49 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5831 - revisit topic subscriptions

Repository: activemq
Updated Branches:
  refs/heads/master acb8602ad -> ee54f0930


https://issues.apache.org/jira/browse/AMQ-5831 - revisit topic subscriptions


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/ee54f093
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/ee54f093
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/ee54f093

Branch: refs/heads/master
Commit: ee54f09303f52d2753ce9ac8e64008e3e60c2eab
Parents: acb8602
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Wed Aug 26 12:28:19 2015 +0200
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Wed Aug 26 12:28:39 2015 +0200

----------------------------------------------------------------------
 .../broker/region/DurableTopicSubscription.java |   1 +
 .../broker/region/TopicSubscription.java        |  65 +++++-------
 .../apache/activemq/broker/jmx/MBeanTest.java   | 100 +++++++++++++++++--
 .../DurableSubscriptionOffline2Test.java        |   4 +-
 4 files changed, 119 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
index 0107c58..cf60fdf 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/DurableTopicSubscription.java
@@ -322,6 +322,7 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
         regionDestination.acknowledge(context, this, ack, node);
         redeliveredMessages.remove(node.getMessageId());
         node.decrementReferenceCount();
+        ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index b20d080..d3e683d 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -279,37 +279,16 @@ public class TopicSubscription extends AbstractSubscription {
         if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck()) {
             if (context.isInTransaction()) {
                 context.getTransaction().addSynchronization(new Synchronization() {
-
                     @Override
                     public void afterCommit() throws Exception {
-                        synchronized (TopicSubscription.this) {
-                            if (singleDestination && destination != null) {
-                                destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
-                            }
-                        }
-                        getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
-                        updateInflightMessageSizeOnAck(ack);
+                        updateStatsOnAck(ack);
                         dispatchMatched();
                     }
                 });
             } else {
-                if (singleDestination && destination != null) {
-                    destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
-                    destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
-                    if (info.isNetworkSubscription()) {
-                        destination.getDestinationStatistics().getForwards().add(ack.getMessageCount());
-                    }
-                }
-                getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
-                updateInflightMessageSizeOnAck(ack);
-            }
-            while (true) {
-                int currentExtension = prefetchExtension.get();
-                int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
-                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                    break;
-                }
+                updateStatsOnAck(ack);
             }
+            updatePrefetch(ack);
             dispatchMatched();
             return;
         } else if (ack.isDeliveredAck()) {
@@ -318,19 +297,8 @@ public class TopicSubscription extends AbstractSubscription {
             dispatchMatched();
             return;
         } else if (ack.isExpiredAck()) {
-            if (singleDestination && destination != null) {
-                destination.getDestinationStatistics().getInflight().subtract(ack.getMessageCount());
-                destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
-                destination.getDestinationStatistics().getDequeues().add(ack.getMessageCount());
-            }
-            getSubscriptionStatistics().getDequeues().add(ack.getMessageCount());
-            while (true) {
-                int currentExtension = prefetchExtension.get();
-                int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
-                if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
-                    break;
-                }
-            }
+            updateStatsOnAck(ack);
+            updatePrefetch(ack);
             dispatchMatched();
             return;
         } else if (ack.isRedeliveredAck()) {
@@ -393,10 +361,10 @@ public class TopicSubscription extends AbstractSubscription {
     }
 
     /**
-     * Update the inflight statistics on message ack.
+     * Update the statistics on message ack.
      * @param ack
      */
-    private void updateInflightMessageSizeOnAck(final MessageAck ack) {
+    private void updateStatsOnAck(final MessageAck ack) {
         synchronized(dispatchLock) {
             boolean inAckRange = false;
             List<MessageReference> removeList = new ArrayList<MessageReference>();
@@ -417,6 +385,25 @@ public class TopicSubscription extends AbstractSubscription {
             for (final MessageReference node : removeList) {
                 dispatched.remove(node);
                 getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+                getSubscriptionStatistics().getDequeues().increment();
+                ((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
+                ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
+                if (info.isNetworkSubscription()) {
+                    ((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
+                }
+                if (ack.isExpiredAck()) {
+                    destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
+                }
+            }
+        }
+    }
+
+    private void updatePrefetch(MessageAck ack) {
+        while (true) {
+            int currentExtension = prefetchExtension.get();
+            int newExtension = Math.max(0, currentExtension - ack.getMessageCount());
+            if (prefetchExtension.compareAndSet(currentExtension, newExtension)) {
+                break;
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
index acf706d..b9b8e2f 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
@@ -20,21 +20,13 @@ import java.io.BufferedReader;
 import java.io.InputStreamReader;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
+import javax.jms.*;
 import javax.management.MBeanServer;
 import javax.management.MBeanServerInvocationHandler;
 import javax.management.MalformedObjectNameException;
@@ -59,6 +51,7 @@ import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.memory.list.MessageList;
 import org.apache.activemq.util.JMXSupport;
 import org.apache.activemq.util.URISupport;
 import org.apache.activemq.util.Wait;
@@ -1621,4 +1614,91 @@ public class MBeanTest extends EmbeddedBrokerTestSupport {
             assertNotNull("Message: " + i, consumer.receive(5000));
         }
     }
+
+    public void testTopicView() throws Exception {
+        connection = connectionFactory.createConnection();
+        connection.setClientID("test");
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        connection.start();
+
+        Topic singleTopic = session.createTopic("test.topic");
+        Topic wildcardTopic = session.createTopic("test.>");
+
+        TopicSubscriber durable1 = session.createDurableSubscriber(singleTopic, "single");
+        TopicSubscriber durable2 = session.createDurableSubscriber(wildcardTopic, "wildcard");
+
+        MessageConsumer consumer1 = session.createConsumer(singleTopic);
+        MessageConsumer consumer2 = session.createConsumer(wildcardTopic);
+
+        final ArrayList<Message> messages = new ArrayList<>();
+
+        MessageListener listener = new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                messages.add(message);
+            }
+        };
+
+        durable1.setMessageListener(listener);
+        durable2.setMessageListener(listener);
+        consumer1.setMessageListener(listener);
+        consumer2.setMessageListener(listener);
+
+        MessageProducer producer = session.createProducer(singleTopic);
+        producer.send(session.createTextMessage("test"));
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return messages.size() == 4;
+            }
+        });
+
+        ObjectName topicObjName = assertRegisteredObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test.topic");
+        final TopicViewMBean topicView = MBeanServerInvocationHandler.newProxyInstance(mbeanServer, topicObjName, TopicViewMBean.class, true);
+
+        assertEquals(1, topicView.getEnqueueCount());
+        assertEquals(4, topicView.getDispatchCount());
+        assertEquals(4, topicView.getInFlightCount());
+        assertEquals(0, topicView.getDequeueCount());
+
+        ArrayList<SubscriptionViewMBean> subscriberViews = new ArrayList();
+        for (ObjectName name : topicView.getSubscriptions()) {
+            subscriberViews.add(MBeanServerInvocationHandler.newProxyInstance(mbeanServer, name, SubscriptionViewMBean.class, true));
+        }
+
+        assertEquals(4, subscriberViews.size());
+
+        for (SubscriptionViewMBean subscriberView : subscriberViews) {
+            assertEquals(1, subscriberView.getEnqueueCounter());
+            assertEquals(1, subscriberView.getDispatchedCounter());
+            assertEquals(0, subscriberView.getDequeueCounter());
+        }
+
+        for (Message message : messages) {
+            try {
+                message.acknowledge();
+            } catch (JMSException ignore) {}
+        }
+
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return topicView.getDequeueCount() == 4;
+            }
+        });
+
+        assertEquals(1, topicView.getEnqueueCount());
+        assertEquals(4, topicView.getDispatchCount());
+        assertEquals(0, topicView.getInFlightCount());
+        assertEquals(4, topicView.getDequeueCount());
+
+        for (SubscriptionViewMBean subscriberView : subscriberViews) {
+            assertEquals(1, subscriberView.getEnqueueCounter());
+            assertEquals(1, subscriberView.getDispatchedCounter());
+            assertEquals(1, subscriberView.getDequeueCounter());
+        }
+
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/ee54f093/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
index 960d9ea..f288340 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionOffline2Test.java
@@ -119,7 +119,7 @@ public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineT
         ObjectName destinationName = broker.getAdminView().getTopics()[0];
         TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true);
         assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
-        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+        assertEquals("topic view dequeue not updated", 5, topicView.getDequeueCount());
         assertEquals("inflight", 5, topicView.getInFlightCount());
 
         session.close();
@@ -138,7 +138,7 @@ public class DurableSubscriptionOffline2Test extends DurableSubscriptionOfflineT
 
         // destination view
         assertEquals("correct enqueue", 10, topicView.getEnqueueCount());
-        assertEquals("still zero dequeue, we don't decrement on each sub ack to stop exceeding the enqueue count with multiple subs", 0, topicView.getDequeueCount());
+        assertEquals("topic view dequeue not updated", 5, topicView.getDequeueCount());
         assertEquals("inflight back to 0 after deactivate", 0, topicView.getInFlightCount());
 
         // consume the rest