You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/09/12 17:40:54 UTC

[1/2] qpid-proton git commit: NO-JIRA: remove unused imports

Repository: qpid-proton
Updated Branches:
  refs/heads/master 21b0b76d0 -> a52c331bf


NO-JIRA: remove unused imports


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/166ce2dd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/166ce2dd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/166ce2dd

Branch: refs/heads/master
Commit: 166ce2dd1f5c4c82d4183ca818dc6e148fef1b2c
Parents: 21b0b76
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Sep 12 11:34:56 2016 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Sep 12 11:34:56 2016 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java | 2 --
 .../main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java   | 2 --
 2 files changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/166ce2dd/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
index defb78b..39e21d5 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
@@ -20,9 +20,7 @@
  */
 package org.apache.qpid.proton.engine.impl;
 
-import java.util.Iterator;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
-import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.Receiver;
 
 public class ReceiverImpl extends LinkImpl implements Receiver

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/166ce2dd/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
index 38fb5f6..38ba043 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
@@ -20,8 +20,6 @@
  */
 package org.apache.qpid.proton.engine.impl;
 
-import java.util.Iterator;
-import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Sender;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-proton git commit: PROTON-1299: update getRemoteCredit() handling to account for the differing behaviour of senders and receivers

Posted by ro...@apache.org.
PROTON-1299: update getRemoteCredit() handling to account for the differing behaviour of senders and receivers


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a52c331b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a52c331b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a52c331b

Branch: refs/heads/master
Commit: a52c331bf12930a83314bd818df7391ce201400f
Parents: 166ce2d
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Sep 12 18:39:11 2016 +0100
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Sep 12 18:39:11 2016 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/proton/engine/Link.java     |  30 +++
 .../qpid/proton/engine/impl/LinkImpl.java       |   6 -
 .../qpid/proton/engine/impl/ReceiverImpl.java   |   8 +
 .../qpid/proton/engine/impl/SenderImpl.java     |   8 +
 .../qpid/proton/systemtests/LinkTest.java       | 211 +++++++++++++++++++
 5 files changed, 257 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a52c331b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
index 1b214bc..634f3e0 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
@@ -132,8 +132,30 @@ public interface Link extends Endpoint
 
     public Link next(EnumSet<EndpointState> local, EnumSet<EndpointState> remote);
 
+    /**
+     * Gets the credit balance for a link.
+     *
+     * Note that a sending link may still be used to send deliveries even if
+     * link credit is/reaches zero, however those deliveries will end up being
+     * {@link #getQueued() queued} by the link until enough credit is obtained
+     * from the receiver to send them over the wire. In this case the balance
+     * reported will go negative.
+     *
+     * @return the credit balance for the link
+     */
     public int getCredit();
+
+    /**
+     * Gets the number of queued messages for a link.
+     *
+     * Links may queue deliveries for a number of reasons, for example there may be insufficient
+     * {@link #getCredit() credit} to send them to the receiver, they may not have yet had a chance
+     * to be written to the wire, or the receiving application has simply not yet processed them.
+     *
+     * @return the queued message count for the link
+     */
     public int getQueued();
+
     public int getUnsettled();
 
     public Session getSession();
@@ -207,7 +229,15 @@ public interface Link extends Endpoint
 
     public int drained();
 
+    /**
+     * Returns a [locally generated] view of credit at the remote peer by considering the
+     * current link {@link #getCredit() credit} count as well as the effect of
+     * any locally {@link #getQueued() queued} messages.
+     *
+     * @return view of effective remote credit
+     */
     public int getRemoteCredit();
+
     public boolean getDrain();
 
     public void detach();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a52c331b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
index 8a2acf0..63e9ddd 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
@@ -431,12 +431,6 @@ public abstract class LinkImpl extends EndpointImpl implements Link
     }
 
     @Override
-    public int getRemoteCredit()
-    {
-        return _credit - _queued;
-    }
-
-    @Override
     public DeliveryImpl head()
     {
         return _head;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a52c331b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
index 39e21d5..a3a01f7 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ReceiverImpl.java
@@ -136,4 +136,12 @@ public class ReceiverImpl extends LinkImpl implements Receiver
         modified();
         _drainFlagMode = true;
     }
+
+    @Override
+    public int getRemoteCredit()
+    {
+        // Credit is only decremented once advance is called on a received message,
+        // so we also need to consider the queued count.
+        return getCredit() - getQueued();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a52c331b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
index 38ba043..7cf605f 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SenderImpl.java
@@ -117,4 +117,12 @@ public class SenderImpl  extends LinkImpl implements Sender
             advance();
         }*/
     }
+
+    @Override
+    public int getRemoteCredit()
+    {
+        // Credit is decremented as soon as advance is called on a send,
+        // so we need only consider the credit count, not the queued count.
+        return getCredit();
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a52c331b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
index 0711368..10b509e 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
@@ -23,26 +23,36 @@ import static org.apache.qpid.proton.engine.EndpointState.ACTIVE;
 import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED;
 import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.logging.Logger;
 
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Source;
 import org.apache.qpid.proton.amqp.messaging.Target;
 import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
 import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.engine.Link;
 import org.apache.qpid.proton.engine.Sender;
+import org.apache.qpid.proton.message.Message;
 import org.junit.Test;
 
 public class LinkTest extends EngineTestBase
 {
     private static final Logger LOGGER = Logger.getLogger(LinkTest.class.getName());
 
+    private static final int BUFFER_SIZE = 4096;
+
     private static final Symbol RCV_PROP = Symbol.valueOf("ReceiverPropName");
     private static final Integer RCV_PROP_VAL = 1234;
     private static final Symbol SND_PROP = Symbol.valueOf("SenderPropName");
@@ -157,4 +167,205 @@ public class LinkTest extends EngineTestBase
         assertTrue("Client remote properties lack expected key: " + SND_PROP, clientRemoteProperties.containsKey(SND_PROP));
         assertEquals("Client remote properties contain unexpected value for key: " + SND_PROP, SND_PROP_VAL, clientRemoteProperties.get(SND_PROP));
     }
+
+    /**
+     * Test the {@link Link#getCredit()}, {@link Link#getQueued()}, and
+     * {@link Link#getRemoteCredit()} methods behave as expected when sending
+     * from server and receiving on client links.
+     *
+     * @throws Exception
+     *             if something unexpected occurs
+     */
+    @Test
+    public void testLinkCreditState() throws Exception
+    {
+        LOGGER.fine(bold("======== About to create transports"));
+
+        getClient().transport = Proton.transport();
+        ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
+
+        getServer().transport = Proton.transport();
+        ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "            " + TestLoggingHelper.SERVER_PREFIX);
+
+        doOutputInputCycle();
+
+        getClient().connection = Proton.connection();
+        getClient().transport.bind(getClient().connection);
+
+        getServer().connection = Proton.connection();
+        getServer().transport.bind(getServer().connection);
+
+        LOGGER.fine(bold("======== About to open connections"));
+        getClient().connection.open();
+        getServer().connection.open();
+
+        doOutputInputCycle();
+
+        LOGGER.fine(bold("======== About to open sessions"));
+        getClient().session = getClient().connection.session();
+        getClient().session.open();
+
+        pumpClientToServer();
+
+        getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+        assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
+
+        getServer().session.open();
+        assertEndpointState(getServer().session, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+        assertEndpointState(getClient().session, ACTIVE, ACTIVE);
+
+        LOGGER.fine(bold("======== About to create reciever"));
+
+        getClient().source = new Source();
+        getClient().source.setAddress(_sourceAddress);
+
+        getClient().target = new Target();
+        getClient().target.setAddress(null);
+
+        getClient().receiver = getClient().session.receiver("link1");
+        getClient().receiver.setTarget(getClient().target);
+        getClient().receiver.setSource(getClient().source);
+
+        getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+        getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+
+        assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED);
+
+        getClient().receiver.open();
+        assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED);
+
+        pumpClientToServer();
+
+        LOGGER.fine(bold("======== About to set up implicitly created sender"));
+
+        getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+
+        getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode());
+        getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode());
+
+        org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource();
+        getServer().sender.setSource(serverRemoteSource);
+
+        assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE);
+        getServer().sender.open();
+
+        assertEndpointState(getServer().sender, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+
+        assertEndpointState(getClient().receiver, ACTIVE, ACTIVE);
+
+        LOGGER.fine(bold("======== About to flow credit"));
+
+        // Verify credit state
+        assertLinkCreditState(getServer().sender, 0, 0, 0);
+        assertLinkCreditState(getClient().receiver, 0, 0, 0);
+
+        int messagCount = 4;
+        getClient().receiver.flow(messagCount);
+
+        // Verify credit state
+        assertLinkCreditState(getServer().sender, 0, 0, 0);
+        assertLinkCreditState(getClient().receiver, 4, 0, 4);
+
+        pumpClientToServer();
+
+        // Verify credit state
+        assertLinkCreditState(getServer().sender, 4, 0, 4);
+        assertLinkCreditState(getClient().receiver, 4, 0, 4);
+
+        LOGGER.fine(bold("======== About to create messages and send to the client"));
+
+        // 'Send' and verify credit state
+        sendMessageToClient("delivery1", "Msg1");
+        assertLinkCreditState(getServer().sender, 3, 1, 3);
+        assertLinkCreditState(getClient().receiver, 4, 0, 4);
+
+        // 'Send' and verify credit state
+        sendMessageToClient("delivery2", "Msg2");
+        assertLinkCreditState(getServer().sender, 2, 2, 2);
+        assertLinkCreditState(getClient().receiver, 4, 0, 4);
+
+        // Pump to the client to send messages 'on the wire', verify new state, process messages
+        pumpServerToClient();
+
+        LOGGER.fine(bold("======== About to process the messages on the client"));
+
+        assertLinkCreditState(getServer().sender, 2, 0, 2);
+        assertLinkCreditState(getClient().receiver, 4, 2, 2);
+
+        receiveMessageFromServer("delivery1", "Msg1");
+
+        assertLinkCreditState(getServer().sender, 2, 0, 2);
+        assertLinkCreditState(getClient().receiver, 3, 1, 2);
+
+        receiveMessageFromServer("delivery2", "Msg2");
+
+        assertLinkCreditState(getServer().sender, 2, 0, 2);
+        assertLinkCreditState(getClient().receiver, 2, 0, 2);
+    }
+
+    void assertLinkCreditState(Link link, int credit, int queued, int remoteCredit)
+    {
+        assertEquals("Unexpected credit", credit, link.getCredit());
+        assertEquals("Unexpected queued", queued, link.getQueued());
+        assertEquals("Unexpected remote credit", remoteCredit, link.getRemoteCredit());
+    }
+
+    private Delivery receiveMessageFromServer(String deliveryTag, String messageContent)
+    {
+        Delivery delivery = getClient().connection.getWorkHead();
+
+        assertTrue(Arrays.equals(deliveryTag.getBytes(StandardCharsets.UTF_8), delivery.getTag()));
+        assertEquals("The received delivery should be on our receiver",
+                            getClient().receiver, delivery.getLink());
+
+        assertNull(delivery.getLocalState());
+        assertNull(delivery.getRemoteState());
+
+        assertFalse(delivery.isPartial());
+        assertTrue(delivery.isReadable());
+
+        byte[] received = new byte[BUFFER_SIZE];
+        int len = getClient().receiver.recv(received, 0, BUFFER_SIZE);
+
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        Message m = Proton.message();
+        m.decode(received, 0, len);
+
+        Object messageBody = ((AmqpValue)m.getBody()).getValue();
+        assertEquals("Unexpected message content", messageContent, messageBody);
+
+        boolean receiverAdvanced = getClient().receiver.advance();
+        assertTrue("receiver has not advanced", receiverAdvanced);
+
+        return delivery;
+    }
+
+    private Delivery sendMessageToClient(String deliveryTag, String messageContent)
+    {
+        byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
+
+        Message m = Proton.message();
+        m.setBody(new AmqpValue(messageContent));
+
+        byte[] encoded = new byte[BUFFER_SIZE];
+        int len = m.encode(encoded, 0, BUFFER_SIZE);
+
+        assertTrue("given array was too small", len < BUFFER_SIZE);
+
+        Delivery serverDelivery = getServer().sender.delivery(tag);
+
+        int sent = getServer().sender.send(encoded, 0, len);
+
+        assertEquals("sender unable to send all data at once as assumed for simplicity", len, sent);
+
+        boolean senderAdvanced = getServer().sender.advance();
+        assertTrue("sender has not advanced", senderAdvanced);
+
+        return serverDelivery;
+    }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org