You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2018/08/08 19:46:07 UTC
activemq-artemis git commit: ARTEMIS-1999 Broker uses 100% core's CPU
time if msg grouping is used
Repository: activemq-artemis
Updated Branches:
refs/heads/2.6.x b6ec8245c -> 0b24d0b69
ARTEMIS-1999 Broker uses 100% core's CPU time if msg grouping is used
The deliver loop won't give up trying to deliver messages when
back-pressure kicks in (credits and/or TCP) if msg grouping is used and
there are many consumers registered: this change will allow the loop
to exit by instructing the logic that the group consumer is the only
consumer to check.
(cherry picked from commit 8dd0e9472fc9eaf08c3d64c3935aeafbf04a422a)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0b24d0b6
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0b24d0b6
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0b24d0b6
Branch: refs/heads/2.6.x
Commit: 0b24d0b6925db0ad3458f077b087735b6087254a
Parents: b6ec824
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jul 31 11:16:26 2018 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 8 15:45:13 2018 -0400
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 9 ++-
.../unit/core/server/impl/QueueImplTest.java | 60 ++++++++++++++++++++
2 files changed, 67 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b24d0b6/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index ccb8267..c80d4f3 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -2365,7 +2365,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
}
}
- if (pos == endPos) {
+ if (groupConsumer != null || exclusive) {
+ if (noDelivery > 0) {
+ break;
+ }
+ noDelivery = 0;
+ } else if (pos == endPos) {
// Round robin'd all
if (noDelivery == size) {
@@ -2917,7 +2922,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return true;
}
- if (pos == startPos) {
+ if (pos == startPos || groupConsumer != null || exclusive) {
// Tried them all
break;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0b24d0b6/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 0aa6e5c..b0987aa 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -1289,6 +1290,65 @@ public class QueueImplTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testGroupMessageWithManyConsumers() throws Exception {
+ final CountDownLatch firstMessageHandled = new CountDownLatch(1);
+ final CountDownLatch finished = new CountDownLatch(2);
+ final Consumer groupConsumer = new FakeConsumer() {
+
+ int count = 0;
+
+ @Override
+ public synchronized HandleStatus handle(MessageReference reference) {
+ if (count == 0) {
+ //the first message is handled, which makes this consumer
+ //the group consumer for the message group
+ count++;
+ firstMessageHandled.countDown();
+ return HandleStatus.HANDLED;
+ } else if (count <= 2) {
+ //the next two attempts to deliver the second message are made
+ //first through a direct delivery and then through an async one
+ count++;
+ finished.countDown();
+ return HandleStatus.BUSY;
+ } else {
+ //this shouldn't happen, because the last attempt to deliver
+ //the second message should have stopped the delivery loop:
+ //it will succeed just to let the message be handled and
+ //reduce the message count to 0
+ return HandleStatus.HANDLED;
+ }
+ }
+ };
+ final Consumer noConsumer = new FakeConsumer() {
+ @Override
+ public synchronized HandleStatus handle(MessageReference reference) {
+ Assert.fail("this consumer isn't allowed to consume any message");
+ throw new AssertionError();
+ }
+ };
+ final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1,
+ null, null, false, true, false,
+ scheduledExecutor, null, null, null,
+ ArtemisExecutor.delegate(executor), null, null);
+ queue.addConsumer(groupConsumer);
+ queue.addConsumer(noConsumer);
+ final MessageReference firstMessageReference = generateReference(queue, 1);
+ final SimpleString groupName = SimpleString.toSimpleString("group");
+ firstMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
+ final MessageReference secondMessageReference = generateReference(queue, 2);
+ secondMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
+ queue.addTail(firstMessageReference, true);
+ Assert.assertTrue("first message isn't handled", firstMessageHandled.await(3000, TimeUnit.MILLISECONDS));
+ Assert.assertEquals("group consumer isn't correctly set", groupConsumer, queue.getGroups().get(groupName));
+ queue.addTail(secondMessageReference, true);
+ final boolean atLeastTwoDeliverAttempts = finished.await(3000, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(atLeastTwoDeliverAttempts);
+ Thread.sleep(1000);
+ Assert.assertEquals("The second message should be in the queue", 1, queue.getMessageCount());
+ }
+
private QueueImpl getNonDurableQueue() {
return getQueue(QueueImplTest.queue1, false, false, null);
}