You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2013/12/05 12:51:02 UTC
git commit: Test to reproduce AMQ4920
Updated Branches:
refs/heads/trunk 8b06c44cc -> 7cf5c240a
Test to reproduce AMQ4920
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7cf5c240
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7cf5c240
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7cf5c240
Branch: refs/heads/trunk
Commit: 7cf5c240a260c30572729b3a1b29863b64935a44
Parents: 8b06c44
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Thu Dec 5 12:50:55 2013 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Thu Dec 5 12:50:55 2013 +0100
----------------------------------------------------------------------
.../activemq/transport/amqp/AMQ4920Test.java | 162 +++++++++++++++++++
1 file changed, 162 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/7cf5c240/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
new file mode 100644
index 0000000..5d6d473
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class AMQ4920Test extends AmqpTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(AMQ4920Test.class);
+ private static final Integer ITERATIONS = 1 * 1000;
+ private static final Integer CONSUMER_COUNT = 4; // At least 2 consumers are required to reproduce the original issue
+ public static final String TEXT_MESSAGE = "TextMessage: ";
+ private CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
+
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ this.autoFailTestSupport.setAutoFail(false);
+ }
+
+ @Test(timeout = 5 * 60 * 1000)
+ public void testSendWithMultipleConsumers() throws Exception {
+ ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin");
+ Connection connection = connectionFactory.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ String destinationName = "topic://AMQ4920Test" + System.currentTimeMillis();
+ Destination destination = session.createTopic(destinationName);
+ connection.start();
+
+ ExecutorService executor = Executors.newCachedThreadPool();
+ for (int i=0; i < CONSUMER_COUNT; i++) {
+ AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(destinationName, port, "Consumer-" + i, latch, ITERATIONS);
+ executor.submit(consumerTask);
+ }
+ connection.start();
+
+ LOG.debug("At start latch is " + latch.getCount());
+ sendMessages(connection, destination, ITERATIONS, 10);
+ LOG.debug("After send latch is " + latch.getCount());
+
+ latch.await(15, TimeUnit.SECONDS);
+ LOG.debug("After await latch is " + latch.getCount());
+ assertEquals(0, latch.getCount());
+
+ executor.shutdown();
+ }
+
+ public void sendMessages(Connection connection, Destination destination, int count, int sleepInterval) throws Exception {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(destination);
+
+ for (int i = 0; i < count; i++) {
+ TextMessage message = session.createTextMessage();
+ message.setText(TEXT_MESSAGE + i);
+ LOG.debug("Sending message [" + i + "]");
+ producer.send(message);
+ if (sleepInterval > 0) {
+ Thread.sleep(sleepInterval);
+ }
+ }
+
+ session.close();
+ }
+}
+
+
+/**
+ * Topic consumer used by {@link AMQ4920Test}.
+ *
+ * Opens its own connection, subscribes to the given topic and expects to receive
+ * {@code expectedMessageCount} text messages whose bodies are
+ * {@code AMQ4920Test.TEXT_MESSAGE + n} for n = 0, 1, 2, ... in that order. The shared latch
+ * is counted down once for every message that passes validation.
+ */
+class AMQ4930ConsumerTask implements Callable<Boolean> {
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ4930ConsumerTask.class);
+    private final String destinationName;
+    private final String consumerName;
+    private final CountDownLatch messagesReceived;
+    private final int port;
+    private final int expectedMessageCount;
+
+    /**
+     * @param destinationName      name passed to {@code Session.createTopic}
+     * @param port                 port on localhost to connect to
+     * @param consumerName         label used in log output
+     * @param latch                latch counted down once per validated message
+     * @param expectedMessageCount number of messages to receive before finishing
+     */
+    public AMQ4930ConsumerTask (String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
+        this.destinationName = destinationName;
+        this.port = port;
+        this.consumerName = consumerName;
+        this.messagesReceived = latch;
+        this.expectedMessageCount = expectedMessageCount;
+    }
+
+    /**
+     * Receives and validates the expected messages.
+     *
+     * @return {@code true} only if all expected messages arrived in order with the expected
+     *         text; {@code false} on a receive timeout, an unexpected message type or body,
+     *         or any exception (which is logged rather than propagated)
+     */
+    @Override
+    public Boolean call() throws Exception {
+        LOG.debug("{} starting", consumerName);
+        Connection connection = null;
+        try {
+            ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin");
+            connection = connectionFactory.createConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createTopic(destinationName);
+            MessageConsumer consumer = session.createConsumer(destination);
+            connection.start();
+
+            int receivedCount = 0;
+            while (receivedCount < expectedMessageCount) {
+                // A null result means nothing arrived within the 5 second receive timeout.
+                Message message = consumer.receive(5 * 1000);
+                if (message == null) {
+                    LOG.error("consumer {} got null message on iteration {}", consumerName, receivedCount);
+                    return false;
+                }
+                if (!(message instanceof TextMessage)) {
+                    LOG.error("consumer {} expected text message on iteration {} but got {}", consumerName, receivedCount, message.getClass().getCanonicalName());
+                    return false;
+                }
+                TextMessage tm = (TextMessage) message;
+                // Constant-first comparison so a null body is reported as a mismatch, not an NPE.
+                String expectedText = AMQ4920Test.TEXT_MESSAGE + receivedCount;
+                if (!expectedText.equals(tm.getText())) {
+                    LOG.error("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
+                    return false;
+                }
+                LOG.debug("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
+
+                messagesReceived.countDown();
+                receivedCount++;
+            }
+            return true;
+        } catch (Exception e) {
+            LOG.error("UnexpectedException in " + consumerName, e);
+            // A failed consumer must not report success.
+            return false;
+        } finally {
+            // connection is still null when createConnection() itself failed.
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (JMSException ignored) {
+                    // Best-effort cleanup; the outcome of the task has already been decided.
+                }
+            }
+        }
+    }
+}
+