You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/12/16 20:37:15 UTC
git commit: Make sure there are Topic consumers online before
starting to send otherwise they can miss a message and the test fails when it
shouldn't
Updated Branches:
refs/heads/trunk 257710ba1 -> a6d05daba
Make sure there are Topic consumers online before starting to send
otherwise they can miss a message and the test fails when it shouldn't
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a6d05dab
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a6d05dab
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a6d05dab
Branch: refs/heads/trunk
Commit: a6d05daba6135dc3096b906643451950cd577d2f
Parents: 257710b
Author: Timothy Bish <ta...@gmai.com>
Authored: Mon Dec 16 14:37:06 2013 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Mon Dec 16 14:37:06 2013 -0500
----------------------------------------------------------------------
.../activemq/transport/amqp/AMQ4920Test.java | 56 ++++++++++++--------
1 file changed, 33 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a6d05dab/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
index 5d6d473..72f8b11 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
@@ -16,11 +16,13 @@
*/
package org.apache.activemq.transport.amqp;
-import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -31,21 +33,22 @@ import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import static org.junit.Assert.assertEquals;
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AMQ4920Test extends AmqpTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(AMQ4920Test.class);
private static final Integer ITERATIONS = 1 * 1000;
private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are required to reproduce the original issue
public static final String TEXT_MESSAGE = "TextMessage: ";
- private CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
+ private final CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
+ private final CountDownLatch initLatch = new CountDownLatch(CONSUMER_COUNT);
+ @Override
@Before
public void setUp() throws Exception {
super.setUp();
@@ -54,7 +57,7 @@ public class AMQ4920Test extends AmqpTestSupport {
@Test(timeout = 5 * 60 * 1000)
public void testSendWithMultipleConsumers() throws Exception {
- ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin");
+ ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
Connection connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
String destinationName = "topic://AMQ4920Test" + System.currentTimeMillis();
@@ -63,11 +66,15 @@ public class AMQ4920Test extends AmqpTestSupport {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i=0; i < CONSUMER_COUNT; i++) {
- AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(destinationName, port, "Consumer-" + i, latch, ITERATIONS);
+ AMQ4930ConsumerTask consumerTask =
+ new AMQ4930ConsumerTask(initLatch, destinationName, port, "Consumer-" + i, latch, ITERATIONS);
executor.submit(consumerTask);
}
connection.start();
+ // Make sure at least Topic consumers are subscribed before the first send.
+ initLatch.await();
+
LOG.debug("At start latch is " + latch.getCount());
sendMessages(connection, destination, ITERATIONS, 10);
LOG.debug("After send latch is " + latch.getCount());
@@ -97,16 +104,17 @@ public class AMQ4920Test extends AmqpTestSupport {
}
}
-
class AMQ4930ConsumerTask implements Callable<Boolean> {
protected static final Logger LOG = LoggerFactory.getLogger(AMQ4930ConsumerTask.class);
- private String destinationName;
- private String consumerName;
- private CountDownLatch messagesReceived;
- private int port;
- private int expectedMessageCount;
-
- public AMQ4930ConsumerTask (String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
+ private final String destinationName;
+ private final String consumerName;
+ private final CountDownLatch messagesReceived;
+ private final int port;
+ private final int expectedMessageCount;
+ private final CountDownLatch started;
+
+ public AMQ4930ConsumerTask (CountDownLatch started, String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
+ this.started = started;
this.destinationName = destinationName;
this.port = port;
this.consumerName = consumerName;
@@ -119,13 +127,15 @@ class AMQ4930ConsumerTask implements Callable<Boolean> {
LOG.debug(consumerName + " starting");
Connection connection=null;
try {
- ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin");
+ ConnectionFactory connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "admin");
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createTopic(destinationName);
MessageConsumer consumer = session.createConsumer(destination);
connection.start();
+ started.countDown();
+
int receivedCount = 0;
while(receivedCount < expectedMessageCount) {
Message message = consumer.receive(5 * 1000);