You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2022/11/17 14:11:55 UTC

[activemq] branch activemq-5.17.x updated: AMQ-9159 - Add a test case to verify inflight message stats for wildcard consumer when a destination is removed

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch activemq-5.17.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.17.x by this push:
     new 29dcd3f8b AMQ-9159 - Add a test case to verify inflight message stats for wildcard consumer when a destination is removed
29dcd3f8b is described below

commit 29dcd3f8bf68bcb3c30acc43793caa9bb5b0d386
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Thu Nov 17 09:02:43 2022 -0500

    AMQ-9159 - Add a test case to verify inflight message stats for wildcard
    consumer when a destination is removed
    
    (cherry picked from commit f6e26085cfa3693b2872b1c53354c674c0cbca49)
---
 .../AbstractInflightMessageSizeTest.java           | 80 ++++++++++++++++++++--
 ...DurableSubscriptionInflightMessageSizeTest.java | 19 +++--
 .../QueueSubscriptionInflightMessageSizeTest.java  | 10 +--
 .../TopicSubscriptionInflightMessageSizeTest.java  |  8 +--
 4 files changed, 98 insertions(+), 19 deletions(-)

diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
index 05b1ef030..de012d323 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/AbstractInflightMessageSizeTest.java
@@ -66,7 +66,9 @@ public abstract class AbstractInflightMessageSizeTest {
     protected boolean useTopicSubscriptionInflightStats;
     final protected int ackType;
     final protected boolean optimizeAcknowledge;
-    final protected String destName = "testDest";
+    final protected String destNamePrefix = "testDest";
+    final protected String destName = "testDest.1";
+    final protected String destName2 = "testDest.2";
 
     //use 10 second wait for assertions instead of the 30 default
     protected final long WAIT_DURATION = 10 * 1000;
@@ -367,8 +369,62 @@ public abstract class AbstractInflightMessageSizeTest {
                 getSubscription().getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
     }
 
+    @Test(timeout=60000)
+    public void testInflightMessageSizeRemoveDestination() throws Exception {
+        Assume.assumeTrue(useTopicSubscriptionInflightStats);
+        //Close as we will re-create with a wildcard sub to get messages from 2 destinations
+        consumer.close();
+
+        consumer = getMessageConsumer(destNamePrefix + ".>");
+        sendMessages(10);
+        sendMessages(10, getActiveMQDestination(destName2));
+        Destination amqDestination2 = TestSupport.getDestination(brokerService, getActiveMQDestination(destName2));
+        final Subscription subscription = getSubscription();
+
+        //Wait for the 10 messages to get dispatched and then close the consumer to test cleanup
+        assertTrue("Should be 10 in flight messages",
+            Wait.waitFor(() ->  amqDestination.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Should be 10 in flight messages",
+            Wait.waitFor(() ->  amqDestination2.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Inflight message size should be 20",
+            Wait.waitFor(() -> subscription.getInFlightSize() == 20, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Inflight message size should be greater than 0",
+            Wait.waitFor(() -> subscription.getInFlightMessageSize() > 0, WAIT_DURATION, SLEEP_DURATION));
+
+        //remove 1 destination, leaving 10 in flight
+        brokerService.getBroker().removeDestination(brokerService.getAdminConnectionContext(), getActiveMQDestination(), 1000);
+
+        //Make sure all the stats are updated after 1 destination removal
+        assertTrue("Destination inflight message count should be 0",
+            Wait.waitFor(() -> amqDestination.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Destination inflight message count should still be 10",
+            Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 10, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Inflight message size should be 10",
+            Wait.waitFor(() -> subscription.getInFlightSize() == 10, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Inflight message size should be greater than 0",
+            Wait.waitFor(() -> subscription.getInFlightMessageSize() > 0, WAIT_DURATION, SLEEP_DURATION));
+
+        //remove second dest
+        brokerService.getBroker().removeDestination(brokerService.getAdminConnectionContext(), getActiveMQDestination(destName2), 1000);
+
+        assertTrue("Destination inflight message count should be 0",
+            Wait.waitFor(() -> amqDestination2.getDestinationStatistics().getInflight().getCount() == 0, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Inflight message size should be 0",
+            Wait.waitFor(() -> subscription.getInFlightSize() == 0, WAIT_DURATION, SLEEP_DURATION));
+        assertTrue("Inflight message size should be 0",
+            Wait.waitFor(() -> subscription.getInFlightMessageSize() == 0, WAIT_DURATION, SLEEP_DURATION));
+    }
+
     protected long sendMessages(int count) throws JMSException {
-        return sendMessages(count, null);
+        return sendMessages(count, null, dest);
+    }
+
+    protected long sendMessages(int count, javax.jms.Destination dest) throws JMSException {
+        return sendMessages(count, null, dest);
+    }
+
+    protected long sendMessages(int count, Integer ttl) throws JMSException {
+        return sendMessages(count, ttl, dest);
     }
 
     /**
@@ -377,7 +433,7 @@ public abstract class AbstractInflightMessageSizeTest {
      * @param count
      * @throws JMSException
      */
-    protected long sendMessages(int count, Integer ttl) throws JMSException {
+    protected long sendMessages(int count, Integer ttl, javax.jms.Destination dest) throws JMSException {
         MessageProducer producer = session.createProducer(dest);
         if (ttl != null) {
             producer.setTimeToLive(ttl);
@@ -412,10 +468,22 @@ public abstract class AbstractInflightMessageSizeTest {
 
     protected abstract Subscription getSubscription();
 
-    protected abstract ActiveMQDestination getActiveMQDestination();
+    protected ActiveMQDestination getActiveMQDestination() {
+        return getActiveMQDestination(destName);
+    }
+
+    protected abstract ActiveMQDestination getActiveMQDestination(String destName);
 
-    protected abstract MessageConsumer getMessageConsumer() throws JMSException;
+    protected MessageConsumer getMessageConsumer() throws JMSException {
+        return getMessageConsumer(destName);
+    }
+
+    protected abstract MessageConsumer getMessageConsumer(String destName) throws JMSException;
+
+    protected javax.jms.Destination getDestination() throws JMSException {
+        return getDestination(destName);
+    }
 
-    protected abstract javax.jms.Destination getDestination() throws JMSException ;
+    protected abstract javax.jms.Destination getDestination(String destName) throws JMSException;
 
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
index a7e947329..2f349296c 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/DurableSubscriptionInflightMessageSizeTest.java
@@ -24,6 +24,8 @@ import org.apache.activemq.broker.region.Topic;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.SubscriptionKey;
+import org.junit.Assume;
+import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -40,8 +42,8 @@ public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflight
     }
 
     @Override
-    protected MessageConsumer getMessageConsumer() throws JMSException {
-        return session.createDurableSubscriber((javax.jms.Topic)dest, "sub1");
+    protected MessageConsumer getMessageConsumer(String destName) throws JMSException {
+        return session.createDurableSubscriber(getDestination(destName), "sub1");
     }
 
     @Override
@@ -50,13 +52,22 @@ public class DurableSubscriptionInflightMessageSizeTest extends AbstractInflight
     }
 
     @Override
-    protected javax.jms.Topic getDestination() throws JMSException {
+    protected javax.jms.Topic getDestination(String destName) throws JMSException {
         return session.createTopic(destName);
     }
 
     @Override
-    protected ActiveMQDestination getActiveMQDestination() {
+    protected ActiveMQDestination getActiveMQDestination(String destName) {
         return new ActiveMQTopic(destName);
     }
 
+    @Test(timeout=60000)
+    public void testInflightMessageSizeRemoveDestination() throws Exception {
+        Assume.assumeTrue(useTopicSubscriptionInflightStats);
+        //Close as we will re-create with a wildcard sub
+        consumer.close();
+        session.unsubscribe("sub1");
+        super.testInflightMessageSizeRemoveDestination();
+    }
+
 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
index 217aefb2d..11812ab24 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/QueueSubscriptionInflightMessageSizeTest.java
@@ -40,22 +40,22 @@ public class QueueSubscriptionInflightMessageSizeTest extends AbstractInflightMe
     }
 
     @Override
-    protected MessageConsumer getMessageConsumer() throws JMSException {
-        return session.createConsumer(dest);
+    protected MessageConsumer getMessageConsumer(String destName) throws JMSException {
+        return session.createConsumer(getDestination(destName));
     }
 
     @Override
     protected Subscription getSubscription() {
-        return ((Queue)amqDestination).getConsumers().get(0);
+        return amqDestination.getConsumers().get(0);
     }
 
     @Override
-    protected Destination getDestination() throws JMSException {
+    protected Destination getDestination(String destName) throws JMSException {
         return session.createQueue(destName);
     }
 
     @Override
-    protected ActiveMQDestination getActiveMQDestination() {
+    protected ActiveMQDestination getActiveMQDestination(String destName) {
         return new ActiveMQQueue(destName);
     }
 
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
index 132a96d9b..1691329de 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/statistics/TopicSubscriptionInflightMessageSizeTest.java
@@ -43,8 +43,8 @@ public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMe
     }
 
     @Override
-    protected MessageConsumer getMessageConsumer() throws JMSException {
-        return session.createConsumer(dest);
+    protected MessageConsumer getMessageConsumer(String destName) throws JMSException {
+        return session.createConsumer(getDestination(destName));
     }
 
     @Override
@@ -53,12 +53,12 @@ public class TopicSubscriptionInflightMessageSizeTest extends AbstractInflightMe
     }
 
     @Override
-    protected Destination getDestination() throws JMSException {
+    protected Destination getDestination(String destName) throws JMSException {
         return session.createTopic(destName);
     }
 
     @Override
-    protected ActiveMQDestination getActiveMQDestination() {
+    protected ActiveMQDestination getActiveMQDestination(String destName) {
         return new ActiveMQTopic(destName);
     }