You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/09/12 17:40:55 UTC
[2/2] qpid-proton git commit: PROTON-1299: update getRemoteCredit()
handling to account for the differing behaviour of senders and receivers
PROTON-1299: update getRemoteCredit() handling to account for the differing behaviour of senders and receivers
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a52c331b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a52c331b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a52c331b
Branch: refs/heads/master
Commit: a52c331bf12930a83314bd818df7391ce201400f
Parents: 166ce2d
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Sep 12 18:39:11 2016 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Sep 12 18:39:11 2016 +0100
----------------------------------------------------------------------
.../org/apache/qpid/proton/engine/Link.java | 30 +++
.../qpid/proton/engine/impl/LinkImpl.java | 6 -
.../qpid/proton/engine/impl/ReceiverImpl.java | 8 +
.../qpid/proton/engine/impl/SenderImpl.java | 8 +
.../qpid/proton/systemtests/LinkTest.java | 211 +++++++++++++++++++
5 files changed, 257 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a52c331b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
index 1b214bc..634f3e0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
@@ -132,8 +132,30 @@ public interface Link extends Endpoint
public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote);
+ /**
+ * Gets the credit balance for a link.
+ *
+ * Note that a sending link may still be used to send deliveries even if
+ * link credit is/reaches zero, however those deliveries will end up being
+ * {@link #getQueued() queued} by the link until enough credit is obtained
+ * from the receiver to send them over the wire. In this case the balance
+ * reported will go negative.
+ *
+ * @return the credit balance for the link
+ */
public int getCredit();
+
+ /**
+ * Gets the number of queued messages for a link.
+ *
+ * Links may queue deliveries for a number of reasons, for example there may be insufficient
+ * {@link #getCredit() credit} to send them to the receiver, they may not have yet had a chance
+ * to be written to the wire, or the receiving application has simply not yet processed them.
+ *
+ * @return the queued message count for the link
+ */
public int getQueued();
+
public int getUnsettled();
public Session getSession();
@@ -207,7 +229,15 @@ public interface Link extends Endpoint
public int drained();
+ /**
+ * Returns a [locally generated] view of credit at the remote peer by considering the
+ * current link {@link #getCredit() credit} count as well as the effect of
+ * any locally {@link #getQueued() queued} messages.
+ *
+ * @return view of effective remote credit
+ */
public int getRemoteCredit();
+
public boolean getDrain();
public void detach();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a52c331b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
index 8a2acf0..63e9ddd 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
@@ -431,12 +431,6 @@ public abstract class LinkImpl extends EndpointImpl implements Link
}
@Override
- public int getRemoteCredit()
- {
- return _credit - _queued;
- }
-
- @Override
public DeliveryImpl head()
{
return _head;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a52c331b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
index 39e21d5..a3a01f7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
@@ -136,4 +136,12 @@ public class ReceiverImpl extends LinkImpl implements Receiver
modified();
_drainFlagMode = true;
}
+
+ @Override
+ public int getRemoteCredit()
+ {
+ // Credit is only decremented once advance is called on a received message,
+ // so we also need to consider the queued count.
+ return getCredit() - getQueued();
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a52c331b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
index 38ba043..7cf605f 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
@@ -117,4 +117,12 @@ public class SenderImpl extends LinkImpl implements Sender
advance();
}*/
}
+
+ @Override
+ public int getRemoteCredit()
+ {
+ // Credit is decremented as soon as advance is called on a send,
+ // so we need only consider the credit count, not the queued count.
+ return getCredit();
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a52c331b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
index 0711368..10b509e 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
@@ -23,26 +23,36 @@ import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;
import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
import org.junit.Test;
public class LinkTest extends EngineTestBase
{
private static final Logger LOGGER = Logger.getLogger(LinkTest.class.getName());
+ private static final int BUFFER_SIZE = 4096;
+
private static final Symbol RCV_PROP = Symbol.valueOf("ReceiverPropName");
private static final Integer RCV_PROP_VAL = 1234;
private static final Symbol SND_PROP = Symbol.valueOf("SenderPropName");
@@ -157,4 +167,205 @@ public class LinkTest extends EngineTestBase
assertTrue("Client remote properties lack expected key: " + SND_PROP, clientRemoteProperties.containsKey(SND_PROP));
assertEquals("Client remote properties contain unexpected value for key: " + SND_PROP, SND_PROP_VAL, clientRemoteProperties.get(SND_PROP));
}
+
+ /**
+ * Test the {@link Link#getCredit()}, {@link Link#getQueued()}, and
+ * {@link Link#getRemoteCredit()} methods behave as expected when sending
+ * from server and receiving on client links.
+ *
+ * @throws Exception
+ * if something unexpected occurs
+ */
+ @Test
+ public void testLinkCreditState() throws Exception
+ {
+ LOGGER.fine(bold("======== About to create transports"));
+
+ getClient().transport = Proton.transport();
+ ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
+
+ getServer().transport = Proton.transport();
+ ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX);
+
+ doOutputInputCycle();
+
+ getClient().connection = Proton.connection();
+ getClient().transport.bind(getClient().connection);
+
+ getServer().connection = Proton.connection();
+ getServer().transport.bind(getServer().connection);
+
+ LOGGER.fine(bold("======== About to open connections"));
+ getClient().connection.open();
+ getServer().connection.open();
+
+ doOutputInputCycle();
+
+ LOGGER.fine(bold("======== About to open sessions"));
+ getClient().session = getClient().connection.session();
+ getClient().session.open();
+
+ pumpClientToServer();
+
+ getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+ assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
+
+ getServer().session.open();
+ assertEndpointState(getServer().session, ACTIVE, ACTIVE);
+
+ pumpServerToClient();
+ assertEndpointState(getClient().session, ACTIVE, ACTIVE);
+
+ LOGGER.fine(bold("======== About to create reciever"));
+
+ getClient().source = new Source();
+ getClient().source.setAddress(_sourceAddress);
+
+ getClient().target = new Target();
+ getClient().target.setAddress(null);
+
+ getClient().receiver = getClient().session.receiver("link1");
+ getClient().receiver.setTarget(getClient().target);
+ getClient().receiver.setSource(getClient().source);
+
+ getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+
+ assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED);
+
+ getClient().receiver.open();
+ assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED);
+
+ pumpClientToServer();
+
+ LOGGER.fine(bold("======== About to set up implicitly created sender"));
+
+ getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+
+ getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode());
+ getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode());
+
+ org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource();
+ getServer().sender.setSource(serverRemoteSource);
+
+ assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE);
+ getServer().sender.open();
+
+ assertEndpointState(getServer().sender, ACTIVE, ACTIVE);
+
+ pumpServerToClient();
+
+ assertEndpointState(getClient().receiver, ACTIVE, ACTIVE);
+
+ LOGGER.fine(bold("======== About to flow credit"));
+
+ // Verify credit state
+ assertLinkCreditState(getServer().sender, 0, 0, 0);
+ assertLinkCreditState(getClient().receiver, 0, 0, 0);
+
+ int messagCount = 4;
+ getClient().receiver.flow(messagCount);
+
+ // Verify credit state
+ assertLinkCreditState(getServer().sender, 0, 0, 0);
+ assertLinkCreditState(getClient().receiver, 4, 0, 4);
+
+ pumpClientToServer();
+
+ // Verify credit state
+ assertLinkCreditState(getServer().sender, 4, 0, 4);
+ assertLinkCreditState(getClient().receiver, 4, 0, 4);
+
+ LOGGER.fine(bold("======== About to create messages and send to the client"));
+
+ // 'Send' and verify credit state
+ sendMessageToClient("delivery1", "Msg1");
+ assertLinkCreditState(getServer().sender, 3, 1, 3);
+ assertLinkCreditState(getClient().receiver, 4, 0, 4);
+
+ // 'Send' and verify credit state
+ sendMessageToClient("delivery2", "Msg2");
+ assertLinkCreditState(getServer().sender, 2, 2, 2);
+ assertLinkCreditState(getClient().receiver, 4, 0, 4);
+
+ // Pump to the client to send messages 'on the wire', verify new state, process messages
+ pumpServerToClient();
+
+ LOGGER.fine(bold("======== About to process the messages on the client"));
+
+ assertLinkCreditState(getServer().sender, 2, 0, 2);
+ assertLinkCreditState(getClient().receiver, 4, 2, 2);
+
+ receiveMessageFromServer("delivery1", "Msg1");
+
+ assertLinkCreditState(getServer().sender, 2, 0, 2);
+ assertLinkCreditState(getClient().receiver, 3, 1, 2);
+
+ receiveMessageFromServer("delivery2", "Msg2");
+
+ assertLinkCreditState(getServer().sender, 2, 0, 2);
+ assertLinkCreditState(getClient().receiver, 2, 0, 2);
+ }
+
+ void assertLinkCreditState(Link link, int credit, int queued, int remoteCredit)
+ {
+ assertEquals("Unexpected credit", credit, link.getCredit());
+ assertEquals("Unexpected queued", queued, link.getQueued());
+ assertEquals("Unexpected remote credit", remoteCredit, link.getRemoteCredit());
+ }
+
+ private Delivery receiveMessageFromServer(String deliveryTag, String messageContent)
+ {
+ Delivery delivery = getClient().connection.getWorkHead();
+
+ assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+ assertEquals("The received delivery should be on our receiver",
+ getClient().receiver, delivery.getLink());
+
+ assertNull(delivery.getLocalState());
+ assertNull(delivery.getRemoteState());
+
+ assertFalse(delivery.isPartial());
+ assertTrue(delivery.isReadable());
+
+ byte[] received = new byte[BUFFER_SIZE];
+ int len = getClient().receiver.recv(received, 0, BUFFER_SIZE);
+
+ assertTrue("given array was too small", len < BUFFER_SIZE);
+
+ Message m = Proton.message();
+ m.decode(received, 0, len);
+
+ Object messageBody = ((AmqpValue)m.getBody()).getValue();
+ assertEquals("Unexpected message content", messageContent, messageBody);
+
+ boolean receiverAdvanced = getClient().receiver.advance();
+ assertTrue("receiver has not advanced", receiverAdvanced);
+
+ return delivery;
+ }
+
+ private Delivery sendMessageToClient(String deliveryTag, String messageContent)
+ {
+ byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
+
+ Message m = Proton.message();
+ m.setBody(new AmqpValue(messageContent));
+
+ byte[] encoded = new byte[BUFFER_SIZE];
+ int len = m.encode(encoded, 0, BUFFER_SIZE);
+
+ assertTrue("given array was too small", len < BUFFER_SIZE);
+
+ Delivery serverDelivery = getServer().sender.delivery(tag);
+
+ int sent = getServer().sender.send(encoded, 0, len);
+
+ assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent);
+
+ boolean senderAdvanced = getServer().sender.advance();
+ assertTrue("sender has not advanced", senderAdvanced);
+
+ return serverDelivery;
+ }
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org