You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/11/21 16:51:45 UTC

qpid-proton git commit: PROTON-721: expose link capabilities and wire up handling of them

Repository: qpid-proton
Updated Branches:
  refs/heads/master b4cadd1db -> 2d5b8d8a3


PROTON-721: expose link capabilities and wire up handling of them


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2d5b8d8a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2d5b8d8a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2d5b8d8a

Branch: refs/heads/master
Commit: 2d5b8d8a333d84785fbcfac2f1c22e9c0a754874
Parents: b4cadd1
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Nov 21 16:45:10 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Nov 21 16:45:10 2016 +0000

----------------------------------------------------------------------
 .../org/apache/qpid/proton/engine/Link.java     |  55 +++++++++
 .../qpid/proton/engine/impl/LinkImpl.java       |  50 ++++++++
 .../qpid/proton/engine/impl/TransportImpl.java  |  13 ++
 .../qpid/proton/systemtests/LinkTest.java       | 123 +++++++++++++++++++
 4 files changed, 241 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
index 634f3e0..248c687 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
@@ -243,4 +243,59 @@ public interface Link extends Endpoint
     public void detach();
     public boolean detached();
 
+    /**
+     * Sets the local link offered capabilities, to be conveyed to the peer via the Attach frame
+     * when attaching the link to the session.
+     *
+     * Must be called during link setup, i.e. before calling the {@link #open()} method.
+     *
+     * @param offeredCapabilities
+     *          the offered capabilities array to send, or null for none.
+     */
+    public void setOfferedCapabilities(Symbol[] offeredCapabilities);
+
+    /**
+     * Gets the local link offered capabilities.
+     *
+     * @return the offered capabilities array, or null if none was set.
+     *
+     * @see #setOfferedCapabilities(Symbol[])
+     */
+    Symbol[] getOfferedCapabilities();
+
+    /**
+     * Gets the remote link offered capabilities, as conveyed from the peer via the Attach frame
+     * when attaching the link to the session.
+     *
+     * @return the offered capabilities array conveyed by the peer, or null if there was none.
+     */
+    Symbol[] getRemoteOfferedCapabilities();
+
+    /**
+     * Sets the local link desired capabilities, to be conveyed to the peer via the Attach frame
+     * when attaching the link to the session.
+     *
+     * Must be called during link setup, i.e. before calling the {@link #open()} method.
+     *
+     * @param desiredCapabilities
+     *          the desired capabilities array to send, or null for none.
+     */
+    public void setDesiredCapabilities(Symbol[] desiredCapabilities);
+
+    /**
+     * Gets the local link desired capabilities.
+     *
+     * @return the desired capabilities array, or null if none was set.
+     *
+     * @see #setDesiredCapabilities(Symbol[])
+     */
+    Symbol[] getDesiredCapabilities();
+
+    /**
+     * Gets the remote link desired capabilities, as conveyed from the peer via the Attach frame
+     * when attaching the link to the session.
+     *
+     * @return the desired capabilities array conveyed by the peer, or null if there was none.
+     */
+    Symbol[] getRemoteDesiredCapabilities();
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
index 63e9ddd..a67785e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
@@ -61,6 +61,10 @@ public abstract class LinkImpl extends EndpointImpl implements Link
     private boolean _detached;
     private Map<Symbol, Object> _properties;
     private Map<Symbol, Object> _remoteProperties;
+    private Symbol[] _offeredCapabilities;
+    private Symbol[] _remoteOfferedCapabilities;
+    private Symbol[] _desiredCapabilities;
+    private Symbol[] _remoteDesiredCapabilities;
 
     LinkImpl(SessionImpl session, String name)
     {
@@ -400,6 +404,52 @@ public abstract class LinkImpl extends EndpointImpl implements Link
     }
 
     @Override
+    public Symbol[] getDesiredCapabilities()
+    {
+        return _desiredCapabilities;
+    }
+
+    @Override
+    public void setDesiredCapabilities(Symbol[] desiredCapabilities)
+    {
+        _desiredCapabilities = desiredCapabilities;
+    }
+
+    @Override
+    public Symbol[] getRemoteDesiredCapabilities()
+    {
+        return _remoteDesiredCapabilities;
+    }
+
+    void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
+    {
+        _remoteDesiredCapabilities = remoteDesiredCapabilities;
+    }
+
+    @Override
+    public Symbol[] getOfferedCapabilities()
+    {
+        return _offeredCapabilities;
+    }
+
+    @Override
+    public void setOfferedCapabilities(Symbol[] offeredCapabilities)
+    {
+        _offeredCapabilities = offeredCapabilities;
+    }
+
+    @Override
+    public Symbol[] getRemoteOfferedCapabilities()
+    {
+        return _remoteOfferedCapabilities;
+    }
+
+    void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
+    {
+        _remoteOfferedCapabilities = remoteOfferedCapabilities;
+    }
+
+    @Override
     public int drained()
     {
         int drained = 0;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 9a5fcbd..bb2e43b 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -781,6 +781,16 @@ public class TransportImpl extends EndpointImpl
                                 attach.setProperties(link.getProperties());
                             }
 
+                            if(link.getOfferedCapabilities() != null)
+                            {
+                                attach.setOfferedCapabilities(link.getOfferedCapabilities());
+                            }
+
+                            if(link.getDesiredCapabilities() != null)
+                            {
+                                attach.setDesiredCapabilities(link.getDesiredCapabilities());
+                            }
+
                             attach.setRole(endpoint instanceof ReceiverImpl ? Role.RECEIVER : Role.SENDER);
 
                             if(link instanceof SenderImpl)
@@ -1182,6 +1192,9 @@ public class TransportImpl extends EndpointImpl
 
                 link.setRemoteProperties(attach.getProperties());
 
+                link.setRemoteDesiredCapabilities(attach.getDesiredCapabilities());
+                link.setRemoteOfferedCapabilities(attach.getOfferedCapabilities());
+
                 transportLink.setName(attach.getName());
                 transportLink.setRemoteHandle(handle);
                 transportSession.addLinkRemoteHandle(transportLink, handle);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
index 10b509e..518960c 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
@@ -61,6 +61,129 @@ public class LinkTest extends EngineTestBase
     private final String _sourceAddress = getServer().containerId + "-link1-source";
 
     @Test
+    public void testCapabilities() throws Exception
+    {
+        final Symbol recvOfferedCap = Symbol.valueOf("recvOfferedCapability");
+        final Symbol recvDesiredCap = Symbol.valueOf("recvDesiredCapability");
+        final Symbol senderOfferedCap = Symbol.valueOf("senderOfferedCapability");
+        final Symbol senderDesiredCap = Symbol.valueOf("senderDesiredCapability");
+
+        Symbol[] clientOfferedCapabilities = new Symbol[] { recvOfferedCap };
+        Symbol[] clientDesiredCapabilities = new Symbol[] { recvDesiredCap };
+
+        Symbol[] serverOfferedCapabilities = new Symbol[] { senderOfferedCap };
+        Symbol[] serverDesiredCapabilities = new Symbol[] { senderDesiredCap };
+
+        LOGGER.fine(bold("======== About to create transports"));
+
+        getClient().transport = Proton.transport();
+        ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
+
+        getServer().transport = Proton.transport();
+        ProtocolTracerEnabler.setProtocolTracer(getServer().transport, "            " + TestLoggingHelper.SERVER_PREFIX);
+
+        doOutputInputCycle();
+
+        getClient().connection = Proton.connection();
+        getClient().transport.bind(getClient().connection);
+
+        getServer().connection = Proton.connection();
+        getServer().transport.bind(getServer().connection);
+
+        LOGGER.fine(bold("======== About to open connections"));
+        getClient().connection.open();
+        getServer().connection.open();
+
+        doOutputInputCycle();
+
+        LOGGER.fine(bold("======== About to open sessions"));
+        getClient().session = getClient().connection.session();
+        getClient().session.open();
+
+        pumpClientToServer();
+
+        getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+        assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
+
+        getServer().session.open();
+        assertEndpointState(getServer().session, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+        assertEndpointState(getClient().session, ACTIVE, ACTIVE);
+
+        LOGGER.fine(bold("======== About to create reciever"));
+
+        getClient().source = new Source();
+        getClient().source.setAddress(_sourceAddress);
+
+        getClient().target = new Target();
+        getClient().target.setAddress(null);
+
+        getClient().receiver = getClient().session.receiver("link1");
+        getClient().receiver.setTarget(getClient().target);
+        getClient().receiver.setSource(getClient().source);
+
+        getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+        getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+
+        // Set the client receivers capabilities
+        getClient().receiver.setOfferedCapabilities(clientOfferedCapabilities);
+        getClient().receiver.setDesiredCapabilities(clientDesiredCapabilities);
+
+        assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED);
+
+        getClient().receiver.open();
+        assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED);
+
+        pumpClientToServer();
+
+        LOGGER.fine(bold("======== About to set up implicitly created sender"));
+
+        getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+
+        getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode());
+        getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode());
+
+        org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource();
+        getServer().sender.setSource(serverRemoteSource);
+
+        // Set the server senders capabilities
+        getServer().sender.setOfferedCapabilities(serverOfferedCapabilities);
+        getServer().sender.setDesiredCapabilities(serverDesiredCapabilities);
+
+        assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE);
+        getServer().sender.open();
+
+        assertEndpointState(getServer().sender, ACTIVE, ACTIVE);
+
+        pumpServerToClient();
+
+        assertEndpointState(getClient().receiver, ACTIVE, ACTIVE);
+
+        // Verify server side got the clients receiver capabilities as expected
+        Symbol[] serverRemoteOfferedCapabilities = getServer().sender.getRemoteOfferedCapabilities();
+        assertNotNull("Server had no remote offered capabilities", serverRemoteOfferedCapabilities);
+        assertEquals("Server remote offered capabilities not expected size", 1, serverRemoteOfferedCapabilities.length);
+        assertTrue("Server remote offered capabilities lack expected value: " + recvOfferedCap, Arrays.asList(serverRemoteOfferedCapabilities).contains(recvOfferedCap));
+
+        Symbol[] serverRemoteDesiredCapabilities = getServer().sender.getRemoteDesiredCapabilities();
+        assertNotNull("Server had no remote desired capabilities", serverRemoteDesiredCapabilities);
+        assertEquals("Server remote desired capabilities not expected size", 1, serverRemoteDesiredCapabilities.length);
+        assertTrue("Server remote desired capabilities lack expected value: " + recvDesiredCap, Arrays.asList(serverRemoteDesiredCapabilities).contains(recvDesiredCap));
+
+        // Verify the client side got the servers sender capabilities as expected
+        Symbol[]  clientRemoteOfferedCapabilities = getClient().receiver.getRemoteOfferedCapabilities();
+        assertNotNull("Client had no remote offered capabilities", clientRemoteOfferedCapabilities);
+        assertEquals("Client remote offered capabilities not expected size", 1, clientRemoteOfferedCapabilities.length);
+        assertTrue("Client remote offered capabilities lack expected value: " + senderOfferedCap, Arrays.asList(clientRemoteOfferedCapabilities).contains(senderOfferedCap));
+
+        Symbol[]  clientRemoteDesiredCapabilities = getClient().receiver.getRemoteDesiredCapabilities();
+        assertNotNull("Client had no remote desired capabilities", clientRemoteDesiredCapabilities);
+        assertEquals("Client remote desired capabilities not expected size", 1, clientRemoteDesiredCapabilities.length);
+        assertTrue("Client remote desired capabilities lack expected value: " + senderDesiredCap, Arrays.asList(clientRemoteDesiredCapabilities).contains(senderDesiredCap));
+    }
+
+    @Test
     public void testProperties() throws Exception
     {
         Map<Symbol, Object> receiverProps = new HashMap<>();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org