You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/03/24 21:43:44 UTC

qpid-jms git commit: QPIDJMS-159 Message producer respond to drain of credit.

Repository: qpid-jms
Updated Branches:
  refs/heads/master f8470ec1b -> acecfbe19


QPIDJMS-159 Message producer respond to drain of credit.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/acecfbe1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/acecfbe1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/acecfbe1

Branch: refs/heads/master
Commit: acecfbe194036ffdf3d6ee11b6a945b4520c7711
Parents: f8470ec
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Mar 24 16:43:35 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Mar 24 16:43:35 2016 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/AmqpFixedProducer.java    |  5 ++
 .../integration/ProducerIntegrationTest.java    | 46 +++++++++++++++++
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    | 54 ++++++++++++++++++++
 3 files changed, 105 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index f548096..3dbb85a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -201,6 +201,11 @@ public class AmqpFixedProducer extends AmqpProducer {
             }
         }
 
+        // If a drain was requested, we just sent what we had so respond with drained
+        if (getEndpoint().getDrain()) {
+            getEndpoint().drained();
+        }
+
         // Once the pending sends queue is drained we can propagate the close request.
         if (blocked.isEmpty() && isAwaitingClose() && !isClosed()) {
             super.close(closeRequest);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index f65a47a..4b35a4f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -68,6 +68,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -1248,4 +1249,49 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
             testPeer.waitForAllHandlersToComplete(1000);
         }
     }
+
+    @Test(timeout = 20000)
+    public void testCreditDrainedAfterSend() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+            connection.setSendTimeout(500);
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue destination = session.createQueue(getTestName());
+            MessageProducer producer = session.createProducer(destination);
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            // After the first send lets drain off the credit from the sender asking for one
+            // more message if it has one.
+            testPeer.expectTransferRespondWithDrain(messageMatcher, 1);
+            testPeer.expectLinkFlow(true, false, Matchers.equalTo(UnsignedInteger.ZERO));
+            testPeer.expectDetach(true, true, true);
+            testPeer.expectClose();
+
+            producer.send(session.createMessage());
+
+            // We don't have any credit now since we were drained, so the send should
+            // block until more credit is issued.
+            try {
+                producer.send(session.createMessage());
+                fail("Should have timed out waiting for credit to send.");
+            } catch (JmsSendTimedOutException jmsEx) {
+                LOG.info("Caught expected send timeout.");
+            }
+
+            producer.close();
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 7ffdb2e..296c108 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1549,6 +1549,60 @@ public class TestAmqpPeer implements AutoCloseable
         addHandler(transferMatcher);
     }
 
+    public void expectTransferRespondWithDrain(Matcher<Binary> expectedPayloadMatcher, int drainAmount)
+    {
+        Matcher<Boolean> settledMatcher = Matchers.anyOf(equalTo(false), nullValue());
+
+        final TransferMatcher transferMatcher = new TransferMatcher();
+        transferMatcher.setPayloadMatcher(expectedPayloadMatcher);
+        transferMatcher.withSettled(settledMatcher);
+        transferMatcher.withState(nullValue());
+
+        CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
+        final DispositionFrame dispositionResponse = new DispositionFrame().setRole(Role.RECEIVER).setSettled(true).setState(new Accepted());
+
+        // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+        final FrameSender dispositionFrameSender = new FrameSender(this, FrameType.AMQP, -1, dispositionResponse, null);
+        dispositionFrameSender.setValueProvider(new ValueProvider()
+        {
+            @Override
+            public void setValues()
+            {
+                dispositionFrameSender.setChannel(transferMatcher.getActualChannel());
+                dispositionResponse.setFirst(transferMatcher.getReceivedDeliveryId());
+            }
+        });
+
+        final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
+            .setIncomingWindow(UnsignedInteger.valueOf(2048))
+            .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
+            .setOutgoingWindow(UnsignedInteger.valueOf(2048))
+            .setLinkCredit(UnsignedInteger.valueOf(drainAmount));
+
+        // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+        final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
+        flowFrameSender.setValueProvider(new ValueProvider()
+        {
+            @Override
+            public void setValues()
+            {
+                flowFrameSender.setChannel(transferMatcher.getActualChannel());
+                flowFrame.setHandle(transferMatcher.getReceivedHandle());
+                flowFrame.setDeliveryCount(UnsignedInteger.ONE);
+                flowFrame.setDrain(true);
+            }
+        });
+
+        flowFrameSender.setSendDelay(0);
+
+        composite.add(flowFrameSender);
+        composite.add(dispositionFrameSender);
+
+        transferMatcher.onCompletion(composite);
+
+        addHandler(transferMatcher);
+    }
+
     public void expectDeclare(Binary txnId)
     {
         TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org