You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2020/05/20 16:05:55 UTC

[qpid-jms] branch master updated: QPIDJMS-506 Improve error handling during async dispatch processing

This is an automated email from the ASF dual-hosted git repository.

tabish pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new d9e32d4  QPIDJMS-506 Improve error handling during async dispatch processing
d9e32d4 is described below

commit d9e32d4a09dc237dfc22ada36f92b238c3356287
Author: Timothy Bish <ta...@gmail.com>
AuthorDate: Wed May 20 11:54:44 2020 -0400

    QPIDJMS-506 Improve error handling during async dispatch processing
    
    Ensure that any error encountered while preparing a message for
    asynchronous dispatch, such as one caused by a corrupt message payload,
    is signalled to the remote peer as a failed delivery.  Also protect the
    asynchronous exception notification from inter-thread deadlocks by
    ensuring that the dispatch lock is not held while the listener is notified.
---
 .../org/apache/qpid/jms/JmsMessageConsumer.java    | 47 +++++++++++++-----
 .../main/java/org/apache/qpid/jms/JmsSession.java  |  4 --
 .../jms/integration/ConsumerIntegrationTest.java   | 58 ++++++++++++++++++++++
 3 files changed, 92 insertions(+), 17 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 938d5a3..4afe91c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -71,7 +71,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     protected volatile JmsMessageAvailableListener availableListener;
     protected final MessageQueue messageQueue;
     protected final Lock lock = new ReentrantLock();
-    protected final Lock dispatchLock = new ReentrantLock();
+    protected final ReentrantLock dispatchLock = new ReentrantLock();
     protected final AtomicReference<Throwable> failureCause = new AtomicReference<>();
     protected final MessageDeliverTask deliveryTask = new MessageDeliverTask();
     protected final JmsTracer tracer;
@@ -416,7 +416,13 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         if (envelope == null || envelope.getMessage() == null) {
             return null;
         }
-        return envelope.getMessage().copy();
+
+        try {
+            return envelope.getMessage().copy();
+        } catch (Exception ex) {
+            session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
+            throw ex;
+        }
     }
 
     JmsInboundMessageDispatch ackFromReceive(final JmsInboundMessageDispatch envelope) throws JMSException {
@@ -437,7 +443,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         try {
             session.acknowledge(envelope, ACK_TYPE.ACCEPTED);
         } catch (JMSException ex) {
-            session.onException(ex);
+            signalExceptionListener(ex);
             throw ex;
         }
         return envelope;
@@ -447,7 +453,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         try {
             session.acknowledge(envelope, ACK_TYPE.DELIVERED);
         } catch (JMSException ex) {
-            session.onException(ex);
+            signalExceptionListener(ex);
             throw ex;
         }
         return envelope;
@@ -457,7 +463,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         try {
             session.acknowledge(envelope, ACK_TYPE.MODIFIED_FAILED_UNDELIVERABLE);
         } catch (JMSException ex) {
-            session.onException(ex);
+            signalExceptionListener(ex);
             throw ex;
         }
     }
@@ -467,7 +473,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
             JmsRedeliveryPolicy redeliveryPolicy = consumerInfo.getRedeliveryPolicy();
             session.acknowledge(envelope, lookupAckTypeForDisposition(redeliveryPolicy.getOutcome(getDestination())));
         } catch (JMSException ex) {
-            session.onException(ex);
+            signalExceptionListener(ex);
             throw ex;
         }
     }
@@ -476,7 +482,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         try {
             session.acknowledge(envelope, ACK_TYPE.RELEASED);
         } catch (JMSException ex) {
-            session.onException(ex);
+            signalExceptionListener(ex);
             throw ex;
         }
     }
@@ -730,6 +736,23 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
         return false;
     }
 
+    private final void signalExceptionListener(Exception ex) {
+        boolean reclaimLock = false;
+
+        if (dispatchLock.isHeldByCurrentThread()) {
+            reclaimLock = true;
+            dispatchLock.unlock();
+        }
+
+        try {
+            session.onException(ex);
+        } finally {
+            if (reclaimLock) {
+                dispatchLock.lock();
+            }
+        }
+    }
+
     private void drainMessageQueueToListener() {
         if (messageListener != null && session.isStarted() && messageQueue.isRunning()) {
             session.getDispatcherExecutor().execute(new BoundedMessageDeliverTask(messageQueue.size()));
@@ -778,7 +801,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                         deliveryFailed = true;
                         tracer.asyncDeliveryComplete(facade, DeliveryOutcome.APPLICATION_ERROR, rte);
                     } finally {
-                        if(!deliveryFailed) {
+                        if (!deliveryFailed) {
                             tracer.asyncDeliveryComplete(facade, DeliveryOutcome.DELIVERED, null);
                         }
                     }
@@ -792,11 +815,9 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
                     }
                 }
             } catch (Exception e) {
-                // TODO - There are two cases where we can get an error here, one being
-                //        and error returned from the attempted ACK that was sent and the
-                //        other being an error while attempting to copy the incoming message.
-                //        We need to decide how to respond to these.
-                session.getConnection().onException(e);
+                // An error while attempting to copy the message is the likely cause of this
+                // exception case being hit.
+                signalExceptionListener(e);
             } finally {
                 dispatchLock.unlock();
 
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 09bcd05..bf6d5cd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -825,10 +825,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         connection.onException(ex);
     }
 
-    protected void onException(JMSException ex) {
-        connection.onException(ex);
-    }
-
     protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
         if (dest == null) {
             throw new InvalidDestinationException("Destination must not be null");
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
index 0c6dab9..9fcec81 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConsumerIntegrationTest.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
@@ -58,7 +59,9 @@ import org.apache.qpid.jms.test.testpeer.AmqpPeerRunnable;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.RejectedMatcher;
@@ -68,7 +71,9 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionM
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.apache.qpid.jms.util.QpidJMSTestRunner;
 import org.apache.qpid.jms.util.Repeat;
+import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.hamcrest.Matcher;
 import org.hamcrest.Matchers;
@@ -2228,4 +2233,57 @@ public class ConsumerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void testDispositionSentForDecodeErrorOnMessageCopyWithAsyncConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            final AtomicReference<JMSException> decodeError = new AtomicReference<>();
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+            connection.setExceptionListener(ex -> {
+                decodeError.set(ex);
+            });
+            connection.start();
+
+            byte[] badUTF8Encoding = { (byte) 0xe4, (byte) 0xa4, 0x01 };
+
+            testPeer.expectBegin();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            final Queue destination = session.createQueue(getTestName());
+
+            PropertiesDescribedType properties = new PropertiesDescribedType();
+            properties.setContentType(Symbol.valueOf("text/plain;charset=utf-8"));
+
+            DescribedType dataContent = new DataDescribedType(new Binary(badUTF8Encoding));
+
+            ModifiedMatcher stateMatcher = new ModifiedMatcher();
+            stateMatcher.withDeliveryFailed(equalTo(true));
+            stateMatcher.withUndeliverableHere(equalTo(true));
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, properties, null, dataContent);
+            testPeer.expectDisposition(true, stateMatcher);
+            testPeer.expectClose();
+
+            final AtomicBoolean onMessage = new AtomicBoolean();
+
+            MessageConsumer messageConsumer = session.createConsumer(destination);
+            messageConsumer.setMessageListener(m -> onMessage.set(true));
+
+            assertTrue("Should have gotten an onException call from failed message copy", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisfied() throws Exception {
+                    return decodeError.get() != null;
+                }
+            }));
+
+            assertFalse("onMessage should not be called due to failed message copy", onMessage.get());
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org