You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2022/11/01 14:30:02 UTC

[activemq] branch main updated: AMQ-9153: Fix slow consumer advisory for queue subscriptions

This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/main by this push:
     new 596ee3168 AMQ-9153: Fix slow consumer advisory for queue subscriptions
596ee3168 is described below

commit 596ee316874894ad0342475d8f242913bd507cd1
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
AuthorDate: Tue Nov 1 08:19:22 2022 -0400

    AMQ-9153: Fix slow consumer advisory for queue subscriptions
    
    Due to changes with Queues to check if consumers are full before adding
    more messages to the subscription, the Queue dispatch logic needed to be
    updated to mark subscriptions as slow and send advisories if configured
    instead of relying on the subscription itself to do it.
---
 .../org/apache/activemq/broker/region/Queue.java   | 10 +++++
 .../apache/activemq/advisory/AdvisoryTests.java    | 43 +++++++++++++++++++---
 2 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 20f8a3a86..4bc669046 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -2210,6 +2210,16 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
                         // no further dispatch of list to a full consumer to
                         // avoid out of order message receipt
                         fullConsumers.add(s);
+
+                        //For full consumers we need to mark that they are slow and
+                        // then call the broker.slowConsumer() hook if implemented
+                        if (s instanceof PrefetchSubscription) {
+                            final PrefetchSubscription sub = (PrefetchSubscription) s;
+                            if (!sub.isSlowConsumer()) {
+                                sub.setSlowConsumer(true);
+                                broker.slowConsumer(sub.getContext(), this, sub);
+                            }
+                        }
                         LOG.trace("Subscription full {}", s);
                     }
                 }
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
index 07bed87bc..201d8a092 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/advisory/AdvisoryTests.java
@@ -28,6 +28,7 @@ import java.util.HashSet;
 import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -110,17 +111,48 @@ public class AdvisoryTests {
     }
 
     @Test(timeout = 60000)
-    public void testSlowConsumerAdvisory() throws Exception {
+    public void testQueueSlowConsumerAdvisory() throws Exception {
+        testSlowConsumerAdvisory(new ActiveMQQueue(getClass().getName()));
+    }
+
+    @Test(timeout = 60000)
+    public void testTopicSlowConsumerAdvisory() throws Exception {
+        broker.getDestinationPolicy().getDefaultEntry().setTopicPrefetch(500);
+        broker.getDestinationPolicy().getDefaultEntry().setPendingMessageLimitStrategy(null);
+        testSlowConsumerAdvisory(new ActiveMQTopic(getClass().getName()));
+    }
+
+    @Test(timeout = 60000)
+    public void testDurableSlowConsumerAdvisory() throws Exception {
         Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        Queue queue = s.createQueue(getClass().getName());
-        MessageConsumer consumer = s.createConsumer(queue);
+        Topic topic = s.createTopic(getClass().getName());
+        MessageConsumer consumer = s.createDurableSubscriber(topic, "sub1");
         assertNotNull(consumer);
 
-        Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) queue);
+        Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) topic);
         s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
         // start throwing messages at the consumer
-        MessageProducer producer = s.createProducer(queue);
+        MessageProducer producer = s.createProducer(topic);
+        for (int i = 0; i < MESSAGE_COUNT; i++) {
+            BytesMessage m = s.createBytesMessage();
+            m.writeBytes(new byte[1024]);
+            producer.send(m);
+        }
+        Message msg = advisoryConsumer.receive(1000);
+        assertNotNull(msg);
+    }
+
+    private void testSlowConsumerAdvisory(Destination dest) throws Exception {
+        Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = s.createConsumer(dest);
+        assertNotNull(consumer);
+
+        Topic advisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic((ActiveMQDestination) dest);
+        s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer advisoryConsumer = s.createConsumer(advisoryTopic);
+        // start throwing messages at the consumer
+        MessageProducer producer = s.createProducer(dest);
         for (int i = 0; i < MESSAGE_COUNT; i++) {
             BytesMessage m = s.createBytesMessage();
             m.writeBytes(new byte[1024]);
@@ -343,6 +375,7 @@ public class AdvisoryTests {
         }
         ConnectionFactory factory = createConnectionFactory();
         connection = factory.createConnection();
+        connection.setClientID("clientId");
         connection.start();
     }