You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2020/11/27 16:05:39 UTC

[qpid-jms] branch master updated: QPIDJMS-521: ensure ExceptionListener called during unexpected async dispatch failure is allowed to close the Session/Connection

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fea843  QPIDJMS-521: ensure ExceptionListener called during unexpected async dispatch failure is allowed to close the Session/Connection
8fea843 is described below

commit 8fea843e9b46457f4dda3b21b35c3c16558b723f
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Nov 27 16:03:08 2020 +0000

    QPIDJMS-521: ensure ExceptionListener called during unexpected async dispatch failure is allowed to close the Session/Connection
---
 .../org/apache/qpid/jms/JmsMessageConsumer.java    |  2 +
 .../main/java/org/apache/qpid/jms/JmsSession.java  |  7 ++-
 .../jms/integration/ConsumerIntegrationTest.java   | 67 ++++++++++++++++++++++
 3 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 4afe91c..a8a6b62 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -741,6 +741,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
 
         if (dispatchLock.isHeldByCurrentThread()) {
             reclaimLock = true;
+            session.setDeliveryThreadCheckEnabled(false);
             dispatchLock.unlock();
         }
 
@@ -749,6 +750,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         } finally {
             if (reclaimLock) {
                 dispatchLock.lock();
+                session.setDeliveryThreadCheckEnabled(true);
             }
         }
     }
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index bf6d5cd..d9c2c42 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -127,6 +127,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private volatile ThreadPoolExecutor deliveryExecutor;
     private volatile ThreadPoolExecutor completionExcecutor;
     private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
+    private boolean deliveryThreadCheckEnabled = true;
     private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
 
     private final AtomicLong consumerIdGenerator = new AtomicLong();
@@ -1277,8 +1278,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         }
     }
 
+    void setDeliveryThreadCheckEnabled(boolean enabled) {
+        deliveryThreadCheckEnabled = enabled;
+    }
+
     void checkIsDeliveryThread() throws JMSException {
-        if (Thread.currentThread().equals(deliveryThread.get())) {
+        if (deliveryThreadCheckEnabled && Thread.currentThread().equals(deliveryThread.get())) {
             throw new IllegalStateException("Illegal invocation from MessageListener callback");
         }
     }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index e0e751c..ca4e973 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -2437,4 +2437,71 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             assertEquals("Message payloads not as expected", expectedPayloads, receivedPayloads);
         }
     }
+
+    @Test(timeout=20000)
+    public void testClosingSessionAndConnectionWithinExceptionListenerDueToAsyncConsumerDeliveryFailure() throws Exception {
+        final CountDownLatch exceptionListenerCalled = new CountDownLatch(1);
+        final CountDownLatch exceptionListenerCompleted = new CountDownLatch(1);
+        final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>(null);
+        final AtomicBoolean messageListenerCalled = new AtomicBoolean();
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final Connection connection = testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            connection.start();
+
+            final PropertiesDescribedType properties = new PropertiesDescribedType();
+            properties.setContentType(Symbol.valueOf("text/plain;charset=utf-8"));
+
+            byte[] invalidPayload = new byte[2];  // Add two for malformed UTF8
+            invalidPayload[0] = (byte) 0b11000111; // The prefix for a two-byte UTF8 encoding
+            invalidPayload[1] = (byte) 0b00110000; // An invalid next byte, as encoding must be 0b10xxxxxx
+            DescribedType invalidUTF8DataContent = new DataDescribedType(new Binary(invalidPayload));
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, properties, null, invalidUTF8DataContent, 1);
+
+            final ModifiedMatcher modifiedFailedUndeliverableMatcher = new ModifiedMatcher();
+            modifiedFailedUndeliverableMatcher.withDeliveryFailed(equalTo(true));
+            modifiedFailedUndeliverableMatcher.withUndeliverableHere(equalTo(true));
+
+            MessageConsumer consumer = session.createConsumer(destination);
+
+            testPeer.waitForAllHandlersToComplete(2000);
+
+            testPeer.expectDisposition(true, modifiedFailedUndeliverableMatcher);
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            connection.setExceptionListener(exception -> {
+                try {
+                    exceptionListenerCalled.countDown();
+                    session.close();
+                    connection.close();
+                } catch (Exception ex) {
+                    asyncError.set(ex);
+                } finally {
+                    exceptionListenerCompleted.countDown();
+                }
+            });
+
+            consumer.setMessageListener(m -> messageListenerCalled.set(true));
+
+            assertTrue("Exception listener was not fired within given timeout",
+                    exceptionListenerCalled.await(4000, TimeUnit.MILLISECONDS));
+
+            assertTrue("Exception listener didnt complete within given timeout",
+                    exceptionListenerCompleted.await(4000, TimeUnit.MILLISECONDS));
+
+            assertNull("Unexpected failure during exception listener handling", asyncError.get());
+            assertFalse("Message listener should not have been called due to decoding error", messageListenerCalled.get());
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org