You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2022/11/03 14:42:58 UTC
[activemq] branch activemq-5.17.x updated: AMQ-9156 - Make sure in flight metrics are properly decremented on subscription destroys and dispatch failures
This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
new 9c5a6219e AMQ-9156 - Make sure in flight metrics are properly decremented on subscription destroys and dispatch failures
9c5a6219e is described below
commit 9c5a6219eafa3d02a4c00aaa00234063468ea411
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Thu Nov 3 10:38:02 2022 -0400
AMQ-9156 - Make sure in flight metrics are properly decremented on
subscription destroys and dispatch failures
(cherry picked from commit 58666afffde7eb43509d155e709e76e6fdba8084)
---
.../broker/region/PrefetchSubscription.java | 5 ++
.../activemq/broker/region/TopicSubscription.java | 24 +++++++-
.../AbstractInflightMessageSizeTest.java | 72 ++++++++++++++++++++--
3 files changed, 94 insertions(+), 7 deletions(-)
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index 2e70062b2..c82c0760f 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -610,10 +610,12 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
for (MessageReference r : dispatched) {
if (r.getRegionDestination() == destination) {
references.add(r);
+ //Decrement the size as we are removing and redispatching all references
getSubscriptionStatistics().getInflightMessageSize().addSize(-r.getSize());
}
}
redispatch.addAll(0, references);
+ //Clean up in flight message stats on the destination after dispatched is cleared
destination.getDestinationStatistics().getInflight().subtract(references.size());
dispatched.removeAll(references);
}
@@ -730,6 +732,9 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
if (nodeDest != null) {
if (node != QueueMessageReference.NULL_MESSAGE) {
nodeDest.getDestinationStatistics().getDispatched().increment();
+ //We still increment here as the dispatched list is still tracking references at this point
+ //Metrics will get cleaned up in addReferencesAndUpdateRedispatch() when the dispatched
+ //list is also cleaned up as the failure causes the subscription to close
incrementPrefetchCounter(node);
LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}",
info.getConsumerId(), message.getMessageId(), message.getDestination(),
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
index 8b623bdbe..4e6268302 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
@@ -216,7 +216,6 @@ public class TopicSubscription extends AbstractSubscription {
* Discard any expired messages from the matched list. Called from a
* synchronized block.
*
- * @throws IOException
*/
protected void removeExpiredMessages() throws IOException {
try {
@@ -354,6 +353,23 @@ public class TopicSubscription extends AbstractSubscription {
return null;
}
+ @Override
+ public List<MessageReference> remove(ConnectionContext context, Destination destination) throws Exception {
+ if (isUseTopicSubscriptionInflightStats()) {
+ synchronized(dispatchLock) {
+ for (DispatchedNode node : dispatched) {
+ if (node.getDestination()== destination) {
+ //Only the inflight message size on the sub stats is cleaned up here; the inflight count
+ //on the destination is cleaned up on destroy. NOTE(review): clear() below drops every node, matching or not - confirm
+ getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+ }
+ }
+ dispatched.clear();
+ }
+ }
+ return super.remove(context, destination);
+ }
+
/**
* Occurs when a pull times out. If nothing has been dispatched since the
* timeout was setup, then send the NULL message.
@@ -692,6 +708,8 @@ public class TopicSubscription extends AbstractSubscription {
public void onFailure() {
Destination regionDestination = (Destination) node.getRegionDestination();
regionDestination.getDestinationStatistics().getDispatched().increment();
+ //We still increment here as metrics get cleaned up on destroy()
+ //as the failure causes the subscription to close
regionDestination.getDestinationStatistics().getInflight().increment();
node.decrementReferenceCount();
}
@@ -749,6 +767,10 @@ public class TopicSubscription extends AbstractSubscription {
setSlowConsumer(false);
synchronized(dispatchLock) {
dispatched.clear();
+ //Clear any unacked messages from destination inflight stats
+ if (destination != null) {
+ destination.getDestinationStatistics().getInflight().subtract(getDispatchedQueueSize());
+ }
}
}
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
index 12db79f10..05b1ef030 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
@@ -16,33 +16,33 @@
*/
package org.apache.activemq.statistics;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
-
+import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.TestSupport;
+import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerPlugin;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
-import org.apache.activemq.broker.region.AbstractSubscription;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assume;
@@ -69,8 +69,9 @@ public abstract class AbstractInflightMessageSizeTest {
final protected String destName = "testDest";
//use 10 second wait for assertions instead of the 30 default
- final protected long WAIT_DURATION = 10 * 1000;
- final protected long SLEEP_DURATION = 500;
+ protected final long WAIT_DURATION = 10 * 1000;
+ protected final long SLEEP_DURATION = 500;
+ protected final AtomicBoolean failOnDispatch = new AtomicBoolean();
@Parameters
public static Collection<Object[]> data() {
@@ -102,6 +103,7 @@ public abstract class AbstractInflightMessageSizeTest {
@Before
public void setUp() throws Exception {
+ failOnDispatch.set(false);
brokerService = new BrokerService();
brokerService.setDeleteAllMessagesOnStartup(true);
TransportConnector tcp = brokerService
@@ -111,6 +113,15 @@ public abstract class AbstractInflightMessageSizeTest {
PolicyMap pMap = new PolicyMap();
pMap.setDefaultEntry(policy);
brokerService.setDestinationPolicy(pMap);
+ brokerService.setPlugins(new BrokerPlugin[]{broker -> new BrokerFilter(broker) {
+ @Override
+ public void preProcessDispatch(MessageDispatch messageDispatch) {
+ super.preProcessDispatch(messageDispatch);
+ if (failOnDispatch.get()) {
+ throw new RuntimeException("fail dispatch");
+ }
+ }
+ }});
brokerService.start();
//used to test optimizeAcknowledge works
@@ -307,6 +318,55 @@ public abstract class AbstractInflightMessageSizeTest {
Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
}
+ @Test(timeout=60000)
+ public void testInflightMessageSizeDispatchFailure() throws Exception {
+ Assume.assumeTrue(useTopicSubscriptionInflightStats);
+
+ //Fail on all dispatches
+ failOnDispatch.set(true);
+
+ //The send may throw here because the failed dispatch is expected to close the connection
+ try {
+ sendMessages(1);
+ } catch (Exception e) {
+ //expected as session should close
+ }
+
+ //Wait for session to fail
+ assertTrue(Wait.waitFor(() -> ((ActiveMQSession) session).isClosed(), WAIT_DURATION, SLEEP_DURATION));
+
+ //Make sure all the stats are cleaned up on failure of dispatches
+ assertTrue("Destination inflight message count should be 0",
+ Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
+ assertTrue("Consumers size should be 0 due to failure or Inflight sub dispatched message count should be 0 for durable sub",
+ Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
+ getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION));
+ assertTrue("Consumers size should be 0 due to failure or Inflight message size should be 0 for durable sub",
+ Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
+ getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
+ }
+
+ @Test(timeout=60000)
+ public void testInflightMessageSizeConsumerClosed() throws Exception {
+ Assume.assumeTrue(useTopicSubscriptionInflightStats);
+ sendMessages(10);
+
+ //Wait for the 10 messages to get dispatched and then close the consumer to test cleanup
+ assertTrue("Should be 10 in flight messages",
+ Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
+ consumer.close();
+
+ //Make sure all the stats are cleaned up once the consumer has been closed
+ assertTrue("Destination inflight message count should be 0",
+ Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
+ assertTrue("Consumers size should be 0 due to failure or Inflight sub dispatched message count should be 0 for durable sub",
+ Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
+ getSubscription().getDispatchedQueueSize() == 0, WAIT_DURATION, SLEEP_DURATION));
+ assertTrue("Consumers size should be 0 due to failure or Inflight message size should be 0 for durable sub",
+ Wait.waitFor(() -> amqDestination.getConsumers().size() == 0 ||
+ getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
+ }
+
protected long sendMessages(int count) throws JMSException {
return sendMessages(count, null);
}