You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/10 14:27:21 UTC

[1/2] activemq-artemis git commit: Revert "ARTEMIS-1011 Small adjustment on test"; Revert "ARTEMIS-1011 adjust slow-consumer detection logic"

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 9a52f51c9 -> f1891f162


Revert "ARTEMIS-1011 Small adjustment on test"
Revert "ARTEMIS-1011 adjust slow-consumer detection logic"

This reverts commit 9818206bd3aa893864eedf06d19d0c2d5c355a9c.
This reverts commit 19ebbfb5f0a10daca6f2f516efae4755613254fd.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e9ad1c81
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e9ad1c81
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e9ad1c81

Branch: refs/heads/master
Commit: e9ad1c81a539e4380435d5bd79e0d18d18d707d6
Parents: 9a52f51
Author: Justin Bertram <jb...@apache.org>
Authored: Fri Mar 10 08:14:58 2017 -0600
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 10 09:26:41 2017 -0500

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     | 20 +++++-----
 docs/user-manual/en/slow-consumers.md           | 19 +---------
 .../integration/client/SlowConsumerTest.java    | 40 --------------------
 3 files changed, 13 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9ad1c81/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e04cf47..fc655f6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -233,6 +233,8 @@ public class QueueImpl implements Queue {
 
    private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
 
+   private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
+
    private ScheduledFuture slowConsumerReaperFuture;
 
    private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
@@ -2815,11 +2817,13 @@ public class QueueImpl implements Queue {
 
    @Override
    public float getRate() {
+      long locaMessageAdded = getMessagesAdded();
       float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
       if (timeSlice == 0) {
+         messagesAddedSnapshot.getAndSet(locaMessageAdded);
          return 0.0f;
       }
-      return BigDecimal.valueOf(getMessageCount() / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
+      return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
    }
 
    // Inner classes
@@ -3135,19 +3139,17 @@ public class QueueImpl implements Queue {
 
       @Override
       public void run() {
-         Set<Consumer> consumersSet = getConsumers();
-
-         if (consumersSet.size() == 0) {
-            logger.debug("There are no consumers, no need to check slow consumer's rate");
-            return;
-         }
-
          float queueRate = getRate();
          if (logger.isDebugEnabled()) {
             logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
          }
 
-         if (queueRate < (threshold * consumersSet.size())) {
+         Set<Consumer> consumersSet = getConsumers();
+
+         if (consumersSet.size() == 0) {
+            logger.debug("There are no consumers, no need to check slow consumer's rate");
+            return;
+         } else if (queueRate  < (threshold * consumersSet.size())) {
             if (logger.isDebugEnabled()) {
                logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
             }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9ad1c81/docs/user-manual/en/slow-consumers.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/slow-consumers.md b/docs/user-manual/en/slow-consumers.md
index 5840912..b53bac5 100644
--- a/docs/user-manual/en/slow-consumers.md
+++ b/docs/user-manual/en/slow-consumers.md
@@ -18,23 +18,8 @@ By default the server will not detect slow consumers. If slow consumer
 detection is desired then see [queue attributes chapter](queue-attributes.md)
 for more details.
 
-The calculation to determine whether or not a consumer is slow inspects two notable
-metrics:
-
-1. The queue's message count.
-
-2. The number of messages a consumer has acknowledged.
-
-The queue's message count is inspected to ensure that the queue actually has had enough
-messages to actually satisfy the consumer's threshold. For example, it would not be
-fair to mark a consumer as "slow" if the queue received no messages. This is also notable
-because in order to get an accurate message count the queue must be locked which can
-negatively impact performance in high-throughput use-cases. Therefore slow-consumer
-detection is only recommended on queues where it is absolutely necessary and in those
-cases it may be worth tuning the `slow-consumer-check-period` to ensure it's not
-running so often as to negatively impact performance.
-
-Finally, the algorithm inspects the number of messages a particular consumer has
+The calculation to determine whether or not a consumer is slow only
+inspects the number of messages a particular consumer has
 *acknowledged*. It doesn't take into account whether or not flow control
 has been enabled on the consumer, whether or not the consumer is
 streaming a large message, etc. Keep this in mind when configuring slow

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9ad1c81/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index 00417ae..c81c24c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -44,7 +44,6 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
 import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
 import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.artemis.utils.ConcurrentHashSet;
 import org.apache.activemq.artemis.utils.RandomUtil;
 import org.apache.activemq.artemis.utils.TimeUtils;
@@ -245,45 +244,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
    }
 
    @Test
-   public void testSlowConsumerWithBurst() throws Exception {
-      ClientSessionFactory sf = createSessionFactory(locator);
-
-      ClientSession session = addClientSession(sf.createSession(true, true));
-
-      ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
-
-      final int numMessages = 20;
-
-      for (int i = 0; i < numMessages; i++) {
-         producer.send(createTextMessage(session, "m" + i));
-      }
-
-      assertPaging();
-
-      final Queue queue = server.locateQueue(QUEUE);
-
-      queue.getRate();
-
-      logger.info("Creating consumer...");
-
-      ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
-      session.start();
-
-      Wait.waitFor(consumer::isClosed, 3000, 100);
-
-      Assert.assertTrue(consumer.isClosed());
-
-      try {
-         consumer.receive(500);
-         fail("Consumer should have been killed since it's slow!");
-      } catch (ActiveMQObjectClosedException e) {
-         // ignore
-      } catch (Exception e) {
-         fail("Wrong exception thrown");
-      }
-   }
-
-   @Test
    public void testFastThenSlowConsumerSpared() throws Exception {
       locator.setAckBatchSize(0);
 


[2/2] activemq-artemis git commit: This closes #1086

Posted by cl...@apache.org.
This closes #1086


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f1891f16
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f1891f16
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f1891f16

Branch: refs/heads/master
Commit: f1891f162774b0f324dee0acb2a798008fbd3f0c
Parents: 9a52f51 e9ad1c8
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 10 09:27:14 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 10 09:27:14 2017 -0500

----------------------------------------------------------------------
 .../artemis/core/server/impl/QueueImpl.java     | 20 +++++-----
 docs/user-manual/en/slow-consumers.md           | 19 +---------
 .../integration/client/SlowConsumerTest.java    | 40 --------------------
 3 files changed, 13 insertions(+), 66 deletions(-)
----------------------------------------------------------------------