You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/01/16 16:37:24 UTC
[1/3] activemq-artemis git commit: This closes #960
Repository: activemq-artemis
Updated Branches:
refs/heads/master 490bec9d9 -> 40c86b2ee
This closes #960
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/40c86b2e
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/40c86b2e
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/40c86b2e
Branch: refs/heads/master
Commit: 40c86b2eed1b640ce5eaabbd7e60614331a3d407
Parents: 490bec9 599aaa5
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Jan 16 11:37:12 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 16 11:37:12 2017 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 15 +-
.../integration/client/SlowConsumerTest.java | 217 ++++++++++++++++++-
2 files changed, 230 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
[3/3] activemq-artemis git commit: ARTEMIS-921 Consumers killed as
slow even if overall consuming rate is above threshold
Posted by cl...@apache.org.
ARTEMIS-921 Consumers killed as slow even if overall consuming rate is above threshold
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/90cf2398
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/90cf2398
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/90cf2398
Branch: refs/heads/master
Commit: 90cf2398296d9de83145043c6dc0d8f6a6d4c068
Parents: 490bec9
Author: Howard Gao <ho...@gmail.com>
Authored: Mon Jan 16 22:00:05 2017 +0800
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 16 11:37:12 2017 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 3 +
.../client/MultipleSlowConsumerTest.java | 261 +++++++++++++++++++
2 files changed, 264 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90cf2398/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index a74b0fe..87a5bd7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3159,6 +3159,9 @@ public class QueueImpl implements Queue {
connection.killMessage(server.getNodeID());
remotingService.removeConnection(connection.getID());
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
+ //break once a consumer gets killed. This can prevent all
+ //consumers to this queue get killed all at once.
+ break;
} else if (policy.equals(SlowConsumerPolicy.NOTIFY)) {
TypedProperties props = new TypedProperties();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/90cf2398/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java
new file mode 100644
index 0000000..37ae528
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.client;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.RoutingType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
+import org.apache.activemq.artemis.utils.TimeUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Set;
+
+public class MultipleSlowConsumerTest extends ActiveMQTestBase {
+
+ private int checkPeriod = 3;
+ private int threshold = 1;
+
+ private ActiveMQServer server;
+
+ private final SimpleString QUEUE = new SimpleString("SlowConsumerTestQueue");
+
+ private ServerLocator locator;
+
+ @Before
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+
+ server = createServer(true, true);
+
+ AddressSettings addressSettings = new AddressSettings();
+ addressSettings.setSlowConsumerCheckPeriod(checkPeriod);
+ addressSettings.setSlowConsumerThreshold(threshold);
+ addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
+
+ server.start();
+
+ server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
+
+ server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
+
+ locator = createFactory(true);
+ }
+
+ /**
+ * This test creates 3 consumers on one queue. A producer sends
+ * messages at a rate of 2 mesages per second. Each consumer
+ * consumes messages at rate of 1 message per second. The slow
+ * consumer threshold is 1 message per second.
+ * Based on the above settings, at least one of the consumers
+ * will be removed during the test, but at least one of the
+ * consumers will remain and all messages will be received.
+ */
+ @Test
+ public void testMultipleConsumersOneQueue() throws Exception {
+ locator.setAckBatchSize(0);
+
+ ClientSessionFactory sf1 = createSessionFactory(locator);
+ ClientSessionFactory sf2 = createSessionFactory(locator);
+ ClientSessionFactory sf3 = createSessionFactory(locator);
+ ClientSessionFactory sf4 = createSessionFactory(locator);
+
+ final int messages = 10;
+
+ FixedRateProducer producer = new FixedRateProducer(sf1, QUEUE, messages);
+
+ final Set<FixedRateConsumer> consumers = new ConcurrentHashSet<>();
+ final Set<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
+
+ consumers.add(new FixedRateConsumer(sf2, QUEUE, consumers, receivedMessages, 1));
+ consumers.add(new FixedRateConsumer(sf3, QUEUE, consumers, receivedMessages, 2));
+ consumers.add(new FixedRateConsumer(sf4, QUEUE, consumers, receivedMessages, 3));
+
+ try {
+ producer.start(threshold * 1000 / 2);
+
+ for (FixedRateConsumer consumer : consumers) {
+ consumer.start(threshold * 1000);
+ }
+
+ //check at least one consumer is killed
+ //but at least one survived
+ //and all messages are received.
+ assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() < 3));
+ assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0));
+ assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages));
+ assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0));
+ } finally {
+ producer.stopRunning();
+ for (FixedRateConsumer consumer : consumers) {
+ consumer.stopRunning();
+ }
+ System.out.println("***report messages received: " + receivedMessages.size());
+ System.out.println("***consumers left: " + consumers.size());
+ }
+ }
+
+ private class FixedRateProducer extends FixedRateClient {
+
+ int messages;
+ ClientProducer producer;
+
+ FixedRateProducer(ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException {
+ super(sf, queue);
+ this.messages = messages;
+ }
+
+ @Override
+ protected void prepareWork() throws ActiveMQException {
+ super.prepareWork();
+ this.producer = session.createProducer(queue);
+ }
+
+ @Override
+ protected void doWork(int count) throws Exception {
+
+ if (count < messages) {
+ ClientMessage m = createTextMessage(session, "msg" + count);
+ producer.send(m);
+ System.out.println("producer sent a message " + count);
+ } else {
+ stopRunning();
+ }
+ }
+ }
+
+ private class FixedRateConsumer extends FixedRateClient {
+
+ Set<FixedRateConsumer> consumers;
+ ClientConsumer consumer;
+ Set<ClientMessage> receivedMessages;
+ int id;
+
+ FixedRateConsumer(ClientSessionFactory sf, SimpleString queue,
+ Set<FixedRateConsumer> consumers, Set<ClientMessage> receivedMessages,
+ int id) throws ActiveMQException {
+ super(sf, queue);
+ this.consumers = consumers;
+ this.receivedMessages = receivedMessages;
+ this.id = id;
+ }
+
+ @Override
+ protected void prepareWork() throws ActiveMQException {
+ super.prepareWork();
+ this.consumer = session.createConsumer(queue);
+ this.session.start();
+ }
+
+ @Override
+ protected void doWork(int count) throws Exception {
+ ClientMessage m = this.consumer.receive(rate);
+ System.out.println("consumer " + id + " got m: " + m);
+ if (m != null) {
+ receivedMessages.add(m);
+ m.acknowledge();
+ System.out.println("acked " + m.getClass().getName() + "now total received: " + receivedMessages.size());
+ }
+ }
+
+ @Override
+ protected void handleError(int count, Exception e) {
+ System.err.println("Got error receiving message " + count + " remove self " + this.id);
+ consumers.remove(this);
+ e.printStackTrace();
+ }
+
+ }
+
+ private abstract class FixedRateClient extends Thread {
+
+ protected ClientSessionFactory sf;
+ protected SimpleString queue;
+ protected ClientSession session;
+ protected int rate;
+ protected volatile boolean working;
+
+ FixedRateClient(ClientSessionFactory sf, SimpleString queue) throws ActiveMQException {
+ this.sf = sf;
+ this.queue = queue;
+ }
+
+ public void start(int rate) {
+ this.rate = rate;
+ working = true;
+ start();
+ }
+
+ protected void prepareWork() throws ActiveMQException {
+ this.session = addClientSession(sf.createSession(true, true));
+ }
+
+ @Override
+ public void run() {
+ try {
+ prepareWork();
+ } catch (ActiveMQException e) {
+ System.out.println("got error in prepareWork(), aborting...");
+ e.printStackTrace();
+ return;
+ }
+ int count = 0;
+ while (working) {
+ try {
+ doWork(count);
+ Thread.sleep(rate);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (Exception e) {
+ System.err.println(this + " got exception ");
+ e.printStackTrace();
+ handleError(count, e);
+ working = false;
+ } finally {
+ count++;
+ }
+ }
+ }
+
+ protected abstract void doWork(int count) throws Exception;
+
+ protected void handleError(int count, Exception e) {
+ }
+
+ public void stopRunning() {
+ working = false;
+ interrupt();
+ try {
+ join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
[2/3] activemq-artemis git commit: ARTEMIS-921 Fixing Slow Consumer
when multiple consumers on same queue
Posted by cl...@apache.org.
ARTEMIS-921 Fixing Slow Consumer when multiple consumers on same queue
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/599aaa53
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/599aaa53
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/599aaa53
Branch: refs/heads/master
Commit: 599aaa5345bb75a82da00c633575ac62fecf1bd6
Parents: 90cf239
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Jan 16 10:27:27 2017 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Jan 16 11:37:12 2017 -0500
----------------------------------------------------------------------
.../artemis/core/server/impl/QueueImpl.java | 18 +-
.../client/MultipleSlowConsumerTest.java | 261 -------------------
.../integration/client/SlowConsumerTest.java | 217 ++++++++++++++-
3 files changed, 230 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/599aaa53/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 87a5bd7..ecc67bf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3132,7 +3132,20 @@ public class QueueImpl implements Queue {
if (logger.isDebugEnabled()) {
logger.debug(getAddress() + ":" + getName() + " has " + getConsumerCount() + " consumer(s) and is receiving messages at a rate of " + queueRate + " msgs/second.");
}
- for (Consumer consumer : getConsumers()) {
+
+ Set<Consumer> consumersSet = getConsumers();
+
+ if (consumersSet.size() == 0) {
+ logger.debug("There are no consumers, no need to check slow consumer's rate");
+ return;
+ } else if (queueRate < (threshold * consumersSet.size())) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Insufficient messages received on queue \"" + getName() + "\" to satisfy slow-consumer-threshold. Skipping inspection of consumer.");
+ }
+ return;
+ }
+
+ for (Consumer consumer : consumersSet) {
if (consumer instanceof ServerConsumerImpl) {
ServerConsumerImpl serverConsumer = (ServerConsumerImpl) consumer;
float consumerRate = serverConsumer.getRate();
@@ -3159,9 +3172,6 @@ public class QueueImpl implements Queue {
connection.killMessage(server.getNodeID());
remotingService.removeConnection(connection.getID());
connection.fail(ActiveMQMessageBundle.BUNDLE.connectionsClosedByManagement(connection.getRemoteAddress()));
- //break once a consumer gets killed. This can prevent all
- //consumers to this queue get killed all at once.
- break;
} else if (policy.equals(SlowConsumerPolicy.NOTIFY)) {
TypedProperties props = new TypedProperties();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/599aaa53/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java
deleted file mode 100644
index 37ae528..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/MultipleSlowConsumerTest.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.client;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.api.core.client.ClientConsumer;
-import org.apache.activemq.artemis.api.core.client.ClientMessage;
-import org.apache.activemq.artemis.api.core.client.ClientProducer;
-import org.apache.activemq.artemis.api.core.client.ClientSession;
-import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
-import org.apache.activemq.artemis.api.core.client.ServerLocator;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.RoutingType;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
-import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
-import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.apache.activemq.artemis.utils.ConcurrentHashSet;
-import org.apache.activemq.artemis.utils.TimeUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.Set;
-
-public class MultipleSlowConsumerTest extends ActiveMQTestBase {
-
- private int checkPeriod = 3;
- private int threshold = 1;
-
- private ActiveMQServer server;
-
- private final SimpleString QUEUE = new SimpleString("SlowConsumerTestQueue");
-
- private ServerLocator locator;
-
- @Before
- @Override
- public void setUp() throws Exception {
- super.setUp();
-
- server = createServer(true, true);
-
- AddressSettings addressSettings = new AddressSettings();
- addressSettings.setSlowConsumerCheckPeriod(checkPeriod);
- addressSettings.setSlowConsumerThreshold(threshold);
- addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
-
- server.start();
-
- server.getAddressSettingsRepository().addMatch(QUEUE.toString(), addressSettings);
-
- server.createQueue(QUEUE, RoutingType.ANYCAST, QUEUE, null, true, false);
-
- locator = createFactory(true);
- }
-
- /**
- * This test creates 3 consumers on one queue. A producer sends
- * messages at a rate of 2 mesages per second. Each consumer
- * consumes messages at rate of 1 message per second. The slow
- * consumer threshold is 1 message per second.
- * Based on the above settings, at least one of the consumers
- * will be removed during the test, but at least one of the
- * consumers will remain and all messages will be received.
- */
- @Test
- public void testMultipleConsumersOneQueue() throws Exception {
- locator.setAckBatchSize(0);
-
- ClientSessionFactory sf1 = createSessionFactory(locator);
- ClientSessionFactory sf2 = createSessionFactory(locator);
- ClientSessionFactory sf3 = createSessionFactory(locator);
- ClientSessionFactory sf4 = createSessionFactory(locator);
-
- final int messages = 10;
-
- FixedRateProducer producer = new FixedRateProducer(sf1, QUEUE, messages);
-
- final Set<FixedRateConsumer> consumers = new ConcurrentHashSet<>();
- final Set<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
-
- consumers.add(new FixedRateConsumer(sf2, QUEUE, consumers, receivedMessages, 1));
- consumers.add(new FixedRateConsumer(sf3, QUEUE, consumers, receivedMessages, 2));
- consumers.add(new FixedRateConsumer(sf4, QUEUE, consumers, receivedMessages, 3));
-
- try {
- producer.start(threshold * 1000 / 2);
-
- for (FixedRateConsumer consumer : consumers) {
- consumer.start(threshold * 1000);
- }
-
- //check at least one consumer is killed
- //but at least one survived
- //and all messages are received.
- assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() < 3));
- assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0));
- assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages));
- assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> consumers.size() > 0));
- } finally {
- producer.stopRunning();
- for (FixedRateConsumer consumer : consumers) {
- consumer.stopRunning();
- }
- System.out.println("***report messages received: " + receivedMessages.size());
- System.out.println("***consumers left: " + consumers.size());
- }
- }
-
- private class FixedRateProducer extends FixedRateClient {
-
- int messages;
- ClientProducer producer;
-
- FixedRateProducer(ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException {
- super(sf, queue);
- this.messages = messages;
- }
-
- @Override
- protected void prepareWork() throws ActiveMQException {
- super.prepareWork();
- this.producer = session.createProducer(queue);
- }
-
- @Override
- protected void doWork(int count) throws Exception {
-
- if (count < messages) {
- ClientMessage m = createTextMessage(session, "msg" + count);
- producer.send(m);
- System.out.println("producer sent a message " + count);
- } else {
- stopRunning();
- }
- }
- }
-
- private class FixedRateConsumer extends FixedRateClient {
-
- Set<FixedRateConsumer> consumers;
- ClientConsumer consumer;
- Set<ClientMessage> receivedMessages;
- int id;
-
- FixedRateConsumer(ClientSessionFactory sf, SimpleString queue,
- Set<FixedRateConsumer> consumers, Set<ClientMessage> receivedMessages,
- int id) throws ActiveMQException {
- super(sf, queue);
- this.consumers = consumers;
- this.receivedMessages = receivedMessages;
- this.id = id;
- }
-
- @Override
- protected void prepareWork() throws ActiveMQException {
- super.prepareWork();
- this.consumer = session.createConsumer(queue);
- this.session.start();
- }
-
- @Override
- protected void doWork(int count) throws Exception {
- ClientMessage m = this.consumer.receive(rate);
- System.out.println("consumer " + id + " got m: " + m);
- if (m != null) {
- receivedMessages.add(m);
- m.acknowledge();
- System.out.println("acked " + m.getClass().getName() + "now total received: " + receivedMessages.size());
- }
- }
-
- @Override
- protected void handleError(int count, Exception e) {
- System.err.println("Got error receiving message " + count + " remove self " + this.id);
- consumers.remove(this);
- e.printStackTrace();
- }
-
- }
-
- private abstract class FixedRateClient extends Thread {
-
- protected ClientSessionFactory sf;
- protected SimpleString queue;
- protected ClientSession session;
- protected int rate;
- protected volatile boolean working;
-
- FixedRateClient(ClientSessionFactory sf, SimpleString queue) throws ActiveMQException {
- this.sf = sf;
- this.queue = queue;
- }
-
- public void start(int rate) {
- this.rate = rate;
- working = true;
- start();
- }
-
- protected void prepareWork() throws ActiveMQException {
- this.session = addClientSession(sf.createSession(true, true));
- }
-
- @Override
- public void run() {
- try {
- prepareWork();
- } catch (ActiveMQException e) {
- System.out.println("got error in prepareWork(), aborting...");
- e.printStackTrace();
- return;
- }
- int count = 0;
- while (working) {
- try {
- doWork(count);
- Thread.sleep(rate);
- } catch (InterruptedException e) {
- e.printStackTrace();
- } catch (Exception e) {
- System.err.println(this + " got exception ");
- e.printStackTrace();
- handleError(count, e);
- working = false;
- } finally {
- count++;
- }
- }
- }
-
- protected abstract void doWork(int count) throws Exception;
-
- protected void handleError(int count, Exception e) {
- }
-
- public void stopRunning() {
- working = false;
- interrupt();
- try {
- join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/599aaa53/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
index 5475778..88f7c72 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SlowConsumerTest.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.client;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -42,7 +43,10 @@ import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.SlowConsumerPolicy;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.RandomUtil;
+import org.apache.activemq.artemis.utils.TimeUtils;
+import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -52,6 +56,9 @@ import org.junit.runners.Parameterized;
@RunWith(value = Parameterized.class)
public class SlowConsumerTest extends ActiveMQTestBase {
+ private static final Logger logger = Logger.getLogger(SlowConsumerTest.class);
+
+ int threshold = 10;
private boolean isNetty = false;
private boolean isPaging = false;
@@ -82,7 +89,7 @@ public class SlowConsumerTest extends ActiveMQTestBase {
AddressSettings addressSettings = new AddressSettings();
addressSettings.setSlowConsumerCheckPeriod(1);
- addressSettings.setSlowConsumerThreshold(10);
+ addressSettings.setSlowConsumerThreshold(threshold);
addressSettings.setSlowConsumerPolicy(SlowConsumerPolicy.KILL);
if (isPaging) {
@@ -347,4 +354,212 @@ public class SlowConsumerTest extends ActiveMQTestBase {
assertEquals(e.getType(), ActiveMQExceptionType.OBJECT_CLOSED);
}
}
+
+ /**
+ * This test creates 3 consumers on one queue. A producer sends
+ * messages at a rate of 2 mesages per second. Each consumer
+ * consumes messages at rate of 1 message per second. The slow
+ * consumer threshold is 1 message per second.
+ * Based on the above settings, at least one of the consumers
+ * will be removed during the test, but at least one of the
+ * consumers will remain and all messages will be received.
+ */
+ @Test
+ public void testMultipleConsumersOneQueue() throws Exception {
+ locator.setAckBatchSize(0);
+
+ Queue queue = server.locateQueue(QUEUE);
+
+ ClientSessionFactory sf1 = createSessionFactory(locator);
+ ClientSessionFactory sf2 = createSessionFactory(locator);
+ ClientSessionFactory sf3 = createSessionFactory(locator);
+ ClientSessionFactory sf4 = createSessionFactory(locator);
+
+ final int messages = 10 * threshold;
+
+ FixedRateProducer producer = new FixedRateProducer(threshold * 2, sf1, QUEUE, messages);
+
+ final Set<FixedRateConsumer> consumers = new ConcurrentHashSet<>();
+ final Set<ClientMessage> receivedMessages = new ConcurrentHashSet<>();
+
+ consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf2, QUEUE, 1));
+ consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf3, QUEUE, 2));
+ consumers.add(new FixedRateConsumer(threshold, receivedMessages, sf4, QUEUE, 3));
+
+ try {
+ producer.start();
+
+ for (FixedRateConsumer consumer : consumers) {
+ consumer.start();
+ }
+
+ producer.join(10000);
+
+ assertTrue(TimeUtils.waitOnBoolean(true, 10000, () -> receivedMessages.size() == messages));
+
+ Assert.assertEquals(3, queue.getConsumerCount());
+
+ } finally {
+ producer.stopRunning();
+ Assert.assertFalse(producer.failed);
+ for (FixedRateConsumer consumer : consumers) {
+ consumer.stopRunning();
+ Assert.assertFalse(consumer.failed);
+ }
+ logger.debug("***report messages received: " + receivedMessages.size());
+ logger.debug("***consumers left: " + consumers.size());
+ }
+ }
+
+ private class FixedRateProducer extends FixedRateClient {
+
+ int messages;
+ ClientProducer producer;
+
+ FixedRateProducer(int rate, ClientSessionFactory sf, SimpleString queue, int messages) throws ActiveMQException {
+ super(sf, queue, rate);
+ this.messages = messages;
+ }
+
+ @Override
+ protected void prepareWork() throws ActiveMQException {
+ super.prepareWork();
+ this.producer = session.createProducer(queue);
+ }
+
+ @Override
+ protected void doWork(int count) throws Exception {
+
+ if (count < messages) {
+ ClientMessage m = createTextMessage(session, "msg" + count);
+ producer.send(m);
+ logger.debug("producer sent a message " + count);
+ } else {
+ this.working = false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "Producer";
+ }
+ }
+
+ private class FixedRateConsumer extends FixedRateClient {
+
+ Set<FixedRateConsumer> consumers;
+ ClientConsumer consumer;
+ final Set<ClientMessage> receivedMessages;
+ int id;
+
+ FixedRateConsumer(int rate,
+ Set<ClientMessage> receivedMessages,
+ ClientSessionFactory sf,
+ SimpleString queue,
+ int id) throws ActiveMQException {
+ super(sf, queue, rate);
+ this.id = id;
+ this.receivedMessages = receivedMessages;
+ }
+
+ @Override
+ protected void prepareWork() throws ActiveMQException {
+ super.prepareWork();
+ this.consumer = session.createConsumer(queue);
+ this.session.start();
+ }
+
+ @Override
+ protected void doWork(int count) throws Exception {
+ ClientMessage m = this.consumer.receive(1000);
+ logger.debug("consumer " + id + " got m: " + m);
+ if (m != null) {
+ receivedMessages.add(m);
+ m.acknowledge();
+ logger.debug(" consumer " + id + " acked " + m.getClass().getName() + "now total received: " + receivedMessages.size());
+ }
+ }
+
+ @Override
+ protected void handleError(int count, Exception e) {
+ failed = true;
+ System.err.println("Got error receiving message " + count + " remove self " + this.id);
+ e.printStackTrace();
+ }
+
+ @Override
+ public String toString() {
+ return "Consumer " + id;
+ }
+
+ }
+
+ private abstract class FixedRateClient extends Thread {
+
+ protected ClientSessionFactory sf;
+ protected SimpleString queue;
+ protected ClientSession session;
+ protected final int sleepTime;
+ protected volatile boolean working;
+ boolean failed;
+
+ FixedRateClient(ClientSessionFactory sf, SimpleString queue, int rate) throws ActiveMQException {
+ this.sf = sf;
+ this.queue = queue;
+ this.sleepTime = 1000 / rate;
+ }
+
+ protected void prepareWork() throws ActiveMQException {
+ this.session = addClientSession(sf.createSession(true, true));
+ }
+
+ @Override
+ public void run() {
+ working = true;
+ try {
+ prepareWork();
+ } catch (ActiveMQException e) {
+ logger.debug("got error in prepareWork(), aborting...");
+ e.printStackTrace();
+ return;
+ }
+ int count = 0;
+ while (working) {
+ try {
+ doWork(count);
+ Thread.sleep(sleepTime);
+ } catch (InterruptedException e) {
+ // expected, nothing to be done
+ } catch (Exception e) {
+ failed = true;
+ handleError(count, e);
+ working = false;
+ } finally {
+ count++;
+ }
+ }
+ }
+
+ protected abstract void doWork(int count) throws Exception;
+
+ protected void handleError(int count, Exception e) {
+ }
+
+ public void stopRunning() {
+ working = false;
+ try {
+ session.close();
+ this.interrupt();
+ join(5000);
+ if (isAlive()) {
+ fail("Interrupt is not working on Working Thread");
+ }
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } catch (Exception e) {
+ handleError(0, e);
+ }
+ }
+ }
+
}