You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/01/16 15:59:08 UTC

[3/6] qpid-jms git commit: add test for client-ack session and recover from message listener, expose issue with order

add test for client-ack session and recover from message listener, expose issue with order


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/20d4d9ff
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/20d4d9ff
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/20d4d9ff

Branch: refs/heads/master
Commit: 20d4d9ffef8d69fa94367a48fcac3fcfa692a318
Parents: 2ca35e9
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Jan 16 12:18:01 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Jan 16 12:32:18 2015 +0000

----------------------------------------------------------------------
 .../qpid/jms/consumer/JmsClientAckTest.java     | 124 +++++++++++++++++++
 1 file changed, 124 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/20d4d9ff/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
index 653961a..a9fd3be 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.consumer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
@@ -384,4 +385,127 @@ public class JmsClientAckTest extends AmqpTestSupport {
 
         assertTrue("Failed to get all deliveries", consumed.await(45, TimeUnit.SECONDS));
     }
+
+    /**
+     * Test use of session recovery while using a client-ack session and
+     * a message listener. Calling recover should result in delivery of the
+     * previous unacknowledged messages again, followed by those that would
+     * have been received afterwards.
+     *
+     * Send three messages. Consume the first message, then call recover while
+     * handling the second message and expect to see both again, marked as
+     * redelivered. Ensure the third message is not seen until after this.
+     *
+     * @throws Exception if the test setup fails or the wait for the listener is interrupted
+     */
+    @Test(timeout = 60000)
+    public void testRecoverInOnMessage() throws Exception {
+        connection = createAmqpConnection();
+
+        // Must be client-ack: nothing is acknowledged, so recover() redelivers message 1 as well as 2.
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        assertNotNull(session);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+        sendMessages(connection, queue, 3);
+
+        CountDownLatch latch = new CountDownLatch(1);
+        ClientAckRecoverMsgListener listener = new ClientAckRecoverMsgListener(latch, session);
+        consumer.setMessageListener(listener);
+
+        connection.start();
+
+        assertTrue("Timed out waiting for async listener", latch.await(10, TimeUnit.SECONDS));
+        assertFalse("Test failed in listener, consult logs", listener.getFailed());
+    }
+
+    private static class ClientAckRecoverMsgListener implements MessageListener {
+        final Session session;
+        final CountDownLatch latch;
+        private boolean seenFirstMessage = false;
+        private boolean seenFirstMessageTwice = false;
+        private boolean seenSecondMessage = false;
+        private boolean seenSecondMessageTwice = false;
+        private boolean complete = false;
+        private boolean failed = false;
+
+        public ClientAckRecoverMsgListener(CountDownLatch latch, Session session) {
+            this.latch = latch;
+            this.session = session;
+        }
+
+        @Override
+        public void onMessage(Message message) {
+            try {
+                int msgNumProperty = message.getIntProperty(MESSAGE_NUMBER);
+
+                if(complete ){
+                    LOG.info("Test already finished, ignoring delivered message: " + msgNumProperty);
+                    return;
+                }
+
+                if (msgNumProperty == 1) {
+                    if (!seenFirstMessage) {
+                        LOG.info("Received first message.");
+                        seenFirstMessage = true;
+                    } else {
+                        LOG.info("Received first message again.");
+                        if(message.getJMSRedelivered()) {
+                            LOG.info("Message was marked redelivered as expected.");
+                        } else {
+                            LOG.error("Message was not marked redelivered.");
+                            complete(true);
+                        }
+                        seenFirstMessageTwice = true;
+                    }
+                } else if (msgNumProperty == 2) {
+                    if(!seenSecondMessage){
+                        seenSecondMessage = true;
+                        LOG.info("Received second message. Now calling recover()");
+                        session.recover();
+                    } else {
+                        LOG.info("Received second message again as expected.");
+                        seenSecondMessageTwice = true;
+                        if(message.getJMSRedelivered()) {
+                            LOG.info("Message was marked redelivered as expected.");
+                        } else {
+                            LOG.error("Message was not marked redelivered.");
+                            complete(true);
+                        }
+                    }
+                } else {
+                    if (msgNumProperty != 3) {
+                        LOG.error("Received unexpected message: " + msgNumProperty);
+                        complete(true);
+                    }
+
+                    if (!seenFirstMessageTwice && !seenSecondMessageTwice) {
+                        LOG.error("Third message was not received in expected sequence.");
+                        complete(true);
+                    }
+
+                    if(message.getJMSRedelivered()) {
+                        LOG.error("Message was marked redelivered against expectation.");
+                        complete(true);
+                    } else {
+                        LOG.info("Message was not marked redelivered, as expected.");
+                        complete(false);
+                    }
+                }
+            } catch (JMSException e) {
+                LOG.error("Exception caught in listener", e);
+                complete(true);
+            }
+        }
+
+        public boolean getFailed() {
+            return failed;
+        }
+
+        private void complete(boolean fail) {
+            failed = fail;
+            complete = true;
+            latch.countDown();
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org