You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2020/11/27 16:05:39 UTC
[qpid-jms] branch master updated: QPIDJMS-521: ensure ExceptionListener called during unexpected async dispatch failure is allowed to close the Session/Connection
This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push:
new 8fea843 QPIDJMS-521: ensure ExceptionListener called during unexpected async dispatch failure is allowed to close the Session/Connection
8fea843 is described below
commit 8fea843e9b46457f4dda3b21b35c3c16558b723f
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Nov 27 16:03:08 2020 +0000
QPIDJMS-521: ensure ExceptionListener called during unexpected async dispatch failure is allowed to close the Session/Connection
---
.../org/apache/qpid/jms/JmsMessageConsumer.java | 2 +
.../main/java/org/apache/qpid/jms/JmsSession.java | 7 ++-
.../jms/integration/ConsumerIntegrationTest.java | 67 ++++++++++++++++++++++
3 files changed, 75 insertions(+), 1 deletion(-)
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 4afe91c..a8a6b62 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -741,6 +741,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
if (dispatchLock.isHeldByCurrentThread()) {
reclaimLock = true;
+ session.setDeliveryThreadCheckEnabled(false);
dispatchLock.unlock();
}
@@ -749,6 +750,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
} finally {
if (reclaimLock) {
dispatchLock.lock();
+ session.setDeliveryThreadCheckEnabled(true);
}
}
}
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index bf6d5cd..d9c2c42 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -127,6 +127,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private volatile ThreadPoolExecutor deliveryExecutor;
private volatile ThreadPoolExecutor completionExcecutor;
private AtomicReference<Thread> deliveryThread = new AtomicReference<Thread>();
+ private boolean deliveryThreadCheckEnabled = true;
private AtomicReference<Thread> completionThread = new AtomicReference<Thread>();
private final AtomicLong consumerIdGenerator = new AtomicLong();
@@ -1277,8 +1278,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
}
}
+ void setDeliveryThreadCheckEnabled(boolean enabled) {
+ deliveryThreadCheckEnabled = enabled;
+ }
+
void checkIsDeliveryThread() throws JMSException {
- if (Thread.currentThread().equals(deliveryThread.get())) {
+ if (deliveryThreadCheckEnabled && Thread.currentThread().equals(deliveryThread.get())) {
throw new IllegalStateException("Illegal invocation from MessageListener callback");
}
}
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index e0e751c..ca4e973 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -2437,4 +2437,71 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
assertEquals("Message payloads not as expected", expectedPayloads, receivedPayloads);
}
}
+
+ @Test(timeout=20000)
+ public void testClosingSessionAndConnectionWithinExceptionListenerDueToAsyncConsumerDeliveryFailure() throws Exception {
+ final CountDownLatch exceptionListenerCalled = new CountDownLatch(1);
+ final CountDownLatch exceptionListenerCompleted = new CountDownLatch(1);
+ final AtomicReference<Throwable> asyncError = new AtomicReference<Throwable>(null);
+ final AtomicBoolean messageListenerCalled = new AtomicBoolean();
+
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ final Connection connection = testFixture.establishConnecton(testPeer);
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(getTestName());
+ connection.start();
+
+ final PropertiesDescribedType properties = new PropertiesDescribedType();
+ properties.setContentType(Symbol.valueOf("text/plain;charset=utf-8"));
+
+ byte[] invalidPayload = new byte[2]; // Add two for malformed UTF8
+ invalidPayload[0] = (byte) 0b11000111; // The prefix for a two-byte UTF8 encoding
+ invalidPayload[1] = (byte) 0b00110000; // An invalid next byte, as encoding must be 0b10xxxxxx
+ DescribedType invalidUTF8DataContent = new DataDescribedType(new Binary(invalidPayload));
+
+ testPeer.expectReceiverAttach();
+ testPeer.expectLinkFlowRespondWithTransfer(null, null, properties, null, invalidUTF8DataContent, 1);
+
+ final ModifiedMatcher modifiedFailedUndeliverableMatcher = new ModifiedMatcher();
+ modifiedFailedUndeliverableMatcher.withDeliveryFailed(equalTo(true));
+ modifiedFailedUndeliverableMatcher.withUndeliverableHere(equalTo(true));
+
+ MessageConsumer consumer = session.createConsumer(destination);
+
+ testPeer.waitForAllHandlersToComplete(2000);
+
+ testPeer.expectDisposition(true, modifiedFailedUndeliverableMatcher);
+ testPeer.expectEnd();
+ testPeer.expectClose();
+
+ connection.setExceptionListener(exception -> {
+ try {
+ exceptionListenerCalled.countDown();
+ session.close();
+ connection.close();
+ } catch (Exception ex) {
+ asyncError.set(ex);
+ } finally {
+ exceptionListenerCompleted.countDown();
+ }
+ });
+
+ consumer.setMessageListener(m -> messageListenerCalled.set(true));
+
+ assertTrue("Exception listener was not fired within given timeout",
+ exceptionListenerCalled.await(4000, TimeUnit.MILLISECONDS));
+
+ assertTrue("Exception listener didnt complete within given timeout",
+ exceptionListenerCompleted.await(4000, TimeUnit.MILLISECONDS));
+
+ assertNull("Unexpected failure during exception listener handling", asyncError.get());
+ assertFalse("Message listener should not have been called due to decoding error", messageListenerCalled.get());
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org