You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/05/31 16:21:35 UTC
[activemq-artemis] branch master updated: ARTEMIS-1011 Slow consumer detection after bursts
This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new ef2ec7b ARTEMIS-1011 Slow consumer detection after bursts
new f0c023e This closes #2688
ef2ec7b is described below
commit ef2ec7b56ea7dcae33c17dcadab05411ad8be282
Author: brusdev <br...@gmail.com>
AuthorDate: Thu May 30 14:14:53 2019 +0200
ARTEMIS-1011 Slow consumer detection after bursts
Fix slow consumer detection after a burst by taking into account messages
that are already in the queue at the start of the queueRateCheckTime period.
---
.../artemis/core/server/impl/QueueImpl.java | 22 ++++----
.../tests/integration/client/SlowConsumerTest.java | 66 +++++++++++++++++++++-
2 files changed, 76 insertions(+), 12 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 76c0219..a772529 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3935,19 +3935,25 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
@Override
public void run() {
float queueRate = getRate();
+ long queueMessages = getMessageCount();
+
if (logger.isDebugEnabled()) {
- logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
+ logger.debug(getAddress() + ":" + getName() + " has " + queueMessages + " message(s) and " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
if (consumers.size() == 0) {
logger.debug("There are no consumers, no need to check slow consumer's rate");
return;
- } else if (queueRate < (threshold * consumers.size())) {
- if (logger.isDebugEnabled()) {
- logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
+ } else {
+ float queueThreshold = threshold * consumers.size();
+
+ if (queueRate < queueThreshold && queueMessages < queueThreshold) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
+ }
+ return;
}
- return;
}
for (ConsumerHolder consumerHolder : consumers) {
@@ -3955,11 +3961,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
if (consumer instanceof ServerConsumerImpl) {
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
float consumerRate = serverConsumer.getRate();
- if (queueRate < threshold) {
- if (logger.isDebugEnabled()) {
- logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
- }
- } else if (consumerRate < threshold) {
+ if (consumerRate < threshold) {
RemotingConnection connection = null;
ActiveMQServer server = ((PostOfficeImpl) postOffice).getServer();
RemotingService remotingService = server.getRemotingService();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index 0e43197..8a83845 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -59,7 +59,8 @@ public class SlowConsumerTest extends ActiveMQTestBase {
private static final Logger logger = Logger.getLogger(SlowConsumerTest.class);
- int threshold = 10;
+ private int threshold = 10;
+ private long checkPeriod = 1;
private boolean isNetty = false;
private boolean isPaging = false;
@@ -89,7 +90,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
server = createServer(true, isNetty);
AddressSettings addressSettings = new AddressSettings();
- addressSettings.setSlowConsumerCheckPeriod(1);
+ addressSettings.setSlowConsumerCheckPeriod(checkPeriod);
addressSettings.setSlowConsumerThreshold(threshold);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
@@ -142,6 +143,67 @@ public class SlowConsumerTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testSlowConsumerKilledAfterBurst() throws Exception {
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = addClientSession(sf.createSession(false, true, true, false));
+
+ ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+ assertPaging();
+
+ final int numMessages = 3 * threshold;
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(createTextMessage(session, "m" + i));
+ }
+
+ ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
+ session.start();
+
+ for (int i = 0; i < threshold; i++) {
+ consumer.receiveImmediate().individualAcknowledge();
+ }
+
+ Thread.sleep(3 * checkPeriod * 1000);
+
+ try {
+ consumer.receiveImmediate();
+ fail();
+ } catch (ActiveMQObjectClosedException e) {
+ assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED);
+ }
+ }
+
+ @Test
+ public void testSlowConsumerSparedAfterBurst() throws Exception {
+ ClientSessionFactory sf = createSessionFactory(locator);
+
+ ClientSession session = addClientSession(sf.createSession(false, true, true, false));
+
+ ClientProducer producer = addClientProducer(session.createProducer(QUEUE));
+
+ assertPaging();
+
+ final int numMessages = 3 * threshold + 1;
+
+ for (int i = 0; i < numMessages; i++) {
+ producer.send(createTextMessage(session, "m" + i));
+ }
+
+ ClientConsumer consumer = addClientConsumer(session.createConsumer(QUEUE));
+ session.start();
+
+ for (int i = 0; i < 3 * threshold; i++) {
+ consumer.receiveImmediate().individualAcknowledge();
+ }
+
+ Thread.sleep(3 * checkPeriod * 1000);
+
+ assertNotNull(consumer.receiveImmediate());
+ }
+
private void assertPaging() throws Exception {
Queue queue = server.locateQueue(QUEUE);
if (isPaging) {