You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/03/10 14:27:21 UTC
[1/2] activemq-artemis git commit: Revert "ARTEMIS-1011 Small
adjustment on test" Revert "ARTEMIS-1011 adjust slow-consumer detection
logic"
Repository: activemq-artemis
Updated Branches:
refs/heads/master 9a52f51c9 -> f1891f162
Revert "ARTEMIS-1011 Small adjustment on test"
Revert "ARTEMIS-1011 adjust slow-consumer detection logic"
This reverts commit 9818206bd3aa893864eedf06d19d0c2d5c355a9c.
This reverts commit 19ebbfb5f0a10daca6f2f516efae4755613254fd.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e9ad1c81
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e9ad1c81
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e9ad1c81
Branch: refs/heads/master
Commit: e9ad1c81a539e4380435d5bd79e0d18d18d707d6
Parents: 9a52f51
Author: Justin Bertram <jb...@apache.org>
Authored: Fri Mar 10 08:14:58 2017 -0600
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 10 09:26:41 2017 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 20 +++++-----
docs/user-manual/en/slow-consumers.md | 19 +---------
.../integration/client/SlowConsumerTest.java | 40 --------------------
3 files changed, 13 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9ad1c81/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index e04cf47..fc655f6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -233,6 +233,8 @@ public class QueueImpl implements Queue {
private final AtomicLong queueRateCheckTime = new AtomicLong(System.currentTimeMillis());
+ private final AtomicLong messagesAddedSnapshot = new AtomicLong(0);
+
private ScheduledFuture slowConsumerReaperFuture;
private SlowConsumerReaperRunnable slowConsumerReaperRunnable;
@@ -2815,11 +2817,13 @@ public class QueueImpl implements Queue {
@Override
public float getRate() {
+ long locaMessageAdded = getMessagesAdded();
float timeSlice = ((System.currentTimeMillis() - queueRateCheckTime.getAndSet(System.currentTimeMillis())) / 1000.0f);
if (timeSlice == 0) {
+ messagesAddedSnapshot.getAndSet(locaMessageAdded);
return 0.0f;
}
- return BigDecimal.valueOf(getMessageCount() / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
+ return BigDecimal.valueOf((locaMessageAdded - messagesAddedSnapshot.getAndSet(locaMessageAdded)) / timeSlice).setScale(2, BigDecimal.ROUND_UP).floatValue();
}
// Inner classes
@@ -3135,19 +3139,17 @@ public class QueueImpl implements Queue {
@Override
public void run() {
- Set<Consumer> consumersSet = getConsumers();
-
- if (consumersSet.size() == 0) {
- logger.debug("There are no consumers, no need to check slow consumer's rate");
- return;
- }
-
float queueRate = getRate();
if (logger.isDebugEnabled()) {
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
- if (queueRate < (threshold * consumersSet.size())) {
+ Set<Consumer> consumersSet = getConsumers();
+
+ if (consumersSet.size() == 0) {
+ logger.debug("There are no consumers, no need to check slow consumer's rate");
+ return;
+ } else if (queueRate < (threshold * consumersSet.size())) {
if (logger.isDebugEnabled()) {
logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9ad1c81/docs/user-manual/en/slow-consumers.md
----------------------------------------------------------------------
diff --git a/docs/user-manual/en/slow-consumers.md b/docs/user-manual/en/slow-consumers.md
index 5840912..b53bac5 100644
--- a/docs/user-manual/en/slow-consumers.md
+++ b/docs/user-manual/en/slow-consumers.md
@@ -18,23 +18,8 @@ By default the server will not detect slow consumers. If slow consumer
detection is desired then see [queue attributes chapter](queue-attributes.md)
for more details.
-The calculation to determine whether or not a consumer is slow inspects two notable
-metrics:
-
-1. The queue's message count.
-
-2. The number of messages a consumer has acknowledged.
-
-The queue's message count is inspected to ensure that the queue actually has had enough
-messages to actually satisfy the consumer's threshold. For example, it would not be
-fair to mark a consumer as "slow" if the queue received no messages. This is also notable
-because in order to get an accurate message count the queue must be locked which can
-negatively impact performance in high-throughput use-cases. Therefore slow-consumer
-detection is only recommended on queues where it is absolutely necessary and in those
-cases it may be worth tuning the `slow-consumer-check-period` to ensure it's not
-running so often as to negatively impact performance.
-
-Finally, the algorithm inspects the number of messages a particular consumer has
+The calculation to determine whether or not a consumer is slow only
+inspects the number of messages a particular consumer has
*acknowledged*. It doesn't take into account whether or not flow control
has been enabled on the consumer, whether or not the consumer is
streaming a large message, etc. Keep this in mind when configuring slow
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e9ad1c81/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index 00417ae..c81c24c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -44,7 +44,6 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.tests.util.Wait;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.TimeUtils;
@@ -245,45 +244,6 @@ public class SlowConsumerTest extends ActiveMQTestBase {
}
@Test
- public void testSlowConsumerWithBurst() throws Exception {
- ClientSessionFactory sf = createSessionFactory(locator);
-
- ClientSession session = addClientSession(sf.createSession(true, true));
-
- ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
-
- final int numMessages = 20;
-
- for (int i = 0; i < numMessages; i++) {
- producer.send(createTextMessage(session, "m" + i));
- }
-
- assertPaging();
-
- final Queue queue = server.locateQueue(QUEUE);
-
- queue.getRate();
-
- logger.info("Creating consumer...");
-
- ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
- session.start();
-
- Wait.waitFor(consumer::isClosed, 3000, 100);
-
- Assert.assertTrue(consumer.isClosed());
-
- try {
- consumer.receive(500);
- fail("Consumer should have been killed since it's slow!");
- } catch (ActiveMQObjectClosedException e) {
- // ignore
- } catch (Exception e) {
- fail("Wrong exception thrown");
- }
- }
-
- @Test
public void testFastThenSlowConsumerSpared() throws Exception {
locator.setAckBatchSize(0);
[2/2] activemq-artemis git commit: This closes #1086
Posted by cl...@apache.org.
This closes #1086
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/f1891f16
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/f1891f16
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/f1891f16
Branch: refs/heads/master
Commit: f1891f162774b0f324dee0acb2a798008fbd3f0c
Parents: 9a52f51 e9ad1c8
Author: Clebert Suconic <cl...@apache.org>
Authored: Fri Mar 10 09:27:14 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Fri Mar 10 09:27:14 2017 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 20 +++++-----
docs/user-manual/en/slow-consumers.md | 19 +---------
.../integration/client/SlowConsumerTest.java | 40 --------------------
3 files changed, 13 insertions(+), 66 deletions(-)
----------------------------------------------------------------------