You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2022/11/15 22:01:16 UTC

[activemq] branch main updated: (AMQ-9156, AMQ-9167) - Update TopicSubscription to use a new counter for current dispatched count

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 8554a1464 (AMQ-9156, AMQ-9167) - Update TopicSubscription to use a new counter for current dispatched count
8554a1464 is described below

commit 8554a1464c6471f81ebbcd6c482376ae32cf6808
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Tue Nov 15 16:59:01 2022 -0500

    (AMQ-9156, AMQ-9167) - Update TopicSubscription to use a new counter for
    current dispatched count
    
    The previous approach of computing the count as total dispatched minus
    total dequeued did not work when a destination was removed while
    messages were still unacked. A dedicated counter is needed because the
    dispatched list is optional for topic subs, unlike for prefetch subs.
---
 .../activemq/broker/region/TopicSubscription.java     | 19 ++++++++++++++-----
 1 file changed, 14 insertions(+), 5 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index faa29edef..18404fa49 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -69,6 +69,12 @@ public class TopicSubscription extends AbstractSubscription {
     protected final Object dispatchLock = new Object();
     protected final List<DispatchedNode> dispatched = new ArrayList<>();
 
+    //Keep track of current dispatched count. This is necessary because the dispatched list is optional
+    //and only used if inflight stats are turned on to save memory. The previous way of calculating current dispatched
+    //by using total dispatched - dequeues doesn't work well because dequeues won't be incremented on destination removal when
+    //no acks are received. This counter could be removed in the future if we ever decide to require always using the dispatched list.
+    protected final AtomicInteger currentDispatchedCount = new AtomicInteger();
+
     public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
         super(broker, context, info);
         this.usageManager = usageManager;
@@ -250,6 +256,7 @@ public class TopicSubscription extends AbstractSubscription {
                         synchronized(dispatchLock) {
                             matched.remove();
                             getSubscriptionStatistics().getDispatched().increment();
+                            currentDispatchedCount.incrementAndGet();
                             if (isUseTopicSubscriptionInflightStats()) {
                                 dispatched.add(new DispatchedNode(node));
                                 getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
@@ -359,9 +366,10 @@ public class TopicSubscription extends AbstractSubscription {
             synchronized(dispatchLock) {
                 dispatched.removeIf(node -> {
                     if (node.getDestination() == destination) {
-                        //We only need to clean up inflight message size here on the sub stats as
-                        //inflight on destination stat is cleaned up on destroy
+                        //On removal from dispatched need to decrement counters
+                        currentDispatchedCount.decrementAndGet();
                         getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+                        destination.getDestinationStatistics().getInflight().decrement();
                         return true;
                     }
                     return false;
@@ -435,6 +443,7 @@ public class TopicSubscription extends AbstractSubscription {
     }
 
     private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) {
+        currentDispatchedCount.addAndGet(-count);
         getSubscriptionStatistics().getDequeues().add(count);
         destination.getDestinationStatistics().getDequeues().add(count);
         destination.getDestinationStatistics().getInflight().subtract(count);
@@ -463,8 +472,7 @@ public class TopicSubscription extends AbstractSubscription {
 
     @Override
     public int getDispatchedQueueSize() {
-        return (int)(getSubscriptionStatistics().getDispatched().getCount() -
-                     getSubscriptionStatistics().getDequeues().getCount());
+        return currentDispatchedCount.get();
     }
 
     public int getMaximumPendingMessages() {
@@ -671,6 +679,7 @@ public class TopicSubscription extends AbstractSubscription {
             md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
             synchronized(dispatchLock) {
                 getSubscriptionStatistics().getDispatched().increment();
+                currentDispatchedCount.incrementAndGet();
                 if (isUseTopicSubscriptionInflightStats()) {
                     dispatched.add(new DispatchedNode(node));
                     getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
@@ -772,7 +781,7 @@ public class TopicSubscription extends AbstractSubscription {
             dispatched.clear();
             //Clear any unacked messages from destination inflight stats
             if (destination != null) {
-                destination.getDestinationStatistics().getInflight().subtract(getDispatchedQueueSize());
+                destination.getDestinationStatistics().getInflight().subtract(currentDispatchedCount.get());
             }
         }
     }