You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2016/11/21 16:51:45 UTC
qpid-proton git commit: PROTON-721: expose link capabilities and wire up handling of them
Repository: qpid-proton
Updated Branches:
refs/heads/master b4cadd1db -> 2d5b8d8a3
PROTON-721: expose link capabilities and wire up handling of them
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2d5b8d8a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2d5b8d8a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2d5b8d8a
Branch: refs/heads/master
Commit: 2d5b8d8a333d84785fbcfac2f1c22e9c0a754874
Parents: b4cadd1
Author: Robert Gemmell <ro...@apache.org>
Authored: Mon Nov 21 16:45:10 2016 +0000
Committer: Robert Gemmell <ro...@apache.org>
Committed: Mon Nov 21 16:45:10 2016 +0000
----------------------------------------------------------------------
.../org/apache/qpid/proton/engine/Link.java | 55 +++++++++
.../qpid/proton/engine/impl/LinkImpl.java | 50 ++++++++
.../qpid/proton/engine/impl/TransportImpl.java | 13 ++
.../qpid/proton/systemtests/LinkTest.java | 123 +++++++++++++++++++
4 files changed, 241 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
index 634f3e0..248c687 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java
@@ -243,4 +243,59 @@ public interface Link extends Endpoint
public void detach();
public boolean detached();
+ /**
+ * Sets the local link offered capabilities, to be conveyed to the peer via the Attach frame
+ * when attaching the link to the session.
+ *
+ * Must be called during link setup, i.e. before calling the {@link #open()} method.
+ *
+ * @param offeredCapabilities
+ * the offered capabilities array to send, or null for none.
+ */
+ public void setOfferedCapabilities(Symbol[] offeredCapabilities);
+
+ /**
+ * Gets the local link offered capabilities.
+ *
+ * @return the offered capabilities array, or null if none was set.
+ *
+ * @see #setOfferedCapabilities(Symbol[])
+ */
+ Symbol[] getOfferedCapabilities();
+
+ /**
+ * Gets the remote link offered capabilities, as conveyed from the peer via the Attach frame
+ * when attaching the link to the session.
+ *
+ * @return the offered capabilities array conveyed by the peer, or null if there was none.
+ */
+ Symbol[] getRemoteOfferedCapabilities();
+
+ /**
+ * Sets the local link desired capabilities, to be conveyed to the peer via the Attach frame
+ * when attaching the link to the session.
+ *
+ * Must be called during link setup, i.e. before calling the {@link #open()} method.
+ *
+ * @param desiredCapabilities
+ * the desired capabilities array to send, or null for none.
+ */
+ public void setDesiredCapabilities(Symbol[] desiredCapabilities);
+
+ /**
+ * Gets the local link desired capabilities.
+ *
+ * @return the desired capabilities array, or null if none was set.
+ *
+ * @see #setDesiredCapabilities(Symbol[])
+ */
+ Symbol[] getDesiredCapabilities();
+
+ /**
+ * Gets the remote link desired capabilities, as conveyed from the peer via the Attach frame
+ * when attaching the link to the session.
+ *
+ * @return the desired capabilities array conveyed by the peer, or null if there was none.
+ */
+ Symbol[] getRemoteDesiredCapabilities();
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
index 63e9ddd..a67785e 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java
@@ -61,6 +61,10 @@ public abstract class LinkImpl extends EndpointImpl implements Link
private boolean _detached;
private Map<Symbol, Object> _properties;
private Map<Symbol, Object> _remoteProperties;
+ private Symbol[] _offeredCapabilities;
+ private Symbol[] _remoteOfferedCapabilities;
+ private Symbol[] _desiredCapabilities;
+ private Symbol[] _remoteDesiredCapabilities;
LinkImpl(SessionImpl session, String name)
{
@@ -400,6 +404,52 @@ public abstract class LinkImpl extends EndpointImpl implements Link
}
@Override
+ public Symbol[] getDesiredCapabilities()
+ {
+ return _desiredCapabilities;
+ }
+
+ @Override
+ public void setDesiredCapabilities(Symbol[] desiredCapabilities)
+ {
+ _desiredCapabilities = desiredCapabilities;
+ }
+
+ @Override
+ public Symbol[] getRemoteDesiredCapabilities()
+ {
+ return _remoteDesiredCapabilities;
+ }
+
+ void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
+ {
+ _remoteDesiredCapabilities = remoteDesiredCapabilities;
+ }
+
+ @Override
+ public Symbol[] getOfferedCapabilities()
+ {
+ return _offeredCapabilities;
+ }
+
+ @Override
+ public void setOfferedCapabilities(Symbol[] offeredCapabilities)
+ {
+ _offeredCapabilities = offeredCapabilities;
+ }
+
+ @Override
+ public Symbol[] getRemoteOfferedCapabilities()
+ {
+ return _remoteOfferedCapabilities;
+ }
+
+ void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
+ {
+ _remoteOfferedCapabilities = remoteOfferedCapabilities;
+ }
+
+ @Override
public int drained()
{
int drained = 0;
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
index 9a5fcbd..bb2e43b 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java
@@ -781,6 +781,16 @@ public class TransportImpl extends EndpointImpl
attach.setProperties(link.getProperties());
}
+ if(link.getOfferedCapabilities() != null)
+ {
+ attach.setOfferedCapabilities(link.getOfferedCapabilities());
+ }
+
+ if(link.getDesiredCapabilities() != null)
+ {
+ attach.setDesiredCapabilities(link.getDesiredCapabilities());
+ }
+
attach.setRole(endpoint instanceof ReceiverImpl ? Role.RECEIVER : Role.SENDER);
if(link instanceof SenderImpl)
@@ -1182,6 +1192,9 @@ public class TransportImpl extends EndpointImpl
link.setRemoteProperties(attach.getProperties());
+ link.setRemoteDesiredCapabilities(attach.getDesiredCapabilities());
+ link.setRemoteOfferedCapabilities(attach.getOfferedCapabilities());
+
transportLink.setName(attach.getName());
transportLink.setRemoteHandle(handle);
transportSession.addLinkRemoteHandle(transportLink, handle);
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
----------------------------------------------------------------------
diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
index 10b509e..518960c 100644
--- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
+++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java
@@ -61,6 +61,129 @@ public class LinkTest extends EngineTestBase
private final String _sourceAddress = getServer().containerId + "-link1-source";
@Test
+ public void testCapabilities() throws Exception
+ {
+ final Symbol recvOfferedCap = Symbol.valueOf("recvOfferedCapability");
+ final Symbol recvDesiredCap = Symbol.valueOf("recvDesiredCapability");
+ final Symbol senderOfferedCap = Symbol.valueOf("senderOfferedCapability");
+ final Symbol senderDesiredCap = Symbol.valueOf("senderDesiredCapability");
+
+ Symbol[] clientOfferedCapabilities = new Symbol[] { recvOfferedCap };
+ Symbol[] clientDesiredCapabilities = new Symbol[] { recvDesiredCap };
+
+ Symbol[] serverOfferedCapabilities = new Symbol[] { senderOfferedCap };
+ Symbol[] serverDesiredCapabilities = new Symbol[] { senderDesiredCap };
+
+ LOGGER.fine(bold("======== About to create transports"));
+
+ getClient().transport = Proton.transport();
+ ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX);
+
+ getServer().transport = Proton.transport();
+ ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX);
+
+ doOutputInputCycle();
+
+ getClient().connection = Proton.connection();
+ getClient().transport.bind(getClient().connection);
+
+ getServer().connection = Proton.connection();
+ getServer().transport.bind(getServer().connection);
+
+ LOGGER.fine(bold("======== About to open connections"));
+ getClient().connection.open();
+ getServer().connection.open();
+
+ doOutputInputCycle();
+
+ LOGGER.fine(bold("======== About to open sessions"));
+ getClient().session = getClient().connection.session();
+ getClient().session.open();
+
+ pumpClientToServer();
+
+ getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE));
+ assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE);
+
+ getServer().session.open();
+ assertEndpointState(getServer().session, ACTIVE, ACTIVE);
+
+ pumpServerToClient();
+ assertEndpointState(getClient().session, ACTIVE, ACTIVE);
+
+ LOGGER.fine(bold("======== About to create reciever"));
+
+ getClient().source = new Source();
+ getClient().source.setAddress(_sourceAddress);
+
+ getClient().target = new Target();
+ getClient().target.setAddress(null);
+
+ getClient().receiver = getClient().session.receiver("link1");
+ getClient().receiver.setTarget(getClient().target);
+ getClient().receiver.setSource(getClient().source);
+
+ getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
+ getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
+
+ // Set the client receivers capabilities
+ getClient().receiver.setOfferedCapabilities(clientOfferedCapabilities);
+ getClient().receiver.setDesiredCapabilities(clientDesiredCapabilities);
+
+ assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED);
+
+ getClient().receiver.open();
+ assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED);
+
+ pumpClientToServer();
+
+ LOGGER.fine(bold("======== About to set up implicitly created sender"));
+
+ getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE));
+
+ getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode());
+ getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode());
+
+ org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource();
+ getServer().sender.setSource(serverRemoteSource);
+
+ // Set the server senders capabilities
+ getServer().sender.setOfferedCapabilities(serverOfferedCapabilities);
+ getServer().sender.setDesiredCapabilities(serverDesiredCapabilities);
+
+ assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE);
+ getServer().sender.open();
+
+ assertEndpointState(getServer().sender, ACTIVE, ACTIVE);
+
+ pumpServerToClient();
+
+ assertEndpointState(getClient().receiver, ACTIVE, ACTIVE);
+
+ // Verify server side got the clients receiver capabilities as expected
+ Symbol[] serverRemoteOfferedCapabilities = getServer().sender.getRemoteOfferedCapabilities();
+ assertNotNull("Server had no remote offered capabilities", serverRemoteOfferedCapabilities);
+ assertEquals("Server remote offered capabilities not expected size", 1, serverRemoteOfferedCapabilities.length);
+ assertTrue("Server remote offered capabilities lack expected value: " + recvOfferedCap, Arrays.asList(serverRemoteOfferedCapabilities).contains(recvOfferedCap));
+
+ Symbol[] serverRemoteDesiredCapabilities = getServer().sender.getRemoteDesiredCapabilities();
+ assertNotNull("Server had no remote desired capabilities", serverRemoteDesiredCapabilities);
+ assertEquals("Server remote desired capabilities not expected size", 1, serverRemoteDesiredCapabilities.length);
+ assertTrue("Server remote desired capabilities lack expected value: " + recvDesiredCap, Arrays.asList(serverRemoteDesiredCapabilities).contains(recvDesiredCap));
+
+ // Verify the client side got the servers sender capabilities as expected
+ Symbol[] clientRemoteOfferedCapabilities = getClient().receiver.getRemoteOfferedCapabilities();
+ assertNotNull("Client had no remote offered capabilities", clientRemoteOfferedCapabilities);
+ assertEquals("Client remote offered capabilities not expected size", 1, clientRemoteOfferedCapabilities.length);
+ assertTrue("Client remote offered capabilities lack expected value: " + senderOfferedCap, Arrays.asList(clientRemoteOfferedCapabilities).contains(senderOfferedCap));
+
+ Symbol[] clientRemoteDesiredCapabilities = getClient().receiver.getRemoteDesiredCapabilities();
+ assertNotNull("Client had no remote desired capabilities", clientRemoteDesiredCapabilities);
+ assertEquals("Client remote desired capabilities not expected size", 1, clientRemoteDesiredCapabilities.length);
+ assertTrue("Client remote desired capabilities lack expected value: " + senderDesiredCap, Arrays.asList(clientRemoteDesiredCapabilities).contains(senderDesiredCap));
+ }
+
+ @Test
public void testProperties() throws Exception
{
Map<Symbol, Object> receiverProps = new HashMap<>();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org