You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/06/28 14:59:02 UTC
[qpid-jms] branch master updated: QPIDJMS-464: fail pull attempt if message doesnt eventually complete
This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
The following commit(s) were added to refs/heads/master by this push:
new 8c83408 QPIDJMS-464: fail pull attempt if message doesnt eventually complete
8c83408 is described below
commit 8c83408c48fe646f21ca0387bc332566175b79c5
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Jun 28 15:57:50 2019 +0100
QPIDJMS-464: fail pull attempt if message doesnt eventually complete
---
.../qpid/jms/provider/amqp/AmqpConsumer.java | 30 ++++++++-
.../integration/ZeroPrefetchIntegrationTest.java | 73 ++++++++++++++++++++++
2 files changed, 100 insertions(+), 3 deletions(-)
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 7bc2be1..197980d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -110,7 +110,29 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
request.onSuccess();
} else {
// There are still deliveries to process, wait for them to be.
- stopRequest = request;
+ if (getDrainTimeout() > 0) {
+ // If the remote doesn't respond we will close the consumer and break any
+ // blocked receive or stop calls that are waiting, unless the consumer is
+ // a participant in a transaction in which case we will just fail the request
+ // and leave the consumer open since the TX needs it to remain active.
+ final ScheduledFuture<?> future = getSession().schedule(() -> {
+ LOG.trace("Consumer {} stop timed out awaiting message processing", getConsumerId());
+ Exception cause = new JmsOperationTimedOutException("Consumer stop timed out awaiting message processing");
+ if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) {
+ stopRequest.onFailure(cause);
+ stopRequest = null;
+ } else {
+ closeResource(session.getProvider(), cause, false);
+ session.getProvider().pumpToProtonTransport();
+ }
+ }, getDrainTimeout());
+
+ stopRequest = new ScheduledRequest(future, request);
+ } else {
+ stopRequest = request;
+ }
+
+ LOG.trace("Consumer {} stop awaiting queued delivery processing", getConsumerId());
}
} else {
// TODO: We don't actually want the additional messages that could be sent while
@@ -461,6 +483,10 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
@Override
public void processDeliveryUpdates(AmqpProvider provider, Delivery delivery) throws IOException {
+ if(delivery.getDefaultDeliveryState() == null){
+ delivery.setDefaultDeliveryState(Released.getInstance());
+ }
+
if (delivery.isReadable() && !delivery.isPartial()) {
LOG.trace("{} has incoming Message(s).", this);
try {
@@ -492,8 +518,6 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
}
private boolean processDelivery(Delivery incoming) throws Exception {
- incoming.setDefaultDeliveryState(Released.getInstance());
-
JmsMessage message = null;
try {
message = AmqpCodec.decodeMessage(this, getEndpoint().recv()).asJmsMessage();
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
index a16396f..8a169f0 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -32,6 +32,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
@@ -40,6 +41,7 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
@@ -347,4 +349,75 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(2000);
}
}
+
+ @Test(timeout=20000)
+ public void testZeroPrefetchConsumerReceiveTimedPullWithInFlightArrivalTimesOutIfNotCompleted() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Create a connection with zero prefetch
+ Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0&amqp.drainTimeout=75");
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Expected the consumer to attach but NOT send credit
+ testPeer.expectReceiverAttach();
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ // Expect that once receive is called, it flows 1 credit. Give it an initial (ie. more=true) transfer frame with header only.
+ testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(new HeaderDescribedType(), null, null, null, null, 1, "delivery1", true, 0);
+ // Expect the consumer to be closed when stop times out. Depending on timing (e.g in slow CI), a draining Flow might arrive first, allowing for that.
+ testPeer.optionalFlow(true, false, equalTo(UnsignedInteger.ONE));
+ testPeer.expectDetach(true, true, true);
+ testPeer.expectDispositionThatIsReleasedAndSettled();
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final CountDownLatch done = new CountDownLatch(1);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumer.receive(20);
+ } catch (Throwable t) {
+ error.set(t);
+ } finally {
+ done.countDown();
+ }
+ }
+ });
+
+ assertTrue("Consumer receive task did not complete", done.await(4, TimeUnit.SECONDS));
+
+ Throwable t = error.get();
+ assertNotNull("Consumer receive did not throw as expected", t);
+ assertTrue("Consumer receive did not throw as expected", t instanceof JMSException);
+ } finally {
+ executor.shutdownNow();
+ }
+
+ assertTrue("Consumer should be closed", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisfied() throws Exception {
+ try {
+ consumer.getMessageSelector();
+ return false;
+ } catch (JMSException ex) {
+ return true;
+ }
+ }
+ }, 5000, 10));
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org