You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2018/04/11 12:27:41 UTC

qpid-jms git commit: QPIDJMS-376: add related test (and verify exception listener isn't called for regular close) and restore timeout on an existing test

Repository: qpid-jms
Updated Branches:
  refs/heads/master da0d4e50a -> a307df8bc


QPIDJMS-376: add related test (and verify exception listener isn't called for regular close) and restore timeout on an existing test


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a307df8b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a307df8b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a307df8b

Branch: refs/heads/master
Commit: a307df8bcb24f29760cb68c51f92f420396c1c3a
Parents: da0d4e5
Author: Robbie Gemmell <ro...@apache.org>
Authored: Wed Apr 11 13:23:33 2018 +0100
Committer: Robbie Gemmell <ro...@apache.org>
Committed: Wed Apr 11 13:23:33 2018 +0100

----------------------------------------------------------------------
 .../integration/ConsumerIntegrationTest.java    | 63 +++++++++++++++++++-
 1 file changed, 62 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a307df8b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index c573972..9885200 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -1048,7 +1048,7 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
         }
     }
 
-    @Test // (timeout=20000)
+    @Test(timeout=20000)
     public void testMessageListenerCallsConnectionStopThrowsIllegalStateException() throws Exception {
         final CountDownLatch latch = new CountDownLatch(1);
         final AtomicReference<Exception> asyncError = new AtomicReference<Exception>(null);
@@ -1163,6 +1163,67 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout=20000)
+    public void testMessageListenerClosesItsConsumer() throws Exception { // a MessageListener closes its own consumer from inside onMessage
+        final CountDownLatch latch = new CountDownLatch(1); // counted down once onMessage has attempted the close
+        final CountDownLatch exceptionListenerFired = new CountDownLatch(1); // counted down only if the connection ExceptionListener is invoked
+        final AtomicReference<Throwable> error = new AtomicReference<>(); // captures anything thrown by consumer.close() inside the listener
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+            // Record any ExceptionListener callback; the final assertFalse below expects none.
+            connection.setExceptionListener(new ExceptionListener() {
+                @Override
+                public void onException(JMSException exception) {
+                    LOG.trace("JMS ExceptionListener: ", exception);
+                    exceptionListenerFired.countDown();
+                }
+            });
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start(); // NOTE(review): the connection was already started above; this second start() looks redundant - confirm
+            // Peer expectations for consumer creation: receiver attach, then a flow answered with a transfer carrying "content" (final arg 1 is presumably the message count - confirm against TestAmqpPeer).
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 1);
+
+            MessageConsumer consumer = session.createConsumer(destination);
+            // Registered before the listener is installed: a link flow, an accepted disposition, and a detach.
+            testPeer.expectLinkFlow(true, true, equalTo(UnsignedInteger.valueOf(JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH -1)));
+            testPeer.expectDisposition(true, new AcceptedMatcher());
+            testPeer.expectDetach(true, true, true);
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    try {
+                        consumer.close(); // the operation under test: closing the consumer from its own listener
+                    } catch (Throwable t) {
+                        error.set(t); // surfaced to the test thread via assertNull below
+                        LOG.error("Unexpected error during close", t);
+                    }
+
+                    latch.countDown(); // reached whether or not close() threw, since the catch covers Throwable
+                    LOG.debug("Async consumer got Message: {}", m);
+                }
+            });
+            // Wait for the listener to run, then check that close() did not throw.
+            assertTrue("Process not completed within given timeout", latch.await(3000, TimeUnit.MILLISECONDS));
+            assertNull("No error expected during close", error.get());
+
+            testPeer.waitForAllHandlersToComplete(2000);
+            // Regular connection close; the exception listener must not be called for it.
+            testPeer.expectClose();
+            connection.close();
+
+            assertFalse("JMS Exception listener shouldn't have fired", exceptionListenerFired.await(20, TimeUnit.MILLISECONDS)); // await() returning false means the latch was not counted down within 20 ms
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
     @Repeat(repetitions = 1)
     @Test(timeout=20000)
     public void testRecoverOrderingWithAsyncConsumer() throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org