You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/06/28 14:59:02 UTC

[qpid-jms] branch master updated: QPIDJMS-464: fail pull attempt if message doesnt eventually complete

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c83408  QPIDJMS-464: fail pull attempt if message doesnt eventually complete
8c83408 is described below

commit 8c83408c48fe646f21ca0387bc332566175b79c5
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Jun 28 15:57:50 2019 +0100

    QPIDJMS-464: fail pull attempt if message doesnt eventually complete
---
 .../qpid/jms/provider/amqp/AmqpConsumer.java       | 30 ++++++++-
 .../integration/ZeroPrefetchIntegrationTest.java   | 73 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 3 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 7bc2be1..197980d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -110,7 +110,29 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                 request.onSuccess();
             } else {
                 // There are still deliveries to process, wait for them to be.
-                stopRequest = request;
+                if (getDrainTimeout() > 0) {
+                    // If the remote doesn't respond we will close the consumer and break any
+                    // blocked receive or stop calls that are waiting, unless the consumer is
+                    // a participant in a transaction in which case we will just fail the request
+                    // and leave the consumer open since the TX needs it to remain active.
+                    final ScheduledFuture<?> future = getSession().schedule(() -> {
+                        LOG.trace("Consumer {} stop timed out awaiting message processing", getConsumerId());
+                        Exception cause = new JmsOperationTimedOutException("Consumer stop timed out awaiting message processing");
+                        if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) {
+                            stopRequest.onFailure(cause);
+                            stopRequest = null;
+                        } else {
+                            closeResource(session.getProvider(), cause, false);
+                            session.getProvider().pumpToProtonTransport();
+                        }
+                    }, getDrainTimeout());
+
+                    stopRequest = new ScheduledRequest(future, request);
+                } else {
+                    stopRequest = request;
+                }
+
+                LOG.trace("Consumer {} stop awaiting queued delivery processing", getConsumerId());
             }
         } else {
             // TODO: We don't actually want the additional messages that could be sent while
@@ -461,6 +483,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
 
     @Override
     public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
+        if(delivery.getDefaultDeliveryState() == null){
+            delivery.setDefaultDeliveryState(Released.getInstance());
+        }
+
         if (delivery.isReadable() && !delivery.isPartial()) {
             LOG.trace("{} has incoming Message(s).", this);
             try {
@@ -492,8 +518,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     }
 
     private boolean processDelivery(Delivery incoming) throws Exception {
-        incoming.setDefaultDeliveryState(Released.getInstance());
-
         JmsMessage message = null;
         try {
             message = AmqpCodec.decodeMessage(this, getEndpoint().recv()).asJmsMessage();
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
index a16396f..8a169f0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
+import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
@@ -40,6 +41,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 
 import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
@@ -347,4 +349,75 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(2000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testZeroPrefetchConsumerReceiveTimedPullWithInFlightArrivalTimesOutIfNotCompleted() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Create a connection with zero prefetch
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0&amqp.drainTimeout=75");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expect the consumer to attach but NOT send credit
+            testPeer.expectReceiverAttach();
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            // Expect that once receive is called, it flows 1 credit. Give it an initial (i.e. more=true) transfer frame with header only.
+            testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(new HeaderDescribedType(), null, null, null, null, 1, "delivery1", true, 0);
+            // Expect the consumer to be closed when stop times out. Depending on timing (e.g. in slow CI), a draining Flow might arrive first, so allow for that.
+            testPeer.optionalFlow(true, false, equalTo(UnsignedInteger.ONE));
+            testPeer.expectDetach(true, true, true);
+            testPeer.expectDispositionThatIsReleasedAndSettled();
+
+            final AtomicReference<Throwable> error = new AtomicReference<>();
+            final CountDownLatch done = new CountDownLatch(1);
+
+            ExecutorService executor = Executors.newSingleThreadExecutor();
+            try {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            consumer.receive(20);
+                        } catch (Throwable t) {
+                            error.set(t);
+                        } finally {
+                            done.countDown();
+                        }
+                    }
+                });
+
+                assertTrue("Consumer receive task did not complete", done.await(4, TimeUnit.SECONDS));
+
+                Throwable t = error.get();
+                assertNotNull("Consumer receive did not throw as expected", t);
+                assertTrue("Consumer receive did not throw as expected", t instanceof JMSException);
+            } finally {
+                executor.shutdownNow();
+            }
+
+            assertTrue("Consumer should be closed", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisfied() throws Exception {
+                    try {
+                        consumer.getMessageSelector();
+                        return false;
+                    } catch (JMSException ex) {
+                        return true;
+                    }
+                }
+            }, 5000, 10));
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org