You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2019/06/28 11:47:21 UTC

[qpid-jms] branch master updated (345585a -> ebf2b44)

This is an automated email from the ASF dual-hosted git repository.

robbie pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git.


    from 345585a  QPIDJMS-463: update surefire and jacoco testing plugin versions
     new 9552e17  NO-JIRA: shut down the executors created in some tests, and check for task error
     new ebf2b44  QPIDJMS-464: ensure the message pull attempt transitions to stop attempt at timeout if message is partially transferred

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../qpid/jms/provider/amqp/AmqpConsumer.java       |   6 +-
 .../integration/ZeroPrefetchIntegrationTest.java   | 116 ++++++++++++++++-----
 ...bstractFrameFieldAndPayloadMatchingHandler.java |  18 +++-
 .../qpid/jms/test/testpeer/FrameHandler.java       |   6 ++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java       |  83 ++++++++++++++-
 5 files changed, 193 insertions(+), 36 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-jms] 02/02: QPIDJMS-464: ensure the message pull attempt transitions to stop attempt at timeout if message is partially transferred

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git

commit ebf2b444833fc184f81de71f806f6a54fdabe519
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Jun 28 12:35:24 2019 +0100

    QPIDJMS-464: ensure the message pull attempt transitions to stop attempt at timeout if message is partially transferred
---
 .../qpid/jms/provider/amqp/AmqpConsumer.java       |  6 +-
 .../integration/ZeroPrefetchIntegrationTest.java   | 63 ++++++++++++++++
 ...bstractFrameFieldAndPayloadMatchingHandler.java | 18 ++++-
 .../qpid/jms/test/testpeer/FrameHandler.java       |  6 ++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java       | 83 ++++++++++++++++++++--
 5 files changed, 164 insertions(+), 12 deletions(-)

diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 7edbaa7..7bc2be1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -148,10 +148,8 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
         // We need to drain the credit if no message(s) arrive to use it.
         final ScheduledFuture<?> future = getSession().schedule(() -> {
             LOG.trace("Consumer {} running scheduled stop", getConsumerId());
-            if (getEndpoint().getRemoteCredit() != 0) {
-                stop(request);
-                session.getProvider().pumpToProtonTransport(request);
-            }
+            stop(request);
+            session.getProvider().pumpToProtonTransport(request);
         }, timeout);
 
         stopRequest = new ScheduledRequest(future, request);
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
index 93df727..a16396f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -42,6 +42,7 @@ import javax.jms.TextMessage;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.HeaderDescribedType;
 import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
 import org.apache.qpid.jms.test.testpeer.matchers.AcceptedMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.ModifiedMatcher;
@@ -284,4 +285,66 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(3000);
         }
     }
+
+    @Test(timeout=20000)
+    public void testZeroPrefetchConsumerReceiveTimedPullWithInFlightArrival() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            // Create a connection with zero prefetch
+            Connection connection = testFixture.establishConnecton(testPeer, "?jms.prefetchPolicy.all=0");
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            // Expected the consumer to attach but NOT send credit
+            testPeer.expectReceiverAttach();
+
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+             String msgContent = "content";
+            // Expect that once receive is called, it flows 1 credit. Give it an initial transfer frame with header only.
+            testPeer.expectLinkFlow(false, equalTo(UnsignedInteger.ONE));
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(new HeaderDescribedType(), null, null, null, null, 1, "delivery1", true, 0);
+            // Then give it a final transfer with body only, after a delay.
+            testPeer.sendTransferToLastOpenedLinkOnLastOpenedSession(null, null, null, null, new AmqpValueDescribedType(msgContent), 1, "delivery1", false, 30);
+            // Expect it to be accepted. Depending on timing (e.g in slow CI), a draining Flow might arrive first, allowing for that.
+            testPeer.optionalFlow(true, false, equalTo(UnsignedInteger.ONE));
+            testPeer.expectDisposition(true, new AcceptedMatcher(), 1, 1);
+
+            final AtomicReference<Throwable> error = new AtomicReference<>();
+            final CountDownLatch done = new CountDownLatch(1);
+
+            ExecutorService executor = Executors.newSingleThreadExecutor();
+            try {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            Message m = consumer.receive(20);
+
+                            assertNotNull("Message should have been received", m);
+                            assertTrue(m instanceof TextMessage);
+                            assertEquals("Unexpected message content", msgContent, ((TextMessage) m).getText());
+                        } catch (Throwable t) {
+                            error.set(t);
+                        } finally {
+                            done.countDown();
+                        }
+                    }
+                });
+
+                assertTrue("Consumer receive task did not complete", done.await(4, TimeUnit.SECONDS));
+                assertNull("Consumer receive errored", error.get());
+            } finally {
+                executor.shutdownNow();
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
 }
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
index 101d773..6c4a071 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
@@ -44,6 +44,8 @@ public abstract class AbstractFrameFieldAndPayloadMatchingHandler extends Abstra
 
     private int _expectedFrameSize;
 
+    private boolean _optional;
+
     protected AbstractFrameFieldAndPayloadMatchingHandler(FrameType frameType,
                                                 int channel,
                                                 int frameSize,
@@ -148,10 +150,20 @@ public abstract class AbstractFrameFieldAndPayloadMatchingHandler extends Abstra
     }
 
     @Override
+    public void setOptional(boolean optional) {
+        _optional = optional;
+    }
+
+    @Override
+    public boolean isOptional() {
+        return _optional;
+    }
+
+    @Override
     public String toString()
     {
-        return "AbstractFrameFieldAndPayloadMatchingHandler [_symbolicDescriptor=" + getSymbolicDescriptor()
-                + ", _expectedChannel=" + expectedChannelString()
-                + "]";
+        return "AbstractFrameFieldAndPayloadMatchingHandler [descriptor=" + getSymbolicDescriptor() + "/" + getNumericDescriptor()
+                + ", expectedChannel=" + expectedChannelString()
+                + (_optional ? ", optional=true]" : "]");
     }
 }
\ No newline at end of file
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java
index 79f9b95..2197a6d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/FrameHandler.java
@@ -24,4 +24,10 @@ import org.apache.qpid.proton.amqp.DescribedType;
 interface FrameHandler extends Handler
 {
     void frame(int type, int channel, int frameBodySize, DescribedType describedType, Binary payload, TestAmqpPeer peer);
+
+    boolean descriptorMatches(Object descriptor);
+
+    void setOptional(boolean optionalFrame);
+
+    boolean isOptional();
 }
\ No newline at end of file
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 909d0e6..67b2582 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -298,6 +298,20 @@ public class TestAmqpPeer implements AutoCloseable
     void receiveFrame(int type, int channel, int frameSize, DescribedType describedType, Binary payload)
     {
         Handler handler = getFirstHandler();
+
+        while (handler instanceof FrameHandler && ((FrameHandler) handler).isOptional())
+        {
+            FrameHandler frameHandler = (FrameHandler) handler;
+            if(frameHandler.descriptorMatches(describedType.getDescriptor())){
+                LOGGER.info("Optional frame handler matches the descriptor, proceeding to verify it");
+                break;
+            } else {
+                LOGGER.info("Skipping non-matching optional frame handler, received frame descriptor (" + describedType.getDescriptor() + ") does not match handler: " +  frameHandler);
+                removeFirstHandler();
+                handler = getFirstHandler();
+            }
+        }
+
         if(handler == null)
         {
             Object actualDescriptor = describedType.getDescriptor();
@@ -2686,7 +2700,7 @@ public class TestAmqpPeer implements AutoCloseable
                                                                 final DescribedType content,
                                                                 final int nextIncomingDeliveryId) {
 
-        sendTransferToLastOpenedLinkOnLastOpenedSession(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, nextIncomingDeliveryId, false);
+        sendTransferToLastOpenedLinkOnLastOpenedSession(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content, nextIncomingDeliveryId, null, null, 0);
     }
 
     public void sendTransferToLastOpenedLinkOnLastOpenedSession(final HeaderDescribedType headerDescribedType,
@@ -2695,18 +2709,26 @@ public class TestAmqpPeer implements AutoCloseable
                                                                 final ApplicationPropertiesDescribedType appPropertiesDescribedType,
                                                                 final DescribedType content,
                                                                 final int nextIncomingDeliveryId,
-                                                                final boolean sendSettled) {
+                                                                final String tagAsString,
+                                                                final Boolean more,
+                                                                final int sendDelay) {
         synchronized (_handlersLock) {
             CompositeAmqpPeerRunnable comp = insertCompsiteActionForLastHandler();
 
-            String tagString = "theDeliveryTag" + nextIncomingDeliveryId;
+            String tagString = tagAsString;
+            if(tagString == null) {
+                tagString = "theDeliveryTag" + nextIncomingDeliveryId;
+            }
+
             Binary dtag = new Binary(tagString.getBytes());
 
             final TransferFrame transferResponse = new TransferFrame()
             .setDeliveryId(UnsignedInteger.valueOf(nextIncomingDeliveryId))
             .setDeliveryTag(dtag)
-            .setMessageFormat(UnsignedInteger.ZERO)
-            .setSettled(sendSettled);
+            .setMessageFormat(UnsignedInteger.ZERO);
+            if(more != null) {
+                transferResponse.setMore(more);
+            }
 
             Binary payload = prepareTransferPayload(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, appPropertiesDescribedType, content);
 
@@ -2722,6 +2744,10 @@ public class TestAmqpPeer implements AutoCloseable
                 }
             });
 
+            if(sendDelay != 0) {
+                transferSender.setSendDelay(sendDelay);
+            }
+
             comp.add(transferSender);
         }
     }
@@ -2776,4 +2802,51 @@ public class TestAmqpPeer implements AutoCloseable
 
         runAfterLastHandler(exitEarly);
     }
+
+    public void optionalFlow(final boolean drain, final boolean sendDrainFlowResponse,Matcher<UnsignedInteger> creditMatcher)
+    {
+        final FlowMatcher flowMatcher = new FlowMatcher();
+        flowMatcher.setOptional(true);
+
+        Matcher<Boolean> drainMatcher = null;
+        if(drain)
+        {
+            drainMatcher = equalTo(true);
+        }
+        else
+        {
+            drainMatcher = Matchers.anyOf(equalTo(false), nullValue());
+        }
+
+        flowMatcher.withLinkCredit(creditMatcher);
+        flowMatcher.withDrain(drainMatcher);
+
+        if(drain && sendDrainFlowResponse)
+        {
+            final FlowFrame drainResponse = new FlowFrame();
+            drainResponse.setOutgoingWindow(UnsignedInteger.ZERO); //TODO: shouldnt be hard coded
+            drainResponse.setIncomingWindow(UnsignedInteger.valueOf(Integer.MAX_VALUE)); //TODO: shouldnt be hard coded
+            drainResponse.setLinkCredit(UnsignedInteger.ZERO);
+            drainResponse.setDrain(true);
+
+            // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+            final FrameSender flowResponseSender = new FrameSender(this, FrameType.AMQP, -1, drainResponse, null);
+            flowResponseSender.setValueProvider(new ValueProvider()
+            {
+                @Override
+                public void setValues()
+                {
+                    flowResponseSender.setChannel(flowMatcher.getActualChannel());
+                    drainResponse.setHandle(flowMatcher.getReceivedHandle());
+                    drainResponse.setDeliveryCount(calculateNewDeliveryCount(flowMatcher));
+                    drainResponse.setNextOutgoingId(calculateNewOutgoingId(flowMatcher, 0));
+                    drainResponse.setNextIncomingId(flowMatcher.getReceivedNextOutgoingId());
+                }
+            });
+
+            flowMatcher.onCompletion(flowResponseSender);
+        }
+
+        addHandler(flowMatcher);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[qpid-jms] 01/02: NO-JIRA: shut down the executors created in some tests, and check for task error

Posted by ro...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

robbie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/qpid-jms.git

commit 9552e177e0aa396f17234812db1fa19ec946aefa
Author: Robbie Gemmell <ro...@apache.org>
AuthorDate: Fri Jun 28 10:50:45 2019 +0100

    NO-JIRA: shut down the executors created in some tests, and check for task error
---
 .../integration/ZeroPrefetchIntegrationTest.java   | 53 ++++++++++++----------
 1 file changed, 29 insertions(+), 24 deletions(-)

diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
index d902fdc..93df727 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ZeroPrefetchIntegrationTest.java
@@ -29,7 +29,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Connection;
 import javax.jms.Message;
@@ -242,37 +242,42 @@ public class ZeroPrefetchIntegrationTest extends QpidJmsTestCase {
                 testPeer.expectLinkFlow(false, false, equalTo(UnsignedInteger.ONE));
             }
 
-            final AtomicBoolean error = new AtomicBoolean(false);
+            final AtomicReference<Throwable> error = new AtomicReference<>();
             final CountDownLatch done = new CountDownLatch(1);
 
             ExecutorService executor = Executors.newSingleThreadExecutor();
-            executor.execute(new Runnable() {
-
-                @Override
-                public void run() {
-                    try {
-                        if (timeout < 0) {
-                            consumer.receiveNoWait();
-                        } else if (timeout == 0) {
-                            consumer.receive();
-                        } else {
-                            consumer.receive(10000);
+            try {
+                executor.execute(new Runnable() {
+                    @Override
+                    public void run() {
+                        try {
+                            if (timeout < 0) {
+                                consumer.receiveNoWait();
+                            } else if (timeout == 0) {
+                                consumer.receive();
+                            } else {
+                                consumer.receive(10000);
+                            }
+                        } catch (Throwable t) {
+                            error.set(t);
+                        } finally {
+                            done.countDown();
                         }
-                    } catch (Exception ex) {
-                        error.set(true);
-                    } finally {
-                        done.countDown();
                     }
-                }
-            });
+                });
 
-            testPeer.waitForAllHandlersToComplete(3000);
-            testPeer.expectEnd();
-            testPeer.expectClose();
+                testPeer.waitForAllHandlersToComplete(3000);
+                testPeer.expectEnd();
+
+                session.close();
 
-            session.close();
+                assertTrue("Consumer did not unblock", done.await(10, TimeUnit.SECONDS));
+                assertNull("Consumer receive errored", error.get());
+            } finally {
+                executor.shutdownNow();
+            }
 
-            assertTrue("Consumer did not unblock", done.await(10, TimeUnit.SECONDS));
+            testPeer.expectClose();
 
             connection.close();
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org