You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/06/28 11:47:23 UTC
[qpid-jms] 02/02: QPIDJMS-464: ensure the message pull attempt transitions to stop attempt at timeout if message is partially transferred
This is an automated email from the ASF dual-hosted git repository.
robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git
commit ebf2b444833fc184f81de71f806f6a54fdabe519
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Jun 28 12:35:24 2019 +0100
QPIDJMS-464: ensure the message pull attempt transitions to stop attempt at timeout if message is partially transferred
---
.../qpid/jms/provider/amqp/AmqpConsumer.java | 6 +-
.../integration/ZeroPrefetchIntegrationTest.java | 63 ++++++++++++++++
...bstractFrameFieldAndPayloadMatchingHandler.java | 18 ++++-
.../qpid/jms/test/testpeer/FrameHandler.java | 6 ++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 83 ++++++++++++++++++++--
5 files changed, 164 insertions(+), 12 deletions(-)
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 7edbaa7..7bc2be1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -148,10 +148,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
// We need to drain the credit if no message(s) arrive to use it.
final ScheduledFuture<?> future = getSession().schedule(() -> {
LOG.trace("Consumer {} running scheduled stop", getConsumerId());
- if (getEndpoint().getRemoteCredit() != 0) {
- stop(request);
- session.getProvider().pumpToProtonTransport(request);
- }
+ stop(request);
+ session.getProvider().pumpToProtonTransport(request);
}, timeout);
stopRequest = new ScheduledRequest(future, request);
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
index 93df727..a16396f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -42,6 +42,7 @@ import javax.jms.TextMessage;
import org.apache.qpid.jms.test.QpidJmsTestCase;
import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
@@ -284,4 +285,66 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(3000);
}
}
+
+ @Test(timeout=20000)
+ public void testZeroPrefetchConsumerReceiveTimedPullWithInFlightArrival() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ // Create a connection with zero prefetch
+ Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+ connection.start();
+
+ testPeer.expectBegin();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue("myQueue");
+
+ // Expected the consumer to attach but NOT send credit
+ testPeer.expectReceiverAttach();
+
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ String msgContent = "content";
+ // Expect that once receive is called, it flows 1 credit. Give it an initial transfer frame with header only.
+ testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(new HeaderDescribedType(), null, null, null, null, 1, "delivery1", true, 0);
+ // Then give it a final transfer with body only, after a delay.
+ testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(msgContent), 1, "delivery1", false, 30);
+ // Expect it to be accepted. Depending on timing (e.g in slow CI), a draining Flow might arrive first, allowing for that.
+ testPeer.optionalFlow(true, false, equalTo(UnsignedInteger.ONE));
+ testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
+
+ final AtomicReference<Throwable> error = new AtomicReference<>();
+ final CountDownLatch done = new CountDownLatch(1);
+
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ try {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ Message m = consumer.receive(20);
+
+ assertNotNull("Message should have been received", m);
+ assertTrue(m instanceof TextMessage);
+ assertEquals("Unexpected message content", msgContent, ((TextMessage) m).getText());
+ } catch (Throwable t) {
+ error.set(t);
+ } finally {
+ done.countDown();
+ }
+ }
+ });
+
+ assertTrue("Consumer receive task did not complete", done.await(4, TimeUnit.SECONDS));
+ assertNull("Consumer receive errored", error.get());
+ } finally {
+ executor.shutdownNow();
+ }
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(2000);
+ }
+ }
}
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
index 101d773..6c4a071 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
@@ -44,6 +44,8 @@ public abstract class AbstractFrameFieldAndPayloadMatchingHandler extends Abstra
private int _expectedFrameSize;
+ private boolean _optional;
+
protected AbstractFrameFieldAndPayloadMatchingHandler(FrameType frameType,
int channel,
int frameSize,
@@ -148,10 +150,20 @@ public abstract class AbstractFrameFieldAndPayloadMatchingHandler extends Abstra
}
@Override
+ public void setOptional(boolean optional) {
+ _optional = optional;
+ }
+
+ @Override
+ public boolean isOptional() {
+ return _optional;
+ }
+
+ @Override
public String toString()
{
- return "AbstractFrameFieldAndPayloadMatchingHandler [_symbolicDescriptor=" + getSymbolicDescriptor()
- + ", _expectedChannel=" + expectedChannelString()
- + "]";
+ return "AbstractFrameFieldAndPayloadMatchingHandler [descriptor=" + getSymbolicDescriptor() + "/" + getNumericDescriptor()
+ + ", expectedChannel=" + expectedChannelString()
+ + (_optional ? ", optional=true]" : "]");
}
}
\ No newline at end of file
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java
index 79f9b95..2197a6d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java
@@ -24,4 +24,10 @@ import org.apache.qpid.proton.amqp.DescribedType;
interface FrameHandler extends Handler
{
void frame(int type, int channel, int frameBodySize, DescribedType describedType, Binary payload, TestAmqpPeer peer);
+
+ boolean descriptorMatches(Object descriptor);
+
+ void setOptional(boolean optionalFrame);
+
+ boolean isOptional();
}
\ No newline at end of file
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 909d0e6..67b2582 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -298,6 +298,20 @@ public class TestAmqpPeer implements AutoCloseable
void receiveFrame(int type, int channel, int frameSize, DescribedType describedType, Binary payload)
{
Handler handler = getFirstHandler();
+
+ while (handler instanceof FrameHandler && ((FrameHandler) handler).isOptional())
+ {
+ FrameHandler frameHandler = (FrameHandler) handler;
+ if(frameHandler.descriptorMatches(describedType.getDescriptor())){
+ LOGGER.info("Optional frame handler matches the descriptor, proceeding to verify it");
+ break;
+ } else {
+ LOGGER.info("Skipping non-matching optional frame handler, received frame descriptor (" + describedType.getDescriptor() + ") does not match handler: " + frameHandler);
+ removeFirstHandler();
+ handler = getFirstHandler();
+ }
+ }
+
if(handler == null)
{
Object actualDescriptor = describedType.getDescriptor();
@@ -2686,7 +2700,7 @@ public class TestAmqpPeer implements AutoCloseable
final DescribedType content,
final int nextIncomingDeliveryId) {
- sendTransferToLastOpenedLinkOnLastOpenedSession(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, nextIncomingDeliveryId, false);
+ sendTransferToLastOpenedLinkOnLastOpenedSession(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, nextIncomingDeliveryId, null, null, 0);
}
public void sendTransferToLastOpenedLinkOnLastOpenedSession(final HeaderDescribedType headerDescribedType,
@@ -2695,18 +2709,26 @@ public class TestAmqpPeer implements AutoCloseable
final ApplicationPropertiesDescribedType appPropertiesDescribedType,
final DescribedType content,
final int nextIncomingDeliveryId,
- final boolean sendSettled) {
+ final String tagAsString,
+ final Boolean more,
+ final int sendDelay) {
synchronized (_handlersLock) {
CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
- String tagString = "theDeliveryTag" + nextIncomingDeliveryId;
+ String tagString = tagAsString;
+ if(tagString == null) {
+ tagString = "theDeliveryTag" + nextIncomingDeliveryId;
+ }
+
Binary dtag = new Binary(tagString.getBytes());
final TransferFrame transferResponse = new TransferFrame()
.setDeliveryId(UnsignedInteger.valueOf(nextIncomingDeliveryId))
.setDeliveryTag(dtag)
- .setMessageFormat(UnsignedInteger.ZERO)
- .setSettled(sendSettled);
+ .setMessageFormat(UnsignedInteger.ZERO);
+ if(more != null) {
+ transferResponse.setMore(more);
+ }
Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content);
@@ -2722,6 +2744,10 @@ public class TestAmqpPeer implements AutoCloseable
}
});
+ if(sendDelay != 0) {
+ transferSender.setSendDelay(sendDelay);
+ }
+
comp.add(transferSender);
}
}
@@ -2776,4 +2802,51 @@ public class TestAmqpPeer implements AutoCloseable
runAfterLastHandler(exitEarly);
}
+
+ public void optionalFlow(final boolean drain, final boolean sendDrainFlowResponse,Matcher<UnsignedInteger> creditMatcher)
+ {
+ final FlowMatcher flowMatcher = new FlowMatcher();
+ flowMatcher.setOptional(true);
+
+ Matcher<Boolean> drainMatcher = null;
+ if(drain)
+ {
+ drainMatcher = equalTo(true);
+ }
+ else
+ {
+ drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
+ }
+
+ flowMatcher.withLinkCredit(creditMatcher);
+ flowMatcher.withDrain(drainMatcher);
+
+ if(drain && sendDrainFlowResponse)
+ {
+ final FlowFrame drainResponse = new FlowFrame();
+ drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
+ drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
+ drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+ drainResponse.setDrain(true);
+
+ // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+ final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
+ flowResponseSender.setValueProvider(new ValueProvider()
+ {
+ @Override
+ public void setValues()
+ {
+ flowResponseSender.setChannel(flowMatcher.getActualChannel());
+ drainResponse.setHandle(flowMatcher.getReceivedHandle());
+ drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+ drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, 0));
+ drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+ }
+ });
+
+ flowMatcher.onCompletion(flowResponseSender);
+ }
+
+ addHandler(flowMatcher);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org