You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/16 20:37:15 UTC

git commit: Make sure there are Topic consumers online before starting to send otherwise they can miss a message and the test fails when it shouldn't

Updated Branches:
  refs/heads/trunk 257710ba1 -> a6d05daba


Make sure there are Topic consumers online before starting to send,
otherwise they can miss a message and the test fails when it shouldn't.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a6d05dab
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a6d05dab
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a6d05dab

Branch: refs/heads/trunk
Commit: a6d05daba6135dc3096b906643451950cd577d2f
Parents: 257710b
Author: Timothy Bish <ta...@gmai.com>
Authored: Mon Dec 16 14:37:06 2013 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Mon Dec 16 14:37:06 2013 -0500

----------------------------------------------------------------------
 .../activemq/transport/amqp/AMQ4920Test.java    | 56 ++++++++++++--------
 1 file changed, 33 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a6d05dab/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
index 5d6d473..72f8b11 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
@@ -16,11 +16,13 @@
  */
 package org.apache.activemq.transport.amqp;
 
-import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
@@ -31,21 +33,22 @@ import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.jms.TextMessage;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
-import static org.junit.Assert.assertEquals;
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AMQ4920Test extends AmqpTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(AMQ4920Test.class);
     private static final Integer ITERATIONS = 1 * 1000;
     private static final Integer CONSUMER_COUNT = 4;  // At least 2 consumers are required to reproduce the original issue
     public static final String TEXT_MESSAGE = "TextMessage: ";
-    private CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
+    private final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
+    private final CountDownLatch initLatch = new CountDownLatch(CONSUMER_COUNT);
 
+    @Override
     @Before
     public void setUp() throws Exception {
         super.setUp();
@@ -54,7 +57,7 @@ public class AMQ4920Test extends AmqpTestSupport {
 
     @Test(timeout = 5 * 60 * 1000)
     public void testSendWithMultipleConsumers() throws Exception {
-        ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin");
+        ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
         Connection connection = connectionFactory.createConnection();
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         String destinationName = "topic://AMQ4920Test" + System.currentTimeMillis();
@@ -63,11 +66,15 @@ public class AMQ4920Test extends AmqpTestSupport {
 
         ExecutorService executor = Executors.newCachedThreadPool();
         for (int i=0; i < CONSUMER_COUNT; i++) {
-            AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(destinationName, port, "Consumer-" + i, latch, ITERATIONS);
+            AMQ4930ConsumerTask consumerTask =
+                new AMQ4930ConsumerTask(initLatch, destinationName, port, "Consumer-" + i, latch, ITERATIONS);
             executor.submit(consumerTask);
         }
         connection.start();
 
+        // Make sure all Topic consumers are subscribed before the first send.
+        initLatch.await();
+
         LOG.debug("At start latch is " + latch.getCount());
         sendMessages(connection, destination, ITERATIONS, 10);
         LOG.debug("After send latch is " + latch.getCount());
@@ -97,16 +104,17 @@ public class AMQ4920Test extends AmqpTestSupport {
     }
 }
 
-
 class AMQ4930ConsumerTask implements Callable<Boolean> {
     protected static final Logger LOG = LoggerFactory.getLogger(AMQ4930ConsumerTask.class);
-    private String destinationName;
-    private String consumerName;
-    private CountDownLatch messagesReceived;
-    private int port;
-    private int expectedMessageCount;
-
-    public AMQ4930ConsumerTask (String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
+    private final String destinationName;
+    private final String consumerName;
+    private final CountDownLatch messagesReceived;
+    private final int port;
+    private final int expectedMessageCount;
+    private final CountDownLatch started;
+
+    public AMQ4930ConsumerTask (CountDownLatch started, String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
+        this.started = started;
         this.destinationName = destinationName;
         this.port = port;
         this.consumerName = consumerName;
@@ -119,13 +127,15 @@ class AMQ4930ConsumerTask implements Callable<Boolean> {
         LOG.debug(consumerName + " starting");
         Connection connection=null;
         try {
-            ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin");
+            ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
             connection = connectionFactory.createConnection();
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Destination destination = session.createTopic(destinationName);
             MessageConsumer consumer = session.createConsumer(destination);
             connection.start();
 
+            started.countDown();
+
             int receivedCount = 0;
             while(receivedCount < expectedMessageCount) {
                 Message message = consumer.receive(5 * 1000);