You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/06 16:39:35 UTC
[3/3] qpid-broker-j git commit: QPID-7739: [Java Broker] [AMQP 1.0] Change type of field Session_1_0#_remoteIncomingWindow to long and decrement it on every transfer
QPID-7739: [Java Broker] [AMQP 1.0] Change type of field Session_1_0#_remoteIncomingWindow to long and decrement it on every transfer
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/57e647cf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/57e647cf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/57e647cf
Branch: refs/heads/master
Commit: 57e647cf96c6a810349124f4099228c448d5c3c3
Parents: ddc1538
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 6 17:03:12 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 6 17:09:00 2017 +0100
----------------------------------------------------------------------
.../qpid/server/protocol/v1_0/Session_1_0.java | 85 ++++++++++----------
1 file changed, 42 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/57e647cf/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index fdcbefa..07a228e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -155,7 +155,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
private final UnsignedInteger _incomingWindow;
private SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
private final UnsignedInteger _outgoingWindow = UnsignedInteger.valueOf(DEFAULT_SESSION_BUFFER_SIZE);
- private UnsignedInteger _remoteIncomingWindow;
+ private volatile long _remoteIncomingWindow;
private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
private UnsignedInteger _lastSentIncomingLimit;
@@ -269,7 +269,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public boolean hasCreditToSend()
{
- boolean b = _remoteIncomingWindow != null && _remoteIncomingWindow.intValue() > 0;
+ boolean b = _remoteIncomingWindow > 0;
boolean b1 = getOutgoingWindow() != null && getOutgoingWindow().compareTo(UnsignedInteger.ZERO) > 0;
return b && b1;
}
@@ -292,7 +292,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
{
final Delivery delivery = new Delivery(xfr, endpoint);
_outgoingUnsettled.put(deliveryId, delivery);
- _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
endpoint.addUnsettled(delivery);
}
}
@@ -305,18 +304,16 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
if (!settled)
{
delivery.addTransfer(xfr);
- _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
}
else
{
- _remoteIncomingWindow = _remoteIncomingWindow.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
endpoint.settle(delivery.getDeliveryTag());
_outgoingUnsettled.remove(deliveryId);
}
}
}
xfr.setDeliveryId(deliveryId);
-
+ _remoteIncomingWindow--;
try
{
List<QpidByteBuffer> payload = xfr.getPayload();
@@ -414,49 +411,56 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public void receiveFlow(final Flow flow)
{
- UnsignedInteger handle = flow.getHandle();
- final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint =
- handle == null ? null : _inputHandleToEndpoint.get(handle);
-
- final UnsignedInteger nextOutgoingId =
- flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
- long limit = (nextOutgoingId.longValue() + flow.getIncomingWindow().longValue());
- _remoteIncomingWindow = UnsignedInteger.valueOf(limit - _nextOutgoingId.longValue());
-
- _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
- _remoteOutgoingWindow = flow.getOutgoingWindow();
-
- if (endpoint != null)
+ final SequenceNumber flowNextIncomingId = new SequenceNumber(flow.getNextIncomingId() == null
+ ? _initialOutgoingId.intValue()
+ : flow.getNextIncomingId().intValue());
+ if (flowNextIncomingId.compareTo(_nextOutgoingId) > 0)
{
- endpoint.receiveFlow(flow);
-
- if (Boolean.TRUE.equals(flow.getEcho()))
- {
- endpoint.sendFlow();
- }
+ final End end = new End();
+ end.setError(new Error(SessionError.WINDOW_VIOLATION,
+ String.format("Next incoming id '%d' exceeds next outgoing id '%d'",
+ flowNextIncomingId,
+ _nextOutgoingId)));
+ end(end);
}
else
{
- final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
- _inputHandleToEndpoint.values();
- for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
+ _remoteIncomingWindow = flowNextIncomingId.longValue() + flow.getIncomingWindow().longValue()
+ - _nextOutgoingId.longValue();
+
+ _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
+ _remoteOutgoingWindow = flow.getOutgoingWindow();
+
+ UnsignedInteger handle = flow.getHandle();
+ final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint =
+ handle == null ? null : _inputHandleToEndpoint.get(handle);
+
+ if (endpoint != null)
{
- le.flowStateChanged();
- }
+ endpoint.receiveFlow(flow);
- if (Boolean.TRUE.equals(flow.getEcho()))
+ if (Boolean.TRUE.equals(flow.getEcho()))
+ {
+ endpoint.sendFlow();
+ }
+ }
+ else
{
- sendFlow();
+ final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
+ _inputHandleToEndpoint.values();
+ for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
+ {
+ le.flowStateChanged();
+ }
+
+ if (Boolean.TRUE.equals(flow.getEcho()))
+ {
+ sendFlow();
+ }
}
}
}
- public void setNextIncomingId(final UnsignedInteger nextIncomingId)
- {
- _nextIncomingId = new SequenceNumber(nextIncomingId.intValue());
-
- }
-
public void receiveDisposition(final Disposition disposition)
{
Role dispositionRole = disposition.getRole();
@@ -532,11 +536,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
send(flow);
}
- public void setRemoteIncomingWindow(final UnsignedInteger remoteIncomingWindow)
- {
- _remoteIncomingWindow = remoteIncomingWindow;
- }
-
public void receiveDetach(final Detach detach)
{
UnsignedInteger handle = detach.getHandle();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org