You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ke...@apache.org on 2013/12/05 12:51:02 UTC

git commit: Test to reproduce AMQ4920

Updated Branches:
  refs/heads/trunk 8b06c44cc -> 7cf5c240a


Test to reproduce AMQ4920


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7cf5c240
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7cf5c240
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7cf5c240

Branch: refs/heads/trunk
Commit: 7cf5c240a260c30572729b3a1b29863b64935a44
Parents: 8b06c44
Author: Kevin Earls <ke...@kevinearls.com>
Authored: Thu Dec 5 12:50:55 2013 +0100
Committer: Kevin Earls <ke...@kevinearls.com>
Committed: Thu Dec 5 12:50:55 2013 +0100

----------------------------------------------------------------------
 .../activemq/transport/amqp/AMQ4920Test.java    | 162 +++++++++++++++++++
 1 file changed, 162 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7cf5c240/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
new file mode 100644
index 0000000..5d6d473
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AMQ4920Test.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+public class AMQ4920Test extends AmqpTestSupport {
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4920Test.class);
+    private static final Integer ITERATIONS = 1 * 1000;
+    private static final Integer CONSUMER_COUNT = 4;  // At least 2 consumers are required to reproduce the original issue
+    public static final String TEXT_MESSAGE = "TextMessage: ";
+    private CountDownLatch latch = new CountDownLatch(CONSUMER_COUNT * ITERATIONS);
+
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        this.autoFailTestSupport.setAutoFail(false);
+    }
+
+    @Test(timeout = 5 * 60 * 1000)
+    public void testSendWithMultipleConsumers() throws Exception {
+        ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin");
+        Connection connection = connectionFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        String destinationName = "topic://AMQ4920Test" + System.currentTimeMillis();
+        Destination destination = session.createTopic(destinationName);
+        connection.start();
+
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (int i=0; i < CONSUMER_COUNT; i++) {
+            AMQ4930ConsumerTask consumerTask = new AMQ4930ConsumerTask(destinationName, port, "Consumer-" + i, latch, ITERATIONS);
+            executor.submit(consumerTask);
+        }
+        connection.start();
+
+        LOG.debug("At start latch is " + latch.getCount());
+        sendMessages(connection, destination, ITERATIONS, 10);
+        LOG.debug("After send latch is " + latch.getCount());
+
+        latch.await(15, TimeUnit.SECONDS);
+        LOG.debug("After await latch is " + latch.getCount());
+        assertEquals(0, latch.getCount());
+
+        executor.shutdown();
+    }
+
+    public void sendMessages(Connection connection, Destination destination, int count, int sleepInterval) throws Exception {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(destination);
+
+        for (int i = 0; i < count; i++) {
+            TextMessage message = session.createTextMessage();
+            message.setText(TEXT_MESSAGE + i);
+            LOG.debug("Sending message [" + i + "]");
+            producer.send(message);
+            if (sleepInterval > 0) {
+                Thread.sleep(sleepInterval);
+            }
+        }
+
+        session.close();
+    }
+}
+
+
+class AMQ4930ConsumerTask implements Callable<Boolean> {
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ4930ConsumerTask.class);
+    private String destinationName;
+    private String consumerName;
+    private CountDownLatch messagesReceived;
+    private int port;
+    private int expectedMessageCount;
+
+    public AMQ4930ConsumerTask (String destinationName, int port, String consumerName, CountDownLatch latch, int expectedMessageCount) {
+        this.destinationName = destinationName;
+        this.port = port;
+        this.consumerName = consumerName;
+        this.messagesReceived = latch;
+        this.expectedMessageCount = expectedMessageCount;
+    }
+
+    @Override
+    public Boolean call() throws Exception {
+        LOG.debug(consumerName + " starting");
+        Connection connection=null;
+        try {
+            ConnectionFactory connectionFactory = (ConnectionFactory) new ConnectionFactoryImpl("localhost", port, "admin", "admin");
+            connection = connectionFactory.createConnection();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Destination destination = session.createTopic(destinationName);
+            MessageConsumer consumer = session.createConsumer(destination);
+            connection.start();
+
+            int receivedCount = 0;
+            while(receivedCount < expectedMessageCount) {
+                Message message = consumer.receive(5 * 1000);
+                if (message == null) {
+                    LOG.error("consumer {} got null message on iteration {}", consumerName, receivedCount);
+                    return false;
+                }
+                if (!(message instanceof TextMessage)) {
+                    LOG.error("consumer {} expected text message on iteration {} but got {}", consumerName, receivedCount, message.getClass().getCanonicalName());
+                    return false;
+                }
+                TextMessage tm = (TextMessage) message;
+                if (!tm.getText().equals(AMQ4920Test.TEXT_MESSAGE + receivedCount)) {
+                    LOG.error("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());
+                    return false;
+                }
+                LOG.debug("consumer {} expected {} got message [{}]", consumerName, receivedCount, tm.getText());  // TODO make debug
+
+                messagesReceived.countDown();
+                receivedCount++;
+            }
+        } catch (Exception e) {
+            LOG.error("UnexpectedException in " + consumerName, e);
+        } finally {
+            try {
+                connection.close();
+            } catch (JMSException ignoreMe) {
+            }
+        }
+
+        return true;
+    }
+}
+