You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/12/21 17:22:31 UTC

qpid-jms git commit: QPIDJMS-239 Add a test against ActiveMQ based on the scenario described

Repository: qpid-jms
Updated Branches:
  refs/heads/master ff7076ab5 -> 81fc0d9a0


QPIDJMS-239 Add a test against ActiveMQ based on the scenario described

Adds a recover test to the ActiveMQ test suite, based on the scenario
described in the issue, to try to reproduce it.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/81fc0d9a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/81fc0d9a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/81fc0d9a

Branch: refs/heads/master
Commit: 81fc0d9a029f297c385f163262821ae056fa5cdc
Parents: ff7076a
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Dec 21 12:22:15 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Dec 21 12:22:15 2016 -0500

----------------------------------------------------------------------
 .../qpid/jms/consumer/JmsClientAckTest.java     | 87 ++++++++++++++++++++
 1 file changed, 87 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/81fc0d9a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
index 56272c4..7b7f09f 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
@@ -19,12 +19,14 @@ package org.apache.qpid.jms.consumer;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.DeliveryMode;
@@ -538,6 +540,91 @@ public class JmsClientAckTest extends AmqpTestSupport {
         }));
     }
 
+    @Test(timeout = 60000)
+    public void testRepeatedRecoveriesInAsyncListener() throws Exception {
+        final int MESSAGE_COUNT = 20;
+        final int ITERATIONS = 10;
+
+        final AtomicInteger messagesConsumed = new AtomicInteger();
+        final AtomicReference<Exception> failure = new AtomicReference<Exception>();
+
+        connection = createAmqpConnection();
+        connection.start();
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+
+        sendMessages(connection, queue, MESSAGE_COUNT);
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertTrue("Queue didn't receive all messages", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == MESSAGE_COUNT;
+            }
+        }));
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+
+            int retries = 0;
+
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    LOG.info("Read message {}", message.getIntProperty(MESSAGE_NUMBER));
+                    if (message.getIntProperty(MESSAGE_NUMBER) != messagesConsumed.get() + 1) {
+                        failure.set(new IllegalArgumentException("Read message with wrong sequence"));
+                    }
+
+                    if (++retries == ITERATIONS) {
+                        messagesConsumed.incrementAndGet();
+                        retries = 0;
+                        message.acknowledge();
+
+                        // Check that only one message is consumed
+                        boolean consumed = Wait.waitFor(new Wait.Condition() {
+
+                            @Override
+                            public boolean isSatisified() throws Exception {
+                                return proxy.getQueueSize() == MESSAGE_COUNT - messagesConsumed.get();
+                            }
+                        }, 10000, 20);
+
+                        if (!consumed) {
+                            failure.set(new IllegalStateException("Broker Queue Size doesn't match expectations"));
+                        }
+                    } else {
+                        session.recover();
+                    }
+
+                } catch (Exception e) {
+                    failure.set(e);
+                }
+            }
+        });
+
+        assertTrue("Not all messages could be consumed, got " + messagesConsumed.get(), Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("Are we complete: error:{} nessages read:{}", failure.get(), messagesConsumed.get());
+                return failure.get() != null || messagesConsumed.get() == MESSAGE_COUNT;
+            }
+        }));
+
+        assertNull("Should not get any failures during this test", failure.get());
+
+        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == 0;
+            }
+        }));
+    }
+
     private static class ClientAckRecoverMsgListener implements MessageListener {
         final Session session;
         final CountDownLatch latch;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org