You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2020/06/10 15:41:12 UTC

[activemq] branch activemq-5.15.x updated: AMQ-7496 - Properly decrement inflight message size on message expiration

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
     new 1be7873  AMQ-7496 - Properly decrement inflight message size on message expiration
1be7873 is described below

commit 1be7873a35aab1ee1d6f14b8c47def83cc56cd45
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Wed Jun 10 09:10:09 2020 -0400

    AMQ-7496 - Properly decrement inflight message size on message
    expiration
    
    Also clean up some of the handling of inflight metrics in Prefetch
    subscription
    
    (cherry picked from commit cc0bcdd5dc6856c655d5e40e179db5b455a7efb1)
---
 .../broker/region/PrefetchSubscription.java        |  33 ++++---
 .../AbstractInflightMessageSizeTest.java           | 110 +++++++++++----------
 2 files changed, 76 insertions(+), 67 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
index fe7e732..83136f2 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
@@ -174,7 +174,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                             pending.remove();
                             createMessageDispatch(node, node.getMessage());
                             dispatched.add(node);
-                            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
                             onDispatch(node, node.getMessage());
                         }
                         return;
@@ -224,7 +223,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         // Don't remove the nodes until we are committed.
                         if (!context.isInTransaction()) {
                             getSubscriptionStatistics().getDequeues().increment();
-                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                             removeList.add(node);
                             contractPrefetchExtension(1);
                         } else {
@@ -240,7 +238,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                 }
                 for (final MessageReference node : removeList) {
                     dispatched.remove(node);
-                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+                    decrementPrefetchCounter(node);
                 }
                 // this only happens after a reconnect - get an ack which is not
                 // valid
@@ -256,9 +254,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         // Don't remove the nodes until we are committed - immediateAck option
                         if (!context.isInTransaction()) {
                             getSubscriptionStatistics().getDequeues().increment();
-                            ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
                             dispatched.remove(node);
-                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+                            decrementPrefetchCounter(node);
                             contractPrefetchExtension(1);
                         } else {
                             registerRemoveSync(context, node);
@@ -306,7 +303,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                             regionDestination.messageExpired(context, this, node);
                         }
                         iter.remove();
-                        nodeDest.getDestinationStatistics().getInflight().decrement();
+                        decrementPrefetchCounter(node);
 
                         if (ack.getLastMessageId().equals(messageId)) {
                             contractPrefetchExtension(1);
@@ -364,8 +361,6 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                     if (inAckRange) {
                         sendToDLQ(context, node, ack.getPoisonCause());
                         Destination nodeDest = (Destination) node.getRegionDestination();
-                        nodeDest.getDestinationStatistics()
-                        .getInflight().decrement();
                         removeList.add(node);
                         getSubscriptionStatistics().getDequeues().increment();
                         index++;
@@ -380,7 +375,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                 }
                 for (final MessageReference node : removeList) {
                     dispatched.remove(node);
-                    getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+                    decrementPrefetchCounter(node);
                 }
                 if (!callDispatchMatched) {
                     throw new JMSException(
@@ -414,8 +409,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                         synchronized (dispatchLock) {
                             getSubscriptionStatistics().getDequeues().increment();
                             dispatched.remove(node);
-                            getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
-                            nodeDest.getDestinationStatistics().getInflight().decrement();
+                            decrementPrefetchCounter(node);
                         }
                         contractPrefetchExtension(1);
                         nodeDest.wakeup();
@@ -697,9 +691,8 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
 
         MessageDispatch md = createMessageDispatch(node, message);
         if (node != QueueMessageReference.NULL_MESSAGE) {
-            getSubscriptionStatistics().getDispatched().increment();
             dispatched.add(node);
-            getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+            getSubscriptionStatistics().getDispatched().increment();
         }
         if (getPrefetchSize() == 0) {
             while (true) {
@@ -726,7 +719,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
                     if (nodeDest != null) {
                         if (node != QueueMessageReference.NULL_MESSAGE) {
                             nodeDest.getDestinationStatistics().getDispatched().increment();
-                            nodeDest.getDestinationStatistics().getInflight().increment();
+                            incrementPrefetchCounter(node);
                             LOG.trace("{} failed to dispatch: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
                         }
                     }
@@ -748,7 +741,7 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
         if (nodeDest != null) {
             if (node != QueueMessageReference.NULL_MESSAGE) {
                 nodeDest.getDestinationStatistics().getDispatched().increment();
-                nodeDest.getDestinationStatistics().getInflight().increment();
+                incrementPrefetchCounter(node);
                 LOG.trace("{} dispatched: {} - {}, dispatched: {}, inflight: {}", new Object[]{ info.getConsumerId(), message.getMessageId(), message.getDestination(), getSubscriptionStatistics().getDispatched().getCount(), dispatched.size() });
             }
         }
@@ -850,4 +843,14 @@ public abstract class PrefetchSubscription extends AbstractSubscription {
             LOG.trace("Caught exception during dispatch after prefetch change.", e);
         }
     }
+
+    private void incrementPrefetchCounter(final MessageReference node) {
+        ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().increment();
+        getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
+    }
+
+    private void decrementPrefetchCounter(final MessageReference node) {
+        ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
+        getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
+    }
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
index a127feb..81484e9 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.statistics;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.Arrays;
@@ -36,6 +37,7 @@ import org.apache.activemq.ActiveMQSession;
 import org.apache.activemq.TestSupport;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.AbstractSubscription;
 import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
@@ -151,21 +153,14 @@ public abstract class AbstractInflightMessageSizeTest {
 
         final long size = sendMessages(10);
 
-        assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return getSubscription().getInFlightMessageSize() > size;
-            }
-        }));
+        assertTrue("Inflight message size should be greater than the content length sent",
+                Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
+        assertTrue("Inflight message count should equal number of messages sent",
+                Wait.waitFor(() -> getSubscription().getDispatchedCounter() == 10));
 
         receiveMessages(10);
 
-        assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return getSubscription().getInFlightMessageSize() == 0;
-            }
-        }));
+        assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
     }
 
     /**
@@ -176,30 +171,26 @@ public abstract class AbstractInflightMessageSizeTest {
     @Test(timeout=15000)
     public void testInflightMessageSizePrefetchFilled() throws Exception {
         Assume.assumeTrue(useTopicSubscriptionInflightStats);
+        //turn off extension to make the test reliable
+        ((AbstractSubscription)getSubscription()).setUsePrefetchExtension(false);
 
         final long size = sendMessages(prefetch);
 
-        assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return getSubscription().getInFlightMessageSize() > size;
-            }
-        }));
+        assertTrue("Inflight message size should be greater than content length",
+                Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
+        assertTrue("Inflight message count should equal number of messages sent",
+                Wait.waitFor(() -> getSubscription().getDispatchedCounter() == prefetch));
 
         final long inFlightSize = getSubscription().getInFlightMessageSize();
         sendMessages(10);
 
         //Prefetch has been filled, so the size should not change with 10 more messages
-        assertEquals("Inflight message size should not change", inFlightSize, getSubscription().getInFlightMessageSize());
-
+        assertTrue("Inflight message count should equal number of messages sent",
+                Wait.waitFor(() -> getSubscription().getDispatchedCounter() == prefetch));
+        assertTrue("Inflight message size should not change", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == inFlightSize));
         receiveMessages(prefetch + 10);
 
-        assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return getSubscription().getInFlightMessageSize() == 0;
-            }
-        }));
+        assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
     }
 
     /**
@@ -213,33 +204,22 @@ public abstract class AbstractInflightMessageSizeTest {
 
         final long size = sendMessages(prefetch - 10);
 
-        assertTrue("Inflight message size should be greater than content length", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return getSubscription().getInFlightMessageSize() > size;
-            }
-        }));
+        assertTrue("Inflight message size should be greater than content length",
+                Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
+        assertTrue("Inflight message count should equal number of messages sent",
+                Wait.waitFor(() -> getSubscription().getDispatchedCounter() == prefetch - 10));
 
         //capture the inflight size and send 10 more messages
         final long inFlightSize = getSubscription().getInFlightMessageSize();
         sendMessages(10);
 
         //Prefetch has NOT been filled, so the size should rise with 10 more messages
-        assertTrue("Inflight message size should be greater than previous inlight size", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return getSubscription().getInFlightMessageSize() > inFlightSize;
-            }
-        }));
+        assertTrue("Inflight message size should be greater than previous inlight size",
+                Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > inFlightSize));
 
         receiveMessages(prefetch);
 
-        assertTrue("Inflight message size should be 0", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return getSubscription().getInFlightMessageSize() == 0;
-            }
-        }));
+        assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
     }
 
 
@@ -256,12 +236,10 @@ public abstract class AbstractInflightMessageSizeTest {
 
         final long size = sendMessages(10);
 
-        assertTrue("Inflight message size should be greater than the content length sent", Wait.waitFor(new Wait.Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return getSubscription().getInFlightMessageSize() > size;
-            }
-        }));
+        assertTrue("Inflight message size should be greater than the content length sent",
+                Wait.waitFor(() -> getSubscription().getInFlightMessageSize() > size));
+        assertTrue("Inflight message count should equal number of messages sent",
+                Wait.waitFor(() -> getSubscription().getDispatchedCounter() == 10));
 
        long inFlightSize = getSubscription().getInFlightMessageSize();
 
@@ -270,7 +248,32 @@ public abstract class AbstractInflightMessageSizeTest {
         }
         session.rollback();
 
-        assertEquals("Inflight message size should not change on rollback", inFlightSize, getSubscription().getInFlightMessageSize());
+        assertTrue("Inflight message size should not change on rollback",
+                Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == inFlightSize));
+    }
+
+    @Test(timeout=30000)
+    public void testInflightMessageSizeConsumerExpiration() throws Exception {
+        Assume.assumeTrue(useTopicSubscriptionInflightStats);
+        Assume.assumeFalse(optimizeAcknowledge);
+
+        int ttl = 500;
+        int messageCount = 10;
+        //Send 10 messages with a TTL of 500 ms which is long enough to be paged in and then wait for TTL to pass
+        sendMessages(10, ttl);
+        Thread.sleep(ttl * 2);
+
+        //Make sure we can't receive and all 10 messages were expired
+        //verify in flight size and count is now 0
+        assertNull(consumer.receive(10));
+        assertTrue("Expired count is wrong", Wait.waitFor(() -> brokerService.getDestination(getActiveMQDestination())
+                .getDestinationStatistics().getExpired().getCount() == messageCount));
+        assertTrue("Inflight message count should be 0", Wait.waitFor(() -> getSubscription().getDispatchedQueueSize() == 0));
+        assertTrue("Inflight message size should be 0", Wait.waitFor(() -> getSubscription().getInFlightMessageSize() == 0));
+    }
+
+    protected long sendMessages(int count) throws JMSException {
+        return sendMessages(count, null);
     }
 
     /**
@@ -279,8 +282,11 @@ public abstract class AbstractInflightMessageSizeTest {
      * @param count
      * @throws JMSException
      */
-    protected long sendMessages(int count) throws JMSException {
+    protected long sendMessages(int count, Integer ttl) throws JMSException {
         MessageProducer producer = session.createProducer(dest);
+        if (ttl != null) {
+            producer.setTimeToLive(ttl);
+        }
         long totalSize = 0;
         for (int i = 0; i < count; i++) {
             Random r = new Random();