You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/09 16:58:56 UTC
[4/7] activemq-artemis git commit: ARTEMIS-1495 Test simulating a
dead lock on queue auto create under stress
ARTEMIS-1495 Test simulating a dead lock on queue auto create under stress
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8bf879f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8bf879f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8bf879f1
Branch: refs/heads/master
Commit: 8bf879f1560b958907bf8f6808bc66b8f2402431
Parents: c2a21c9
Author: Francesco Nigro <ni...@gmail.com>
Authored: Thu Nov 2 10:51:43 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Nov 9 11:58:36 2017 -0500
----------------------------------------------------------------------
.../tests/integration/client/ConsumerTest.java | 120 +++++++++++++++++++
1 file changed, 120 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8bf879f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index af172c8..9c05114 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -20,6 +20,7 @@ import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
@@ -31,9 +32,13 @@ import java.io.Serializable;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.stream.Stream;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -1074,4 +1079,119 @@ public class ConsumerTest extends ActiveMQTestBase {
session.close();
}
+   /**
+    * Stress test added for ARTEMIS-1495 (per the commit subject: simulating a deadlock on
+    * queue auto-create under stress).
+    *
+    * <p>Starts {@code forks} producer threads and {@code forks} consumer threads, each with its
+    * own JMS connection and session. Fork {@code i} uses the queue {@code "q_" + (i % queues)};
+    * since {@code queues == forks}, every fork currently gets its own queue. All threads are
+    * released together through a {@link CyclicBarrier} so that the queue lookups and the sends
+    * happen concurrently. The test then verifies that each producer index was received exactly
+    * {@code messages * runs} times.
+    *
+    * <p>The test is a no-op unless {@code isNetty()} is true and the {@code durable} field is
+    * false.
+    *
+    * @throws Throwable if a barrier wait on the main thread times out or breaks, or if an
+    *                   assertion fails
+    */
+   @Test
+   public void testMultipleConsumersOnSharedQueue() throws Throwable {
+      if (!isNetty() || this.durable) {
+         return;
+      }
+      // Shadows the field on purpose: the guard above guarantees the field is false here.
+      final boolean durable = false;
+      final long TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1);
+      final int forks = 100;
+      final int queues = forks;
+      final int runs = 1;
+      final int messages = 1;
+      final ConnectionFactory factorySend = createFactory(1);
+      // Slot i counts how many messages carrying producer index i have been consumed.
+      final AtomicLongArray receivedMessages = new AtomicLongArray(forks);
+      final Thread[] producersRunners = new Thread[forks];
+      final Thread[] consumersRunners = new Thread[forks];
+      // Parties: one producer and one consumer per fork, plus the controlling test thread.
+      final CyclicBarrier onStartRun = new CyclicBarrier((forks * 2) + 1);
+      final CyclicBarrier onFinishRun = new CyclicBarrier((forks * 2) + 1);
+
+      // Number of messages sent by all producers together in a single run.
+      final int messagesSent = forks * messages;
+      // Total number of messages consumed so far, across all consumers and all runs.
+      final AtomicInteger messagesReceived = new AtomicInteger(0);
+
+      for (int i = 0; i < forks; i++) {
+         final int forkIndex = i;
+         final String queueName = "q_" + (forkIndex % queues);
+         final Thread producerRunner = new Thread(() -> {
+            try (Connection connection = factorySend.createConnection()) {
+               connection.start();
+               try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+                  final javax.jms.Queue queue = session.createQueue(queueName);
+                  try (MessageProducer producer = session.createProducer(queue)) {
+                     producer.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+                     for (int r = 0; r < runs; r++) {
+                        onStartRun.await();
+                        for (int m = 0; m < messages; m++) {
+                           // The payload is the producer index, so the consumer side can tally per fork.
+                           final BytesMessage bytesMessage = session.createBytesMessage();
+                           bytesMessage.writeInt(forkIndex);
+                           producer.send(bytesMessage);
+                        }
+                        onFinishRun.await();
+                     }
+                  } catch (InterruptedException | BrokenBarrierException e) {
+                     e.printStackTrace();
+                  }
+               }
+            } catch (JMSException e) {
+               e.printStackTrace();
+            }
+         });
+
+         producerRunner.setDaemon(true);
+
+         final Thread consumerRunner = new Thread(() -> {
+            try (Connection connection = factorySend.createConnection()) {
+               connection.start();
+               try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+                  final javax.jms.Queue queue = session.createQueue(queueName);
+                  try (MessageConsumer consumer = session.createConsumer(queue)) {
+                     for (int r = 0; r < runs; r++) {
+                        onStartRun.await();
+                        // The counter is cumulative, so the target must grow with every run;
+                        // comparing against a single run's count would only work for runs == 1.
+                        final int expectedTotal = messagesSent * (r + 1);
+                        while (messagesReceived.get() < expectedTotal) {
+                           final BytesMessage receivedMessage = (BytesMessage) consumer.receive(1000);
+                           if (receivedMessage != null) {
+                              final int receivedConsumerIndex = receivedMessage.readInt();
+                              receivedMessages.getAndIncrement(receivedConsumerIndex);
+                              messagesReceived.incrementAndGet();
+                           }
+                        }
+                        onFinishRun.await();
+                     }
+                  } catch (InterruptedException | BrokenBarrierException e) {
+                     e.printStackTrace();
+                  }
+               }
+            } catch (JMSException e) {
+               e.printStackTrace();
+            }
+         });
+         consumerRunner.setDaemon(true);
+         consumersRunners[forkIndex] = consumerRunner;
+         producersRunners[forkIndex] = producerRunner;
+      }
+      Stream.of(consumersRunners).forEach(Thread::start);
+      Stream.of(producersRunners).forEach(Thread::start);
+      final long messagesPerRun = (forks * messages);
+      for (int r = 0; r < runs; r++) {
+         // A timeout here throws and fails the test: not every worker reached the barrier in time.
+         onStartRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+         System.out.println("started run " + r);
+         final long start = System.currentTimeMillis();
+         onFinishRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+         // Clamp to 1 ms: a run that completes within one clock tick would otherwise divide by zero.
+         final long elapsedMillis = Math.max(1L, System.currentTimeMillis() - start);
+         System.out.println((messagesPerRun * 1000L) / elapsedMillis + " msg/sec");
+      }
+      // Give the workers a bounded time to finish, then interrupt whatever is still running.
+      Stream.of(producersRunners).forEach(runner -> {
+         try {
+            runner.join(TIMEOUT_MILLIS * runs);
+         } catch (InterruptedException e) {
+            e.printStackTrace();
+         }
+      });
+      Stream.of(producersRunners).forEach(Thread::interrupt);
+      Stream.of(consumersRunners).forEach(runner -> {
+         try {
+            runner.join(TIMEOUT_MILLIS * runs);
+         } catch (InterruptedException e) {
+            e.printStackTrace();
+         }
+      });
+      Stream.of(consumersRunners).forEach(Thread::interrupt);
+      for (int i = 0; i < forks; i++) {
+         Assert.assertEquals("The consumer " + i + " must receive all the messages sent.", messages * runs, receivedMessages.get(i));
+      }
+   }
}