You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/06/28 11:47:23 UTC

[qpid-jms] 02/02: QPIDJMS-464: ensure the message pull attempt transitions to stop attempt at timeout if message is partially transferred

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git

commit ebf2b444833fc184f81de71f806f6a54fdabe519
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Jun 28 12:35:24 2019 +0100

    QPIDJMS-464: ensure the message pull attempt transitions to stop attempt at timeout if message is partially transferred
---
 .../qpid/jms/provider/amqp/AmqpConsumer.java       |  6 +-
 .../integration/ZeroPrefetchIntegrationTest.java   | 63 ++++++++++++++++
 ...bstractFrameFieldAndPayloadMatchingHandler.java | 18 ++++-
 .../qpid/jms/test/testpeer/FrameHandler.java       |  6 ++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java       | 83 ++++++++++++++++++++--
 5 files changed, 164 insertions(+), 12 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 7edbaa7..7bc2be1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -148,10 +148,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         // We need to drain the credit if no message(s) arrive to use it.
         final ScheduledFuture<?> future = getSession().schedule(() -> {
             LOG.trace("Consumer {} running scheduled stop", getConsumerId());
-            if (getEndpoint().getRemoteCredit() != 0) {
-                stop(request);
-                session.getProvider().pumpToProtonTransport(request);
-            }
+            stop(request);
+            session.getProvider().pumpToProtonTransport(request);
         }, timeout);
 
         stopRequest = new ScheduledRequest(future, request);
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
index 93df727..a16396f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -42,6 +42,7 @@ import javax.jms.TextMessage;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
@@ -284,4 +285,66 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testZeroPrefetchConsumerReceiveTimedPullWithInFlightArrival() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Create a connection with zero prefetch
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expect the consumer to attach but NOT send credit
+            testPeer.expectReceiverAttach();
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+             String msgContent = "content";
+            // Expect that once receive is called, it flows 1 credit. Give it an initial transfer frame with header only.
+            testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(new HeaderDescribedType(), null, null, null, null, 1, "delivery1", true, 0);
+            // Then give it a final transfer with body only, after a delay.
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(msgContent), 1, "delivery1", false, 30);
+            // Expect it to be accepted. Depending on timing (e.g. in slow CI), a draining Flow might arrive first, so allow for that.
+            testPeer.optionalFlow(true, false, equalTo(UnsignedInteger.ONE));
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
+
+            final AtomicReference<Throwable> error = new AtomicReference<>();
+            final CountDownLatch done = new CountDownLatch(1);
+
+            ExecutorService executor = Executors.newSingleThreadExecutor();
+            try {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Message m = consumer.receive(20);
+
+                            assertNotNull("Message should have been received", m);
+                            assertTrue(m instanceof TextMessage);
+                            assertEquals("Unexpected message content", msgContent, ((TextMessage) m).getText());
+                        } catch (Throwable t) {
+                            error.set(t);
+                        } finally {
+                            done.countDown();
+                        }
+                    }
+                });
+
+                assertTrue("Consumer receive task did not complete", done.await(4, TimeUnit.SECONDS));
+                assertNull("Consumer receive errored", error.get());
+            } finally {
+                executor.shutdownNow();
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
index 101d773..6c4a071 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
@@ -44,6 +44,8 @@ public abstract class AbstractFrameFieldAndPayloadMatchingHandler extends Abstra
 
     private int _expectedFrameSize;
 
+    private boolean _optional;
+
     protected AbstractFrameFieldAndPayloadMatchingHandler(FrameType frameType,
                                                 int channel,
                                                 int frameSize,
@@ -148,10 +150,20 @@ public abstract class AbstractFrameFieldAndPayloadMatchingHandler extends Abstra
     }
 
     @Override
+    public void setOptional(boolean optional) {
+        _optional = optional;
+    }
+
+    @Override
+    public boolean isOptional() {
+        return _optional;
+    }
+
+    @Override
     public String toString()
     {
-        return "AbstractFrameFieldAndPayloadMatchingHandler [_symbolicDescriptor=" + getSymbolicDescriptor()
-                + ", _expectedChannel=" + expectedChannelString()
-                + "]";
+        return "AbstractFrameFieldAndPayloadMatchingHandler [descriptor=" + getSymbolicDescriptor() + "/" + getNumericDescriptor()
+                + ", expectedChannel=" + expectedChannelString()
+                + (_optional ? ", optional=true]" : "]");
     }
 }
\ No newline at end of file
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java
index 79f9b95..2197a6d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java
@@ -24,4 +24,10 @@ import org.apache.qpid.proton.amqp.DescribedType;
 interface FrameHandler extends Handler
 {
     void frame(int type, int channel, int frameBodySize, DescribedType describedType, Binary payload, TestAmqpPeer peer);
+
+    boolean descriptorMatches(Object descriptor);
+
+    void setOptional(boolean optionalFrame);
+
+    boolean isOptional();
 }
\ No newline at end of file
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 909d0e6..67b2582 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -298,6 +298,20 @@ public class TestAmqpPeer implements AutoCloseable
     void receiveFrame(int type, int channel, int frameSize, DescribedType describedType, Binary payload)
     {
         Handler handler = getFirstHandler();
+
+        while (handler instanceof FrameHandler && ((FrameHandler) handler).isOptional())
+        {
+            FrameHandler frameHandler = (FrameHandler) handler;
+            if(frameHandler.descriptorMatches(describedType.getDescriptor())){
+                LOGGER.info("Optional frame handler matches the descriptor, proceeding to verify it");
+                break;
+            } else {
+                LOGGER.info("Skipping non-matching optional frame handler, received frame descriptor (" + describedType.getDescriptor() + ") does not match handler: " +  frameHandler);
+                removeFirstHandler();
+                handler = getFirstHandler();
+            }
+        }
+
         if(handler == null)
         {
             Object actualDescriptor = describedType.getDescriptor();
@@ -2686,7 +2700,7 @@ public class TestAmqpPeer implements AutoCloseable
                                                                 final DescribedType content,
                                                                 final int nextIncomingDeliveryId) {
 
-        sendTransferToLastOpenedLinkOnLastOpenedSession(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, nextIncomingDeliveryId, false);
+        sendTransferToLastOpenedLinkOnLastOpenedSession(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, nextIncomingDeliveryId, null, null, 0);
     }
 
     public void sendTransferToLastOpenedLinkOnLastOpenedSession(final HeaderDescribedType headerDescribedType,
@@ -2695,18 +2709,26 @@ public class TestAmqpPeer implements AutoCloseable
                                                                 final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                                                 final DescribedType content,
                                                                 final int nextIncomingDeliveryId,
-                                                                final boolean sendSettled) {
+                                                                final String tagAsString,
+                                                                final Boolean more,
+                                                                final int sendDelay) {
         synchronized (_handlersLock) {
             CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
 
-            String tagString = "theDeliveryTag" + nextIncomingDeliveryId;
+            String tagString = tagAsString;
+            if(tagString == null) {
+                tagString = "theDeliveryTag" + nextIncomingDeliveryId;
+            }
+
             Binary dtag = new Binary(tagString.getBytes());
 
             final TransferFrame transferResponse = new TransferFrame()
             .setDeliveryId(UnsignedInteger.valueOf(nextIncomingDeliveryId))
             .setDeliveryTag(dtag)
-            .setMessageFormat(UnsignedInteger.ZERO)
-            .setSettled(sendSettled);
+            .setMessageFormat(UnsignedInteger.ZERO);
+            if(more != null) {
+                transferResponse.setMore(more);
+            }
 
             Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content);
 
@@ -2722,6 +2744,10 @@ public class TestAmqpPeer implements AutoCloseable
                 }
             });
 
+            if(sendDelay != 0) {
+                transferSender.setSendDelay(sendDelay);
+            }
+
             comp.add(transferSender);
         }
     }
@@ -2776,4 +2802,51 @@ public class TestAmqpPeer implements AutoCloseable
 
         runAfterLastHandler(exitEarly);
     }
+
+    public void optionalFlow(final boolean drain, final boolean sendDrainFlowResponse,Matcher<UnsignedInteger> creditMatcher)
+    {
+        final FlowMatcher flowMatcher = new FlowMatcher();
+        flowMatcher.setOptional(true);
+
+        Matcher<Boolean> drainMatcher = null;
+        if(drain)
+        {
+            drainMatcher = equalTo(true);
+        }
+        else
+        {
+            drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
+        }
+
+        flowMatcher.withLinkCredit(creditMatcher);
+        flowMatcher.withDrain(drainMatcher);
+
+        if(drain && sendDrainFlowResponse)
+        {
+            final FlowFrame drainResponse = new FlowFrame();
+            drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldn't be hard coded
+            drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldn't be hard coded
+            drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+            drainResponse.setDrain(true);
+
+            // The flow frame channel will be dynamically set based on the incoming frame. The -1 used here is an illegal placeholder value.
+            final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
+            flowResponseSender.setValueProvider(new ValueProvider()
+            {
+                @Override
+                public void setValues()
+                {
+                    flowResponseSender.setChannel(flowMatcher.getActualChannel());
+                    drainResponse.setHandle(flowMatcher.getReceivedHandle());
+                    drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+                    drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, 0));
+                    drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+                }
+            });
+
+            flowMatcher.onCompletion(flowResponseSender);
+        }
+
+        addHandler(flowMatcher);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org