You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2015/01/16 15:59:08 UTC
[3/6] qpid-jms git commit: add test for client-ack session and
recover from message listener, expose issue with order
add test for client-ack session and recover from message listener, expose issue with order
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/20d4d9ff
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/20d4d9ff
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/20d4d9ff
Branch: refs/heads/master
Commit: 20d4d9ffef8d69fa94367a48fcac3fcfa692a318
Parents: 2ca35e9
Author: Robert Gemmell <ro...@apache.org>
Authored: Fri Jan 16 12:18:01 2015 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Fri Jan 16 12:32:18 2015 +0000
----------------------------------------------------------------------
.../qpid/jms/consumer/JmsClientAckTest.java | 124 +++++++++++++++++++
1 file changed, 124 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/20d4d9ff/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
index 653961a..a9fd3be 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java
@@ -17,6 +17,7 @@
package org.apache.qpid.jms.consumer;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
@@ -384,4 +385,127 @@ public class JmsClientAckTest extends AmqpTestSupport {
assertTrue("Failed to get all deliveries", consumed.await(45, TimeUnit.SECONDS));
}
+
+ /**
+ * Test use of session recovery while using a client-ack session and
+ * a message listener. Calling recover should result in delivery of the
+ * previous messages again, followed by those that would have been received
+ * afterwards.
+ *
+ * Send three messages. Consume the first message, then recover on the second
+ * message and expect to see both again, ensure the third message is not seen
+ * until after this.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 60000)
+ public void testRecoverInOnMessage() throws Exception {
+ connection = createAmqpConnection();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ assertNotNull(session);
+ Queue queue = session.createQueue(name.getMethodName());
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ sendMessages(connection, queue, 3);
+
+ CountDownLatch latch = new CountDownLatch(1);
+ ClientAckRecoverMsgListener listener = new ClientAckRecoverMsgListener(latch, session);
+ consumer.setMessageListener(listener);
+
+ connection.start();
+
+ assertTrue("Timed out waiting for async listener", latch.await(10, TimeUnit.SECONDS));
+ assertFalse("Test failed in listener, consult logs", listener.getFailed());
+ }
+
+ private static class ClientAckRecoverMsgListener implements MessageListener {
+ final Session session;
+ final CountDownLatch latch;
+ private boolean seenFirstMessage = false;
+ private boolean seenFirstMessageTwice = false;
+ private boolean seenSecondMessage = false;
+ private boolean seenSecondMessageTwice = false;
+ private boolean complete = false;
+ private boolean failed = false;
+
+ public ClientAckRecoverMsgListener(CountDownLatch latch, Session session) {
+ this.latch = latch;
+ this.session = session;
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ try {
+ int msgNumProperty = message.getIntProperty(MESSAGE_NUMBER);
+
+ if(complete ){
+ LOG.info("Test already finished, ignoring delivered message: " + msgNumProperty);
+ return;
+ }
+
+ if (msgNumProperty == 1) {
+ if (!seenFirstMessage) {
+ LOG.info("Received first message.");
+ seenFirstMessage = true;
+ } else {
+ LOG.info("Received first message again.");
+ if(message.getJMSRedelivered()) {
+ LOG.info("Message was marked redelivered as expected.");
+ } else {
+ LOG.error("Message was not marked redelivered.");
+ complete(true);
+ }
+ seenFirstMessageTwice = true;
+ }
+ } else if (msgNumProperty == 2) {
+ if(!seenSecondMessage){
+ seenSecondMessage = true;
+ LOG.info("Received second message. Now calling recover()");
+ session.recover();
+ } else {
+ LOG.info("Received second message again as expected.");
+ seenSecondMessageTwice = true;
+ if(message.getJMSRedelivered()) {
+ LOG.info("Message was marked redelivered as expected.");
+ } else {
+ LOG.error("Message was not marked redelivered.");
+ complete(true);
+ }
+ }
+ } else {
+ if (msgNumProperty != 3) {
+ LOG.error("Received unexpected message: " + msgNumProperty);
+ complete(true);
+ }
+
+ if (!seenFirstMessageTwice && !seenSecondMessageTwice) {
+ LOG.error("Third message was not received in expected sequence.");
+ complete(true);
+ }
+
+ if(message.getJMSRedelivered()) {
+ LOG.error("Message was marked redelivered against expectation.");
+ complete(true);
+ } else {
+ LOG.info("Message was not marked redelivered, as expected.");
+ complete(false);
+ }
+ }
+ } catch (JMSException e) {
+ LOG.error("Exception caught in listener", e);
+ complete(true);
+ }
+ }
+
+ public boolean getFailed() {
+ return failed;
+ }
+
+ private void complete(boolean fail) {
+ failed = fail;
+ complete = true;
+ latch.countDown();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org