You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/10 16:10:58 UTC
[3/3] qpid-broker-j git commit: QPID-7748: [Java Broker] Ensure Flow is sent in response to Flow.echo=true
QPID-7748: [Java Broker] Ensure Flow is sent in response to Flow.echo=true
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/f166e530
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/f166e530
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/f166e530
Branch: refs/heads/master
Commit: f166e5302cea4e66c97e1dfd6b33ee3854e6ec62
Parents: 66a1545
Author: Lorenz Quack <lq...@apache.org>
Authored: Wed May 10 15:09:27 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Wed May 10 17:09:35 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 4 +-
.../protocol/v1_0/AbstractLinkEndpoint.java | 1 +
.../protocol/v1_0/ErrantLinkEndpoint.java | 6 ++
.../qpid/server/protocol/v1_0/LinkEndpoint.java | 2 +
.../server/protocol/v1_0/SequenceNumber.java | 5 ++
.../qpid/server/protocol/v1_0/Session_1_0.java | 95 ++++++++++++--------
.../protocol/v1_0/type/transport/Flow.java | 2 +-
.../protocol/v1_0/transport/link/FlowTest.java | 45 ++++++++--
8 files changed, 111 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index 07777a4..809fb58 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -631,8 +631,8 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
Begin beginToSend = new Begin();
beginToSend.setRemoteChannel(UnsignedShort.valueOf(receivingChannelId));
beginToSend.setNextOutgoingId(session.getNextOutgoingId());
- beginToSend.setOutgoingWindow(session.getOutgoingWindowSize());
- beginToSend.setIncomingWindow(session.getIncomingWindowSize());
+ beginToSend.setOutgoingWindow(session.getOutgoingWindow());
+ beginToSend.setIncomingWindow(session.getIncomingWindow());
sendFrame(sendingChannelId, beginToSend);
synchronized (_blockingLock)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 4ee3a86..0c99258 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -423,6 +423,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
}
+ @Override
public void sendFlow()
{
sendFlow(_flowTransactionId != null);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
index b0e98a2..d7a7667 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrantLinkEndpoint.java
@@ -136,6 +136,12 @@ public class ErrantLinkEndpoint<S extends BaseSource, T extends BaseTarget> impl
}
@Override
+ public void sendFlow()
+ {
+ throw new UnsupportedOperationException("This Link is errant");
+ }
+
+ @Override
public void flowStateChanged()
{
throw new UnsupportedOperationException("This Link is errant");
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
index c2e4a5e..da8c0cb 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java
@@ -56,6 +56,8 @@ public interface LinkEndpoint<S extends BaseSource, T extends BaseTarget>
void receiveFlow(Flow flow);
+ void sendFlow();
+
void flowStateChanged();
void start();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
index 58f4e1c..6abe4e3 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
@@ -107,4 +107,9 @@ public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
{
return _seqNo;
}
+
+ public long longValue()
+ {
+ return ((long) _seqNo) & 0xFFFFFFFFL;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 9a39007..c203e50 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -53,8 +53,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.exchange.ExchangeDefaults;
+import org.apache.qpid.server.filter.AMQPFilterTypes;
import org.apache.qpid.server.filter.SelectorParsingException;
import org.apache.qpid.server.filter.selector.ParseException;
import org.apache.qpid.server.filter.selector.TokenMgrError;
@@ -146,17 +146,18 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
private int _nextOutgoingDeliveryId;
- private UnsignedInteger _outgoingSessionCredit;
- private UnsignedInteger _initialOutgoingId = UnsignedInteger.valueOf(0);
- private SequenceNumber _nextIncomingTransferId;
- private SequenceNumber _nextOutgoingTransferId = new SequenceNumber(_initialOutgoingId.intValue());
+ private UnsignedInteger _initialOutgoingId = UnsignedInteger.ZERO;
+ private SequenceNumber _nextIncomingId;
+ private final int _incomingWindow;
+ private SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
+ private int _outgoingWindow = DEFAULT_SESSION_BUFFER_SIZE;
+ private UnsignedInteger _remoteIncomingWindow;
+ private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
+ private UnsignedInteger _lastSentIncomingLimit;
private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
- private final int _incomingWindowSize;
- private int _availableOutgoingCredit = DEFAULT_SESSION_BUFFER_SIZE;
- private UnsignedInteger _lastSentIncomingLimit;
private final Error _sessionEndedLinkError =
new Error(LinkError.DETACH_FORCED,
@@ -173,16 +174,16 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
Begin begin,
short sendingChannelId,
short receivingChannelId,
- int incomingWindowSize)
+ int incomingWindow)
{
super(connection, sendingChannelId);
_sendingChannel = sendingChannelId;
_receivingChannel = receivingChannelId;
_sessionState = SessionState.ACTIVE;
- _nextIncomingTransferId = new SequenceNumber(begin.getNextOutgoingId().intValue());
+ _nextIncomingId = new SequenceNumber(begin.getNextOutgoingId().intValue());
_connection = connection;
_primaryDomain = getPrimaryDomain();
- _incomingWindowSize = incomingWindowSize;
+ _incomingWindow = incomingWindow;
AccessController.doPrivileged((new PrivilegedAction<Object>()
{
@@ -264,8 +265,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public boolean hasCreditToSend()
{
- boolean b = _outgoingSessionCredit != null && _outgoingSessionCredit.intValue() > 0;
- boolean b1 = getOutgoingWindowSize() != null && getOutgoingWindowSize().compareTo(UnsignedInteger.ZERO) > 0;
+ boolean b = _remoteIncomingWindow != null && _remoteIncomingWindow.intValue() > 0;
+ boolean b1 = getOutgoingWindow() != null && getOutgoingWindow().compareTo(UnsignedInteger.ZERO) > 0;
return b && b1;
}
@@ -276,7 +277,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint, final boolean newDelivery)
{
- _nextOutgoingTransferId.incr();
+ _nextOutgoingId.incr();
UnsignedInteger deliveryId;
final boolean settled = Boolean.TRUE.equals(xfr.getSettled());
if (newDelivery)
@@ -287,7 +288,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
{
final Delivery delivery = new Delivery(xfr, endpoint);
_outgoingUnsettled.put(deliveryId, delivery);
- _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+ _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
endpoint.addUnsettled(delivery);
}
}
@@ -300,11 +301,11 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
if (!settled)
{
delivery.addTransfer(xfr);
- _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+ _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
}
else
{
- _outgoingSessionCredit = _outgoingSessionCredit.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
+ _remoteIncomingWindow = _remoteIncomingWindow.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
endpoint.settle(delivery.getDeliveryTag());
_outgoingUnsettled.remove(deliveryId);
}
@@ -382,19 +383,19 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public UnsignedInteger getNextOutgoingId()
{
- return UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue());
+ return UnsignedInteger.valueOf(_nextOutgoingId.intValue());
}
public void sendFlowConditional()
{
- if(_nextIncomingTransferId != null)
+ if(_nextIncomingId != null)
{
UnsignedInteger clientsCredit =
- _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingTransferId.intValue()));
+ _lastSentIncomingLimit.subtract(UnsignedInteger.valueOf(_nextIncomingId.intValue()));
// TODO - we should use a better metric here, and/or manage session credit across the whole connection
// send a flow if the window is at least half used up
- if (UnsignedInteger.valueOf(_incomingWindowSize).subtract(clientsCredit).compareTo(clientsCredit) >= 0)
+ if (UnsignedInteger.valueOf(_incomingWindow).subtract(clientsCredit).compareTo(clientsCredit) >= 0)
{
sendFlow();
}
@@ -402,38 +403,53 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
- public UnsignedInteger getOutgoingWindowSize()
+ public UnsignedInteger getOutgoingWindow()
{
- return UnsignedInteger.valueOf(_availableOutgoingCredit);
+ return UnsignedInteger.valueOf(_outgoingWindow);
}
public void receiveFlow(final Flow flow)
{
UnsignedInteger handle = flow.getHandle();
- final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint = handle == null ? null : _inputHandleToEndpoint.get(handle);
+ final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint =
+ handle == null ? null : _inputHandleToEndpoint.get(handle);
final UnsignedInteger nextOutgoingId =
flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
- int limit = (nextOutgoingId.intValue() + flow.getIncomingWindow().intValue());
- _outgoingSessionCredit = UnsignedInteger.valueOf(limit - _nextOutgoingTransferId.intValue());
+ long limit = (nextOutgoingId.longValue() + flow.getIncomingWindow().longValue());
+ _remoteIncomingWindow = UnsignedInteger.valueOf(limit - _nextOutgoingId.longValue());
+
+ _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
+ _remoteOutgoingWindow = flow.getOutgoingWindow();
if (endpoint != null)
{
endpoint.receiveFlow(flow);
+
+ if (Boolean.TRUE.equals(flow.getEcho()))
+ {
+ endpoint.sendFlow();
+ }
}
else
{
- final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints = _inputHandleToEndpoint.values();
+ final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
+ _inputHandleToEndpoint.values();
for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
{
le.flowStateChanged();
}
+
+ if (Boolean.TRUE.equals(flow.getEcho()))
+ {
+ sendFlow();
+ }
}
}
public void setNextIncomingId(final UnsignedInteger nextIncomingId)
{
- _nextIncomingTransferId = new SequenceNumber(nextIncomingId.intValue());
+ _nextIncomingId = new SequenceNumber(nextIncomingId.intValue());
}
@@ -496,22 +512,22 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public void sendFlow(final Flow flow)
{
- if(_nextIncomingTransferId != null)
+ if(_nextIncomingId != null)
{
- final int nextIncomingId = _nextIncomingTransferId.intValue();
+ final long nextIncomingId = _nextIncomingId.longValue();
flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
- _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _incomingWindowSize);
+ _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _incomingWindow);
}
- flow.setIncomingWindow(UnsignedInteger.valueOf(_incomingWindowSize));
+ flow.setIncomingWindow(UnsignedInteger.valueOf(_incomingWindow));
- flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingTransferId.intValue()));
- flow.setOutgoingWindow(UnsignedInteger.valueOf(_availableOutgoingCredit));
+ flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingId.intValue()));
+ flow.setOutgoingWindow(UnsignedInteger.valueOf(_outgoingWindow));
send(flow);
}
- public void setOutgoingSessionCredit(final UnsignedInteger outgoingSessionCredit)
+ public void setRemoteIncomingWindow(final UnsignedInteger remoteIncomingWindow)
{
- _outgoingSessionCredit = outgoingSessionCredit;
+ _remoteIncomingWindow = remoteIncomingWindow;
}
public void receiveDetach(final Detach detach)
@@ -565,7 +581,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public void receiveTransfer(final Transfer transfer)
{
- _nextIncomingTransferId.incr();
+ _nextIncomingId.incr();
+ _remoteOutgoingWindow = _remoteOutgoingWindow.subtract(UnsignedInteger.ONE);
UnsignedInteger inputHandle = transfer.getHandle();
LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
@@ -654,9 +671,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
return _sessionState == SessionState.ENDED || _connection.isClosed();
}
- UnsignedInteger getIncomingWindowSize()
+ UnsignedInteger getIncomingWindow()
{
- return UnsignedInteger.valueOf(_incomingWindowSize);
+ return UnsignedInteger.valueOf(_incomingWindow);
}
AccessControlContext getAccessControllerContext()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
index 764bd6f..2d11684 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/Flow.java
@@ -27,8 +27,8 @@ import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.qpid.server.protocol.v1_0.ConnectionHandler;
-import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.CompositeTypeField;
+import org.apache.qpid.server.protocol.v1_0.type.FrameBody;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
public class Flow implements FrameBody
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/f166e530/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
----------------------------------------------------------------------
diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
index ccb05f7..564df3b 100644
--- a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
+++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/link/FlowTest.java
@@ -23,14 +23,13 @@ package org.apache.qpid.tests.protocol.v1_0.transport.link;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.fail;
+import static org.hamcrest.core.IsNot.not;
import java.net.InetSocketAddress;
-import org.junit.Ignore;
import org.junit.Test;
import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
@@ -71,22 +70,53 @@ public class FlowTest extends ProtocolTestBase
}
@Test
- @Ignore("QPID-7748")
@SpecificationTest(section = "2.7.4",
description = "If set to true then the receiver SHOULD send its state at the earliest convenient opportunity.")
- public void echoFlow() throws Exception
+ public void sessionEchoFlow() throws Exception
+ {
+ final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
+ try (FrameTransport transport = new FrameTransport(addr))
+ {
+ transport.doBeginSession();
+ Flow flow = new Flow();
+ flow.setIncomingWindow(UnsignedInteger.ZERO);
+ flow.setNextIncomingId(UnsignedInteger.ZERO);
+ flow.setOutgoingWindow(UnsignedInteger.ZERO);
+ flow.setNextOutgoingId(UnsignedInteger.ZERO);
+ flow.setEcho(Boolean.TRUE);
+
+ transport.sendPerformative(flow);
+ PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
+
+ assertThat(response, is(notNullValue()));
+ assertThat(response.getFrameBody(), is(instanceOf(Flow.class)));
+ Flow responseFlow = (Flow) response.getFrameBody();
+ assertThat(responseFlow.getEcho(), not(equalTo(Boolean.TRUE)));
+ assertThat(responseFlow.getHandle(), is(nullValue()));
+ }
+ }
+
+ @Test
+ @SpecificationTest(section = "2.7.4",
+ description = "If set to true then the receiver SHOULD send its state at the earliest convenient opportunity.")
+ public void linkEchoFlow() throws Exception
{
getBrokerAdmin().createQueue(BrokerAdmin.TEST_QUEUE_NAME);
final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.ANONYMOUS_AMQP);
try (FrameTransport transport = new FrameTransport(addr))
{
- transport.doAttachReceivingLink(BrokerAdmin.TEST_QUEUE_NAME);
+ final UnsignedInteger handle = UnsignedInteger.ONE;
+ transport.doAttachSendingLink(handle, BrokerAdmin.TEST_QUEUE_NAME);
Flow flow = new Flow();
flow.setIncomingWindow(UnsignedInteger.ZERO);
flow.setNextIncomingId(UnsignedInteger.ZERO);
flow.setOutgoingWindow(UnsignedInteger.ZERO);
flow.setNextOutgoingId(UnsignedInteger.ZERO);
flow.setEcho(Boolean.TRUE);
+ flow.setAvailable(UnsignedInteger.valueOf(10));
+ flow.setDeliveryCount(UnsignedInteger.ZERO);
+ flow.setLinkCredit(UnsignedInteger.ZERO);
+ flow.setHandle(handle);
transport.sendPerformative(flow);
PerformativeResponse response = (PerformativeResponse) transport.getNextResponse();
@@ -94,7 +124,8 @@ public class FlowTest extends ProtocolTestBase
assertThat(response, is(notNullValue()));
assertThat(response.getFrameBody(), is(instanceOf(Flow.class)));
Flow responseFlow = (Flow) response.getFrameBody();
- assertThat(responseFlow.getEcho(), is(equalTo(Boolean.FALSE)));
+ assertThat(responseFlow.getEcho(), not(equalTo(Boolean.TRUE)));
+ assertThat(responseFlow.getHandle(), is(notNullValue()));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org