You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/09 16:58:56 UTC

[4/7] activemq-artemis git commit: ARTEMIS-1495 Test simulating a dead lock on queue auto create under stress

ARTEMIS-1495 Test simulating a dead lock on queue auto create under stress


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/8bf879f1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/8bf879f1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/8bf879f1

Branch: refs/heads/master
Commit: 8bf879f1560b958907bf8f6808bc66b8f2402431
Parents: c2a21c9
Author: Francesco Nigro <ni...@gmail.com>
Authored: Thu Nov 2 10:51:43 2017 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Nov 9 11:58:36 2017 -0500

----------------------------------------------------------------------
 .../tests/integration/client/ConsumerTest.java  | 120 +++++++++++++++++++
 1 file changed, 120 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/8bf879f1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
index af172c8..9c05114 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/ConsumerTest.java
@@ -20,6 +20,7 @@ import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
 import javax.jms.MapMessage;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
@@ -31,9 +32,13 @@ import java.io.Serializable;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Set;
+import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLongArray;
+import java.util.stream.Stream;
 
 import org.apache.activemq.artemis.api.core.ActiveMQException;
 import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
@@ -1074,4 +1079,119 @@ public class ConsumerTest extends ActiveMQTestBase {
       session.close();
    }
 
+   @Test
+   public void testMultipleConsumersOnSharedQueue() throws Throwable { // stress test: `forks` producer/consumer thread pairs are released together by a barrier, then per-fork receive counts are checked
+      if (!isNetty() || this.durable) { // only runs when isNetty() is true and this.durable is false; otherwise returns without asserting anything
+         return;
+      }
+      final boolean durable = false; // local that shadows this.durable; selects NON_PERSISTENT delivery in the producers below
+      final long TIMEOUT_MILLIS = TimeUnit.MINUTES.toMillis(1); // bound for each barrier wait in the main thread; also the base for the join timeouts
+      final int forks = 100; // number of producer/consumer thread pairs
+      final int queues = forks; // equal to forks, so (forkIndex % queues) gives every fork its own queue name
+      final int runs = 1; // number of start/finish barrier cycles
+      final int messages = 1; // messages each producer sends per run
+      final ConnectionFactory factorySend = createFactory(1); // used by producers AND consumers despite the name; meaning of the argument is not visible here
+      final AtomicLongArray receivedMessages = new AtomicLongArray(forks); // slot i counts received messages whose int payload is i
+      final Thread[] producersRunners = new Thread[forks];
+      final Thread[] consumersRunners = new Thread[forks];
+      // barrier parties: one producer and one consumer per fork (forks * 2) + 1 controller (the main test thread)
+      final CyclicBarrier onStartRun = new CyclicBarrier((forks * 2) + 1);
+      final CyclicBarrier onFinishRun = new CyclicBarrier((forks * 2) + 1);
+
+      final int messagesSent = forks * messages; // total sent by all producers in ONE run (not multiplied by runs)
+      final AtomicInteger messagesRecieved = new AtomicInteger(0); // global count of messages received by all consumers together
+
+      for (int i = 0; i < forks; i++) {
+         final int forkIndex = i;
+         final String queueName = "q_" + (forkIndex % queues);
+         final Thread producerRunner = new Thread(() -> {
+            try (Connection connection = factorySend.createConnection()) {
+               connection.start();
+               try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+                  final javax.jms.Queue queue = session.createQueue(queueName); // presumably relies on the broker auto-creating the queue - TODO confirm
+                  try (MessageProducer producer = session.createProducer(queue)) {
+                     producer.setDeliveryMode(durable ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+                     for (int r = 0; r < runs; r++) {
+                        onStartRun.await(); // block until every producer, every consumer and the main thread have arrived
+                        for (int m = 0; m < messages; m++) {
+                           final BytesMessage bytesMessage = session.createBytesMessage();
+                           bytesMessage.writeInt(forkIndex); // payload is the fork index; consumers use it to pick the counter slot
+                           producer.send(bytesMessage);
+                        }
+                        onFinishRun.await(); // report this producer's run as finished
+                     }
+                  } catch (InterruptedException | BrokenBarrierException e) {
+                     e.printStackTrace(); // only printed: the thread ends and nothing is propagated to the test thread
+                  }
+               }
+            } catch (JMSException e) {
+               e.printStackTrace(); // only printed: the thread ends and nothing is propagated to the test thread
+            }
+         });
+
+         producerRunner.setDaemon(true);
+
+         final Thread consumerRunner = new Thread(() -> {
+            try (Connection connection = factorySend.createConnection()) {
+               connection.start();
+               try (Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE)) {
+                  final javax.jms.Queue queue = session.createQueue(queueName); // same queue name as the producer of this fork
+                  try (MessageConsumer consumer = session.createConsumer(queue)) {
+                     for (int r = 0; r < runs; r++) {
+                        onStartRun.await(); // consumer is created before the barrier, so it exists before any producer sends
+                        while (messagesRecieved.get() != messagesSent) { // NOTE(review): counter is global and never reset while messagesSent is per run - looks valid only for runs == 1, confirm
+                           final BytesMessage receivedMessage = (BytesMessage) consumer.receive(1000); // timed receive; a null result just re-checks the loop condition
+                           if (receivedMessage != null) {
+                              final int receivedConsumerIndex = receivedMessage.readInt(); // fork index written by the producer
+                              receivedMessages.getAndIncrement(receivedConsumerIndex);
+                              messagesRecieved.incrementAndGet();
+                           }
+                        }
+                        onFinishRun.await(); // reached only after ALL messages of the run were received by some consumer
+                     }
+                  } catch (InterruptedException e) {
+                     e.printStackTrace(); // only printed: the thread ends and nothing is propagated to the test thread
+                  } catch (BrokenBarrierException e) {
+                     e.printStackTrace(); // only printed: the thread ends and nothing is propagated to the test thread
+                  }
+               }
+            } catch (JMSException e) {
+               e.printStackTrace(); // only printed: the thread ends and nothing is propagated to the test thread
+            }
+         });
+         consumerRunner.setDaemon(true);
+         consumersRunners[forkIndex] = consumerRunner;
+         producersRunners[forkIndex] = producerRunner;
+      }
+      Stream.of(consumersRunners).forEach(Thread::start); // consumers are started before producers
+      Stream.of(producersRunners).forEach(Thread::start);
+      final long messagesPerRun = (forks * messages);
+      for (int r = 0; r < runs; r++) {
+         onStartRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); // main thread is the last barrier party; a timeout here throws and fails the test
+         System.out.println("started run " + r);
+         final long start = System.currentTimeMillis();
+         onFinishRun.await(TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); // returns once every producer and consumer finished the run; a timeout throws and fails the test
+         final long elapsedMillis = System.currentTimeMillis() - start;
+         System.out.println((messagesPerRun * 1000L) / elapsedMillis + " msg/sec"); // NOTE(review): long division - throws ArithmeticException if elapsedMillis is 0
+      }
+      Stream.of(producersRunners).forEach(runner -> {
+         try {
+            runner.join(TIMEOUT_MILLIS * runs); // bounded wait per producer thread
+         } catch (InterruptedException e) {
+            e.printStackTrace();
+         }
+      });
+      Stream.of(producersRunners).forEach(Thread::interrupt); // interrupt whatever producer is still alive after the bounded join
+      Stream.of(consumersRunners).forEach(runner -> {
+         try {
+            runner.join(TIMEOUT_MILLIS * runs); // bounded wait per consumer thread
+         } catch (InterruptedException e) {
+            e.printStackTrace();
+         }
+      });
+      Stream.of(consumersRunners).forEach(Thread::interrupt); // interrupt whatever consumer is still alive after the bounded join
+      for (int i = 0; i < forks; i++) {
+         Assert.assertEquals("The consumer " + i + " must receive all the messages sent.", messages * runs, receivedMessages.get(i)); // slot i must hold exactly the number of messages fork i's producer sent
+      }
+   }
 }