You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/10 16:10:58 UTC

[3/3] qpid-broker-j git commit: QPID-7748: [Java Broker] Ensure Flow is sent in response to Flow.echo=true

QPID-7748: [Java Broker] Ensure Flow is sent in response to Flow.echo=true


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f166e530
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f166e530
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f166e530

Branch: refs/heads/master
Commit: f166e5302cea4e66c97e1dfd6b33ee3854e6ec62
Parents: 66a1545
Author: Lorenz Quack <lq...@apache.org>
Authored: Wed May 10 15:09:27 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Wed May 10 17:09:35 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   |  4 +-
 .../protocol/v1_0/AbstractLinkEndpoint.java     |  1 +
 .../protocol/v1_0/ErrantLinkEndpoint.java       |  6 ++
 .../qpid/server/protocol/v1_0/LinkEndpoint.java |  2 +
 .../server/protocol/v1_0/SequenceNumber.java    |  5 ++
 .../qpid/server/protocol/v1_0/Session_1_0.java  | 95 ++++++++++++--------
 .../protocol/v1_0/type/transport/Flow.java      |  2 +-
 .../protocol/v1_0/transport/link/FlowTest.java  | 45 ++++++++--
 8 files changed, 111 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 07777a4..809fb58 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -631,8 +631,8 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                     Begin beginToSend = new Begin();
                     beginToSend.setRemoteChannel(UnsignedShort.valueOf(receivingChannelId));
                     beginToSend.setNextOutgoingId(session.getNextOutgoingId());
-                    beginToSend.setOutgoingWindow(session.getOutgoingWindowSize());
-                    beginToSend.setIncomingWindow(session.getIncomingWindowSize());
+                    beginToSend.setOutgoingWindow(session.getOutgoingWindow());
+                    beginToSend.setIncomingWindow(session.getIncomingWindow());
                     sendFrame(sendingChannelId, beginToSend);
 
                     synchronized (_blockingLock)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 4ee3a86..0c99258 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -423,6 +423,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
     }
 
 
+    @Override
     public void sendFlow()
     {
         sendFlow(_flowTransactionId != null);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
index b0e98a2..d7a7667 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
@@ -136,6 +136,12 @@ public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> impl
     }
 
     @Override
+    public void sendFlow()
+    {
+        throw new UnsupportedOperationException("This Link is errant");
+    }
+
+    @Override
     public void flowStateChanged()
     {
         throw new UnsupportedOperationException("This Link is errant");

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
index c2e4a5e..da8c0cb 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
@@ -56,6 +56,8 @@ public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget>
 
     void receiveFlow(Flow flow);
 
+    void sendFlow();
+
     void flowStateChanged();
 
     void start();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
index 58f4e1c..6abe4e3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
@@ -107,4 +107,9 @@ public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
     {
         return _seqNo;
     }
+
+    public long longValue()
+    {
+        return  ((long) _seqNo) & 0xFFFFFFFFL;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 9a39007..c203e50 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -53,8 +53,8 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.filter.AMQPFilterTypes;
 import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
 import org.apache.qpid.server.filter.SelectorParsingException;
 import org.apache.qpid.server.filter.selector.ParseException;
 import org.apache.qpid.server.filter.selector.TokenMgrError;
@@ -146,17 +146,18 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     private int _nextOutgoingDeliveryId;
 
-    private UnsignedInteger _outgoingSessionCredit;
-    private UnsignedInteger _initialOutgoingId = UnsignedInteger.valueOf(0);
-    private SequenceNumber _nextIncomingTransferId;
-    private SequenceNumber _nextOutgoingTransferId = new SequenceNumber(_initialOutgoingId.intValue());
+    private UnsignedInteger _initialOutgoingId = UnsignedInteger.ZERO;
+    private SequenceNumber _nextIncomingId;
+    private final int _incomingWindow;
+    private SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
+    private int _outgoingWindow = DEFAULT_SESSION_BUFFER_SIZE;
+    private UnsignedInteger _remoteIncomingWindow;
+    private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
+    private UnsignedInteger _lastSentIncomingLimit;
 
     private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
     private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
 
-    private final int _incomingWindowSize;
-    private int _availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
-    private UnsignedInteger _lastSentIncomingLimit;
 
     private final Error _sessionEndedLinkError =
             new Error(LinkError.DETACH_FORCED,
@@ -173,16 +174,16 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
                        Begin begin,
                        short sendingChannelId,
                        short receivingChannelId,
-                       int incomingWindowSize)
+                       int incomingWindow)
     {
         super(connection, sendingChannelId);
         _sendingChannel = sendingChannelId;
         _receivingChannel = receivingChannelId;
         _sessionState = SessionState.ACTIVE;
-        _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
+        _nextIncomingId = new SequenceNumber(begin.getNextOutgoingId().intValue());
         _connection = connection;
         _primaryDomain = getPrimaryDomain();
-        _incomingWindowSize = incomingWindowSize;
+        _incomingWindow = incomingWindow;
 
         AccessController.doPrivileged((new PrivilegedAction<Object>()
         {
@@ -264,8 +265,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public boolean hasCreditToSend()
     {
-        boolean b = _outgoingSessionCredit != null && _outgoingSessionCredit.intValue() > 0;
-        boolean b1 = getOutgoingWindowSize() != null && getOutgoingWindowSize().compareTo(UnsignedInteger.ZERO) > 0;
+        boolean b = _remoteIncomingWindow != null && _remoteIncomingWindow.intValue() > 0;
+        boolean b1 = getOutgoingWindow() != null && getOutgoingWindow().compareTo(UnsignedInteger.ZERO) > 0;
         return b && b1;
     }
 
@@ -276,7 +277,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint, final boolean newDelivery)
     {
-        _nextOutgoingTransferId.incr();
+        _nextOutgoingId.incr();
         UnsignedInteger deliveryId;
         final boolean settled = Boolean.TRUE.equals(xfr.getSettled());
         if (newDelivery)
@@ -287,7 +288,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
             {
                 final Delivery delivery = new Delivery(xfr, endpoint);
                 _outgoingUnsettled.put(deliveryId, delivery);
-                _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+                _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
                 endpoint.addUnsettled(delivery);
             }
         }
@@ -300,11 +301,11 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
                 if (!settled)
                 {
                     delivery.addTransfer(xfr);
-                    _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+                    _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
                 }
                 else
                 {
-                    _outgoingSessionCredit = _outgoingSessionCredit.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
+                    _remoteIncomingWindow = _remoteIncomingWindow.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
                     endpoint.settle(delivery.getDeliveryTag());
                     _outgoingUnsettled.remove(deliveryId);
                 }
@@ -382,19 +383,19 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public UnsignedInteger getNextOutgoingId()
     {
-        return UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
+        return UnsignedInteger.valueOf(_nextOutgoingId.intValue());
     }
 
     public void sendFlowConditional()
     {
-        if(_nextIncomingTransferId != null)
+        if(_nextIncomingId != null)
         {
             UnsignedInteger clientsCredit =
-                    _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
+                    _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingId.intValue()));
 
             // TODO - we should use a better metric here, and/or manage session credit across the whole connection
             // send a flow if the window is at least half used up
-            if (UnsignedInteger.valueOf(_incomingWindowSize).subtract(clientsCredit).compareTo(clientsCredit) >= 0)
+            if (UnsignedInteger.valueOf(_incomingWindow).subtract(clientsCredit).compareTo(clientsCredit) >= 0)
             {
                 sendFlow();
             }
@@ -402,38 +403,53 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     }
 
-    public UnsignedInteger getOutgoingWindowSize()
+    public UnsignedInteger getOutgoingWindow()
     {
-        return UnsignedInteger.valueOf(_availableOutgoingCredit);
+        return UnsignedInteger.valueOf(_outgoingWindow);
     }
 
     public void receiveFlow(final Flow flow)
     {
         UnsignedInteger handle = flow.getHandle();
-        final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle);
+        final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint =
+                handle == null ? null : _inputHandleToEndpoint.get(handle);
 
         final UnsignedInteger nextOutgoingId =
                 flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
-        int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
-        _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
+        long limit = (nextOutgoingId.longValue() + flow.getIncomingWindow().longValue());
+        _remoteIncomingWindow = UnsignedInteger.valueOf(limit - _nextOutgoingId.longValue());
+
+        _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
+        _remoteOutgoingWindow = flow.getOutgoingWindow();
 
         if (endpoint != null)
         {
             endpoint.receiveFlow(flow);
+
+            if (Boolean.TRUE.equals(flow.getEcho()))
+            {
+                endpoint.sendFlow();
+            }
         }
         else
         {
-            final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints = _inputHandleToEndpoint.values();
+            final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
+                    _inputHandleToEndpoint.values();
             for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
             {
                 le.flowStateChanged();
             }
+
+            if (Boolean.TRUE.equals(flow.getEcho()))
+            {
+                sendFlow();
+            }
         }
     }
 
     public void setNextIncomingId(final UnsignedInteger nextIncomingId)
     {
-        _nextIncomingTransferId = new SequenceNumber(nextIncomingId.intValue());
+        _nextIncomingId = new SequenceNumber(nextIncomingId.intValue());
 
     }
 
@@ -496,22 +512,22 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public void sendFlow(final Flow flow)
     {
-        if(_nextIncomingTransferId != null)
+        if(_nextIncomingId != null)
         {
-            final int nextIncomingId = _nextIncomingTransferId.intValue();
+            final long nextIncomingId = _nextIncomingId.longValue();
             flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
-            _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _incomingWindowSize);
+            _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _incomingWindow);
         }
-        flow.setIncomingWindow(UnsignedInteger.valueOf(_incomingWindowSize));
+        flow.setIncomingWindow(UnsignedInteger.valueOf(_incomingWindow));
 
-        flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue()));
-        flow.setOutgoingWindow(UnsignedInteger.valueOf(_availableOutgoingCredit));
+        flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingId.intValue()));
+        flow.setOutgoingWindow(UnsignedInteger.valueOf(_outgoingWindow));
         send(flow);
     }
 
-    public void setOutgoingSessionCredit(final UnsignedInteger outgoingSessionCredit)
+    public void setRemoteIncomingWindow(final UnsignedInteger remoteIncomingWindow)
     {
-        _outgoingSessionCredit = outgoingSessionCredit;
+        _remoteIncomingWindow = remoteIncomingWindow;
     }
 
     public void receiveDetach(final Detach detach)
@@ -565,7 +581,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public void receiveTransfer(final Transfer transfer)
     {
-        _nextIncomingTransferId.incr();
+        _nextIncomingId.incr();
+        _remoteOutgoingWindow = _remoteOutgoingWindow.subtract(UnsignedInteger.ONE);
 
         UnsignedInteger inputHandle = transfer.getHandle();
         LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
@@ -654,9 +671,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         return _sessionState == SessionState.ENDED || _connection.isClosed();
     }
 
-    UnsignedInteger getIncomingWindowSize()
+    UnsignedInteger getIncomingWindow()
     {
-        return UnsignedInteger.valueOf(_incomingWindowSize);
+        return UnsignedInteger.valueOf(_incomingWindow);
     }
 
     AccessControlContext getAccessControllerContext()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
index 764bd6f..2d11684 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
@@ -27,8 +27,8 @@ import java.nio.ByteBuffer;
 import java.util.Map;
 
 import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
 
 public class Flow implements FrameBody

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index ccb05f7..564df3b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -23,14 +23,13 @@ package org.apache.qpid.tests.protocol.v1_0.transport.link;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.fail;
+import static org.hamcrest.core.IsNot.not;
 
 import java.net.InetSocketAddress;
 
-import org.junit.Ignore;
 import org.junit.Test;
 
 import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -71,22 +70,53 @@ public class FlowTest extends ProtocolTestBase
     }
 
     @Test
-    @Ignore("QPID-7748")
     @SpecificationTest(section = "2.7.4",
             description = "If set to true then the receiver SHOULD send its state at the earliest convenient opportunity.")
-    public void echoFlow() throws Exception
+    public void sessionEchoFlow() throws Exception
+    {
+        final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+        try (FrameTransport transport = new FrameTransport(addr))
+        {
+            transport.doBeginSession();
+            Flow flow = new Flow();
+            flow.setIncomingWindow(UnsignedInteger.ZERO);
+            flow.setNextIncomingId(UnsignedInteger.ZERO);
+            flow.setOutgoingWindow(UnsignedInteger.ZERO);
+            flow.setNextOutgoingId(UnsignedInteger.ZERO);
+            flow.setEcho(Boolean.TRUE);
+
+            transport.sendPerformative(flow);
+            PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+            assertThat(response, is(notNullValue()));
+            assertThat(response.getFrameBody(), is(instanceOf(Flow.class)));
+            Flow responseFlow = (Flow) response.getFrameBody();
+            assertThat(responseFlow.getEcho(), not(equalTo(Boolean.TRUE)));
+            assertThat(responseFlow.getHandle(), is(nullValue()));
+        }
+    }
+
+    @Test
+    @SpecificationTest(section = "2.7.4",
+            description = "If set to true then the receiver SHOULD send its state at the earliest convenient opportunity.")
+    public void linkEchoFlow() throws Exception
     {
         getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
         final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
         try (FrameTransport transport = new FrameTransport(addr))
         {
-            transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
+            final UnsignedInteger handle = UnsignedInteger.ONE;
+            transport.doAttachSendingLink(handle, BrokerAdmin.TEST_QUEUE_NAME);
             Flow flow = new Flow();
             flow.setIncomingWindow(UnsignedInteger.ZERO);
             flow.setNextIncomingId(UnsignedInteger.ZERO);
             flow.setOutgoingWindow(UnsignedInteger.ZERO);
             flow.setNextOutgoingId(UnsignedInteger.ZERO);
             flow.setEcho(Boolean.TRUE);
+            flow.setAvailable(UnsignedInteger.valueOf(10));
+            flow.setDeliveryCount(UnsignedInteger.ZERO);
+            flow.setLinkCredit(UnsignedInteger.ZERO);
+            flow.setHandle(handle);
 
             transport.sendPerformative(flow);
             PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
@@ -94,7 +124,8 @@ public class FlowTest extends ProtocolTestBase
             assertThat(response, is(notNullValue()));
             assertThat(response.getFrameBody(), is(instanceOf(Flow.class)));
             Flow responseFlow = (Flow) response.getFrameBody();
-            assertThat(responseFlow.getEcho(), is(equalTo(Boolean.FALSE)));
+            assertThat(responseFlow.getEcho(), not(equalTo(Boolean.TRUE)));
+            assertThat(responseFlow.getHandle(), is(notNullValue()));
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org