You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/12/21 17:22:31 UTC
qpid-jms git commit: QPIDJMS-239 Add a test against ActiveMQ based on
the scenario described
Repository: qpid-jms
Updated Branches:
refs/heads/master ff7076ab5 -> 81fc0d9a0
QPIDJMS-239 Add a test against ActiveMQ based on the scenario described
Adds a recover test based on the described scenario in the ActiveMQ test
suite to try and reproduce.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/81fc0d9a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/81fc0d9a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/81fc0d9a
Branch: refs/heads/master
Commit: 81fc0d9a029f297c385f163262821ae056fa5cdc
Parents: ff7076a
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Dec 21 12:22:15 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Dec 21 12:22:15 2016 -0500
----------------------------------------------------------------------
.../qpid/jms/consumer/JmsClientAckTest.java | 87 ++++++++++++++++++++
1 file changed, 87 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/81fc0d9a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
index 56272c4..7b7f09f 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
@@ -19,12 +19,14 @@ package org.apache.qpid.jms.consumer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.DeliveryMode;
@@ -538,6 +540,91 @@ public class JmsClientAckTest extends AmqpTestSupport {
}));
}
+ @Test(timeout = 60000)
+ public void testRepeatedRecoveriesInAsyncListener() throws Exception {
+ final int MESSAGE_COUNT = 20;
+ final int ITERATIONS = 10;
+
+ final AtomicInteger messagesConsumed = new AtomicInteger();
+ final AtomicReference<Exception> failure = new AtomicReference<Exception>();
+
+ connection = createAmqpConnection();
+ connection.start();
+ final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ Queue queue = session.createQueue(name.getMethodName());
+
+ sendMessages(connection, queue, MESSAGE_COUNT);
+
+ final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+ assertTrue("Queue didn't receive all messages", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return proxy.getQueueSize() == MESSAGE_COUNT;
+ }
+ }));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ consumer.setMessageListener(new MessageListener() {
+
+ int retries = 0;
+
+ @Override
+ public void onMessage(Message message) {
+ try {
+ LOG.info("Read message {}", message.getIntProperty(MESSAGE_NUMBER));
+ if (message.getIntProperty(MESSAGE_NUMBER) != messagesConsumed.get() + 1) {
+ failure.set(new IllegalArgumentException("Read message with wrong sequence"));
+ }
+
+ if (++retries == ITERATIONS) {
+ messagesConsumed.incrementAndGet();
+ retries = 0;
+ message.acknowledge();
+
+ // Check that only one message is consumed
+ boolean consumed = Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return proxy.getQueueSize() == MESSAGE_COUNT - messagesConsumed.get();
+ }
+ }, 10000, 20);
+
+ if (!consumed) {
+ failure.set(new IllegalStateException("Broker Queue Size doesn't match expectations"));
+ }
+ } else {
+ session.recover();
+ }
+
+ } catch (Exception e) {
+ failure.set(e);
+ }
+ }
+ });
+
+ assertTrue("Not all messages could be consumed, got " + messagesConsumed.get(), Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ LOG.info("Are we complete: error:{} nessages read:{}", failure.get(), messagesConsumed.get());
+ return failure.get() != null || messagesConsumed.get() == MESSAGE_COUNT;
+ }
+ }));
+
+ assertNull("Should not get any failures during this test", failure.get());
+
+ assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return proxy.getQueueSize() == 0;
+ }
+ }));
+ }
+
private static class ClientAckRecoverMsgListener implements MessageListener {
final Session session;
final CountDownLatch latch;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org