You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/31 16:21:35 UTC

[activemq-artemis] branch master updated: ARTEMIS-1011 Slow consumer detection after bursts

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new ef2ec7b  ARTEMIS-1011 Slow consumer detection after bursts
     new f0c023e  This closes #2688
ef2ec7b is described below

commit ef2ec7b56ea7dcae33c17dcadab05411ad8be282
Author: brusdev <br...@gmail.com>
AuthorDate: Thu May 30 14:14:53 2019 +0200

    ARTEMIS-1011 Slow consumer detection after bursts
    
    Fix slow consumer detection after a burst, taking into account messages
    which are already in queue at the start of queueRateCheckTime period.
---
 .../artemis/core/server/impl/QueueImpl.java        | 22 ++++----
 .../tests/integration/client/SlowConsumerTest.java | 66 +++++++++++++++++++++-
 2 files changed, 76 insertions(+), 12 deletions(-)

diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 76c0219..a772529 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3935,19 +3935,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       @Override
       public void run() {
          float queueRate = getRate();
+         long queueMessages = getMessageCount();
+
          if (logger.isDebugEnabled()) {
-            logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
+            logger.debug(getAddress() + ":" + getName() + " has " + queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
          }
 
 
          if (consumers.size() == 0) {
             logger.debug("There are no consumers, no need to check slow consumer's rate");
             return;
-         } else if (queueRate < (threshold * consumers.size())) {
-            if (logger.isDebugEnabled()) {
-               logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
+         } else {
+            float queueThreshold = threshold * consumers.size();
+
+            if (queueRate < queueThreshold && queueMessages < queueThreshold) {
+               if (logger.isDebugEnabled()) {
+                  logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
+               }
+               return;
             }
-            return;
          }
 
          for (ConsumerHolder consumerHolder : consumers) {
@@ -3955,11 +3961,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
             if (consumer instanceof ServerConsumerImpl) {
                ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
                float consumerRate = serverConsumer.getRate();
-               if (queueRate < threshold) {
-                  if (logger.isDebugEnabled()) {
-                     logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
-                  }
-               } else if (consumerRate < threshold) {
+               if (consumerRate < threshold) {
                   RemotingConnection connection = null;
                   ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
                   RemotingService remotingService = server.getRemotingService();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index 0e43197..8a83845 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -59,7 +59,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
 
    private static final Logger logger = Logger.getLogger(SlowConsumerTest.class);
 
-   int threshold = 10;
+   private int threshold = 10;
+   private long checkPeriod = 1;
    private boolean isNetty = false;
    private boolean isPaging = false;
 
@@ -89,7 +90,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
       server = createServer(true, isNetty);
 
       AddressSettings addressSettings = new AddressSettings();
-      addressSettings.setSlowConsumerCheckPeriod(1);
+      addressSettings.setSlowConsumerCheckPeriod(checkPeriod);
       addressSettings.setSlowConsumerThreshold(threshold);
       addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
 
@@ -142,6 +143,67 @@ public class SlowConsumerTest extends ActiveMQTestBase {
       }
    }
 
+   @Test
+   public void testSlowConsumerKilledAfterBurst() throws Exception {
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = addClientSession(sf.createSession(false, true, true, false));
+
+      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+      assertPaging();
+
+      final int numMessages = 3 * threshold;
+
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(createTextMessage(session, "m" + i));
+      }
+
+      ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
+      session.start();
+
+      for (int i = 0; i < threshold; i++) {
+         consumer.receiveImmediate().individualAcknowledge();
+      }
+
+      Thread.sleep(3 *  checkPeriod * 1000);
+
+      try {
+         consumer.receiveImmediate();
+         fail();
+      } catch (ActiveMQObjectClosedException e) {
+         assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED);
+      }
+   }
+
+   @Test
+   public void testSlowConsumerSparedAfterBurst() throws Exception {
+      ClientSessionFactory sf = createSessionFactory(locator);
+
+      ClientSession session = addClientSession(sf.createSession(false, true, true, false));
+
+      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+      assertPaging();
+
+      final int numMessages = 3 * threshold + 1;
+
+      for (int i = 0; i < numMessages; i++) {
+         producer.send(createTextMessage(session, "m" + i));
+      }
+
+      ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
+      session.start();
+
+      for (int i = 0; i < 3 * threshold; i++) {
+         consumer.receiveImmediate().individualAcknowledge();
+      }
+
+      Thread.sleep(3 *  checkPeriod * 1000);
+
+      assertNotNull(consumer.receiveImmediate());
+   }
+
    private void assertPaging() throws Exception {
       Queue queue = server.locateQueue(QUEUE);
       if (isPaging) {