You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/03/24 21:43:44 UTC
qpid-jms git commit: QPIDJMS-159 Message producer respond to drain of
credit.
Repository: qpid-jms
Updated Branches:
refs/heads/master f8470ec1b -> acecfbe19
QPIDJMS-159 Message producer respond to drain of credit.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/acecfbe1
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/acecfbe1
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/acecfbe1
Branch: refs/heads/master
Commit: acecfbe194036ffdf3d6ee11b6a945b4520c7711
Parents: f8470ec
Author: Timothy Bish <ta...@gmail.com>
Authored: Thu Mar 24 16:43:35 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Thu Mar 24 16:43:35 2016 -0400
----------------------------------------------------------------------
.../jms/provider/amqp/AmqpFixedProducer.java | 5 ++
.../integration/ProducerIntegrationTest.java | 46 +++++++++++++++++
.../qpid/jms/test/testpeer/TestAmqpPeer.java | 54 ++++++++++++++++++++
3 files changed, 105 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index f548096..3dbb85a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -201,6 +201,11 @@ public class AmqpFixedProducer extends AmqpProducer {
}
}
+ // If a drain was requested, we just sent what we had so respond with drained
+ if (getEndpoint().getDrain()) {
+ getEndpoint().drained();
+ }
+
// Once the pending sends queue is drained we can propagate the close request.
if (blocked.isEmpty() && isAwaitingClose() && !isClosed()) {
super.close(closeRequest);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index f65a47a..4b35a4f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -68,6 +68,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -1248,4 +1249,49 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
testPeer.waitForAllHandlersToComplete(1000);
}
}
+
+ @Test(timeout = 20000)
+ public void testCreditDrainedAfterSend() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+ JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer);
+ connection.setSendTimeout(500);
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Queue destination = session.createQueue(getTestName());
+ MessageProducer producer = session.createProducer(destination);
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true);
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+ // After the first send lets drain off the credit from the sender asking for one
+ // more message if it has one.
+ testPeer.expectTransferRespondWithDrain(messageMatcher, 1);
+ testPeer.expectLinkFlow(true, false, Matchers.equalTo(UnsignedInteger.ZERO));
+ testPeer.expectDetach(true, true, true);
+ testPeer.expectClose();
+
+ producer.send(session.createMessage());
+
+ // We don't have any credit now since we were drained, so the send should
+ // block until more credit is issued.
+ try {
+ producer.send(session.createMessage());
+ fail("Should have timed out waiting for credit to send.");
+ } catch (JmsSendTimedOutException jmsEx) {
+ LOG.info("Caught expected send timeout.");
+ }
+
+ producer.close();
+
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/acecfbe1/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index 7ffdb2e..296c108 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -1549,6 +1549,60 @@ public class TestAmqpPeer implements AutoCloseable
addHandler(transferMatcher);
}
+ public void expectTransferRespondWithDrain(Matcher<Binary> expectedPayloadMatcher, int drainAmount)
+ {
+ Matcher<Boolean> settledMatcher = Matchers.anyOf(equalTo(false), nullValue());
+
+ final TransferMatcher transferMatcher = new TransferMatcher();
+ transferMatcher.setPayloadMatcher(expectedPayloadMatcher);
+ transferMatcher.withSettled(settledMatcher);
+ transferMatcher.withState(nullValue());
+
+ CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable();
+ final DispositionFrame dispositionResponse = new DispositionFrame().setRole(Role.RECEIVER).setSettled(true).setState(new Accepted());
+
+ // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+ final FrameSender dispositionFrameSender = new FrameSender(this, FrameType.AMQP, -1, dispositionResponse, null);
+ dispositionFrameSender.setValueProvider(new ValueProvider()
+ {
+ @Override
+ public void setValues()
+ {
+ dispositionFrameSender.setChannel(transferMatcher.getActualChannel());
+ dispositionResponse.setFirst(transferMatcher.getReceivedDeliveryId());
+ }
+ });
+
+ final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
+ .setIncomingWindow(UnsignedInteger.valueOf(2048))
+ .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded
+ .setOutgoingWindow(UnsignedInteger.valueOf(2048))
+ .setLinkCredit(UnsignedInteger.valueOf(drainAmount));
+
+ // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder.
+ final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null);
+ flowFrameSender.setValueProvider(new ValueProvider()
+ {
+ @Override
+ public void setValues()
+ {
+ flowFrameSender.setChannel(transferMatcher.getActualChannel());
+ flowFrame.setHandle(transferMatcher.getReceivedHandle());
+ flowFrame.setDeliveryCount(UnsignedInteger.ONE);
+ flowFrame.setDrain(true);
+ }
+ });
+
+ flowFrameSender.setSendDelay(0);
+
+ composite.add(flowFrameSender);
+ composite.add(dispositionFrameSender);
+
+ transferMatcher.onCompletion(composite);
+
+ addHandler(transferMatcher);
+ }
+
public void expectDeclare(Binary txnId)
{
TransferPayloadCompositeMatcher declareMatcher = new TransferPayloadCompositeMatcher();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org