You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by mi...@apache.org on 2018/08/08 16:38:12 UTC
[1/2] activemq-artemis git commit: This closes #2203
Repository: activemq-artemis
Updated Branches:
refs/heads/master 99469b1cf -> 3d2eadfb8
This closes #2203
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3d2eadfb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3d2eadfb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3d2eadfb
Branch: refs/heads/master
Commit: 3d2eadfb8a9d51d9f274ef2b20dd908f2d88bd71
Parents: 99469b1 8dd0e94
Author: Michael Andre Pearce <mi...@me.com>
Authored: Wed Aug 8 17:37:58 2018 +0100
Committer: Michael Andre Pearce <mi...@me.com>
Committed: Wed Aug 8 17:37:58 2018 +0100
----------------------------------------------------------------------
.../unit/core/server/impl/QueueImplTest.java | 60 ++++++++++++++++++++
1 file changed, 60 insertions(+)
----------------------------------------------------------------------
[2/2] activemq-artemis git commit: ARTEMIS-1999 Broker uses 100%
core's CPU time if msg grouping is used
Posted by mi...@apache.org.
ARTEMIS-1999 Broker uses 100% core's CPU time if msg grouping is used
The delivery loop doesn't give up trying to deliver messages when
back-pressure kicks in (credits and/or TCP) if msg grouping is used and
there are many consumers registered: this change allows the loop
to exit by telling the delivery logic that the group consumer is the only
consumer to check.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8dd0e947
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8dd0e947
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8dd0e947
Branch: refs/heads/master
Commit: 8dd0e9472fc9eaf08c3d64c3935aeafbf04a422a
Parents: 99469b1
Author: Francesco Nigro <ni...@gmail.com>
Authored: Tue Jul 31 11:16:26 2018 +0200
Committer: Michael Andre Pearce <mi...@me.com>
Committed: Wed Aug 8 17:37:58 2018 +0100
----------------------------------------------------------------------
.../unit/core/server/impl/QueueImplTest.java | 60 ++++++++++++++++++++
1 file changed, 60 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8dd0e947/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
index 0aa6e5c..b0987aa 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/QueueImplTest.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -1289,6 +1290,65 @@ public class QueueImplTest extends ActiveMQTestBase {
}
}
+ @Test
+ public void testGroupMessageWithManyConsumers() throws Exception {
+ final CountDownLatch firstMessageHandled = new CountDownLatch(1);
+ final CountDownLatch finished = new CountDownLatch(2);
+ final Consumer groupConsumer = new FakeConsumer() {
+
+ int count = 0;
+
+ @Override
+ public synchronized HandleStatus handle(MessageReference reference) {
+ if (count == 0) {
+ //the first message is handled, which is expected to make this consumer
+ //the group consumer (asserted below through queue.getGroups())
+ count++;
+ firstMessageHandled.countDown();
+ return HandleStatus.HANDLED;
+ } else if (count <= 2) {
+ //the next two attempts to deliver the second message (a direct delivery
+ //and then an async one) are refused as BUSY and counted on the finished latch
+ count++;
+ finished.countDown();
+ return HandleStatus.BUSY;
+ } else {
+ //this shouldn't happen, because the last attempt to deliver
+ //the second message should have stopped the delivery loop:
+ //it will succeed just to let the message be handled and
+ //reduce the message count to 0, failing the final assertion
+ return HandleStatus.HANDLED;
+ }
+ }
+ };
+ final Consumer noConsumer = new FakeConsumer() {
+ @Override
+ public synchronized HandleStatus handle(MessageReference reference) {
+ Assert.fail("this consumer isn't allowed to consume any message");
+ throw new AssertionError();
+ }
+ };
+ final QueueImpl queue = new QueueImpl(1, new SimpleString("address1"), QueueImplTest.queue1,
+ null, null, false, true, false,
+ scheduledExecutor, null, null, null,
+ ArtemisExecutor.delegate(executor), null, null);
+ queue.addConsumer(groupConsumer);
+ queue.addConsumer(noConsumer);
+ final MessageReference firstMessageReference = generateReference(queue, 1);
+ final SimpleString groupName = SimpleString.toSimpleString("group");
+ firstMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
+ final MessageReference secondMessageReference = generateReference(queue, 2);
+ secondMessageReference.getMessage().putStringProperty(Message.HDR_GROUP_ID, groupName);
+ queue.addTail(firstMessageReference, true);
+ Assert.assertTrue("first message isn't handled", firstMessageHandled.await(3000, TimeUnit.MILLISECONDS));
+ Assert.assertEquals("group consumer isn't correctly set", groupConsumer, queue.getGroups().get(groupName));
+ queue.addTail(secondMessageReference, true);
+ final boolean atLeastTwoDeliverAttempts = finished.await(3000, TimeUnit.MILLISECONDS);
+ Assert.assertTrue(atLeastTwoDeliverAttempts);
+ Thread.sleep(1000);
+ Assert.assertEquals("The second message should be in the queue", 1, queue.getMessageCount());
+ }
+
private QueueImpl getNonDurableQueue() {
return getQueue(QueueImplTest.queue1, false, false, null);
}