You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2022/11/03 14:42:58 UTC

[activemq] branch activemq-5.17.x updated: AMQ-9156 - Make sure in flight metrics are properly decremented on subscription destroys and dispatch failures

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
     new 9c5a6219e AMQ-9156 - Make sure in flight metrics are properly decremented on subscription destroys and dispatch failures
9c5a6219e is described below

commit 9c5a6219eafa3d02a4c00aaa00234063468ea411
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Thu Nov 3 10:38:02 2022 -0400

    AMQ-9156 - Make sure in flight metrics are properly decremented on
    subscription destroys and dispatch failures
    
    (cherry picked from commit 58666afffde7eb43509d155e709e76e6fdba8084)
---
 .../broker/region/PrefetchSubscription.java        |  5 ++
 .../activemq/broker/region/TopicSubscription.java  | 24 +++++++-
 .../AbstractInflightMessageSizeTest.java           | 72 ++++++++++++++++++++--
 3 files changed, 94 insertions(+), 7 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 2e70062b2..c82c0760f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -610,10 +610,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
         for (MessageReference r : dispatched) {
             if (r.getRegionDestination() == destination) {
                 references.add(r);
+                //Decrement the size as we are removing and redispatching all references
                 getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
             }
         }
         redispatch.addAll(0, references);
+        //Clean up in flight message stats on the destination for the references removed from dispatched below
         destination.getDestinationStatistics().getInflight().subtract(references.size());
         dispatched.removeAll(references);
     }
@@ -730,6 +732,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                     if (nodeDest != null) {
                         if (node != QueueMessageReference.NULL_MESSAGE) {
                             nodeDest.getDestinationStatistics().getDispatched().increment();
+                            //We still increment here as the dispatched list is still tracking references at this point
+                            //Metrics will get cleaned up in addReferencesAndUpdateRedispatch() when the dispatched
+                            //list is also cleaned up as the failure causes the subscription to close
                             incrementPrefetchCounter(node);
                             LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}",
                                     info.getConsumerId(), message.getMessageId(), message.getDestination(),
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 8b623bdbe..4e6268302 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -216,7 +216,6 @@ public class TopicSubscription extends AbstractSubscription {
      * Discard any expired messages from the matched list. Called from a
      * synchronized block.
      *
-     * @throws IOException
      */
     protected void removeExpiredMessages() throws IOException {
         try {
@@ -354,6 +353,23 @@ public class TopicSubscription extends AbstractSubscription {
         return null;
     }
 
+    @Override
+    public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
+        if (isUseTopicSubscriptionInflightStats()) {
+            synchronized(dispatchLock) {
+                for (DispatchedNode node : dispatched) {
+                    if (node.getDestination()== destination) {
+                        //Only the subscription's inflight message size is adjusted here;
+                        //the inflight stat on the destination is cleaned up on destroy
+                        getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+                    }
+                }
+                dispatched.clear(); //NOTE(review): clears every node, not only those matched above - confirm intended
+            }
+        }
+        return super.remove(context, destination);
+    }
+
     /**
      * Occurs when a pull times out. If nothing has been dispatched since the
      * timeout was setup, then send the NULL message.
@@ -692,6 +708,8 @@ public class TopicSubscription extends AbstractSubscription {
                     public void onFailure() {
                         Destination regionDestination = (Destination) node.getRegionDestination();
                         regionDestination.getDestinationStatistics().getDispatched().increment();
+                        //Inflight is still incremented on failure: the failure causes the
+                        //subscription to close and the metrics get cleaned up on destroy()
                         regionDestination.getDestinationStatistics().getInflight().increment();
                         node.decrementReferenceCount();
                     }
@@ -749,6 +767,10 @@ public class TopicSubscription extends AbstractSubscription {
         setSlowConsumer(false);
         synchronized(dispatchLock) {
             dispatched.clear();
+            //Clear any unacked messages from destination inflight stats
+            if (destination != null) {
+                destination.getDestinationStatistics().getInflight().subtract(getDispatchedQueueSize());
+            }
         }
     }
 
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
index 12db79f10..05b1ef030 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
@@ -16,33 +16,33 @@
  */
 package org.apache.activemq.statistics;
 
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Random;
-
+import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.ActiveMQPrefetchPolicy;
 import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.AbstractSubscription;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.MessageDispatch;
 import org.apache.activemq.util.Wait;
 import org.junit.After;
 import org.junit.Assume;
@@ -69,8 +69,9 @@ public abstract class AbstractInflightMessageSizeTest {
     final protected String destName = "testDest";
 
     //use 10 second wait for assertions instead of the 30 default
-    final protected long WAIT_DURATION = 10 * 1000;
-    final protected long SLEEP_DURATION =  500;
+    protected final long WAIT_DURATION = 10 * 1000;
+    protected final long SLEEP_DURATION =  500;
+    protected final AtomicBoolean failOnDispatch = new AtomicBoolean();
 
     @Parameters
     public static Collection<Object[]> data() {
@@ -102,6 +103,7 @@ public abstract class AbstractInflightMessageSizeTest {
 
     @Before
     public void setUp() throws Exception {
+        failOnDispatch.set(false);
         brokerService = new BrokerService();
         brokerService.setDeleteAllMessagesOnStartup(true);
         TransportConnector tcp = brokerService
@@ -111,6 +113,15 @@ public abstract class AbstractInflightMessageSizeTest {
         PolicyMap pMap = new PolicyMap();
         pMap.setDefaultEntry(policy);
         brokerService.setDestinationPolicy(pMap);
+        brokerService.setPlugins(new BrokerPlugin[]{broker -> new BrokerFilter(broker) {
+            @Override
+            public void preProcessDispatch(MessageDispatch messageDispatch) {
+                super.preProcessDispatch(messageDispatch);
+                if (failOnDispatch.get()) {
+                    throw new RuntimeException("fail dispatch");
+                }
+            }
+        }});
 
         brokerService.start();
         //used to test optimizeAcknowledge works
@@ -307,6 +318,55 @@ public abstract class AbstractInflightMessageSizeTest {
                 Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
     }
 
+    @Test(timeout=60000)
+    public void testInflightMessageSizeDispatchFailure() throws Exception {
+        Assume.assumeTrue(useTopicSubscriptionInflightStats);
+
+        //Fail on all dispatches
+        failOnDispatch.set(true);
+
+        //The send itself may throw because the failed dispatch will cause the connection to close
+        try {
+            sendMessages(1);
+        } catch (Exception e) {
+            //expected as session should close
+        }
+
+        //Wait for session to fail
+        assertTrue(Wait.waitFor(() -> ((ActiveMQSession) session).isClosed(), WAIT_DURATION, SLEEP_DURATION));
+
+        //Make sure all the stats are cleaned up on failure of dispatches
+        assertTrue("Destination inflight message count should be 0",
+            Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Consumers size should be 0 due to failure or Inflight sub dispatched message count should be 0 for durable sub",
+            Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
+                getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Consumers size should be 0 due to failure or Inflight message size should be 0 for durable sub",
+            Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
+                getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
+    }
+
+    @Test(timeout=60000)
+    public void testInflightMessageSizeConsumerClosed() throws Exception {
+        Assume.assumeTrue(useTopicSubscriptionInflightStats);
+        sendMessages(10);
+
+        //Wait for the 10 messages to get dispatched and then close the consumer to test cleanup
+        assertTrue("Should be 10 in flight messages",
+            Wait.waitFor(() ->  amqDestination.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
+        consumer.close();
+
+        //Make sure all the stats are cleaned up after the consumer is closed
+        assertTrue("Destination inflight message count should be 0",
+            Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Consumers size should be 0 due to failure or Inflight sub dispatched message count should be 0 for durable sub",
+            Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
+                getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Consumers size should be 0 due to failure or Inflight message size should be 0 for durable sub",
+            Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
+                getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
+    }
+
     protected long sendMessages(int count) throws JMSException {
         return sendMessages(count, null);
     }