You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/06 16:39:35 UTC

[3/3] qpid-broker-j git commit: QPID-7739: [Java Broker] [AMQP 1.0] Change type of field Session_1_0#_remoteIncomingWindow to long and decrement it on every transfer

QPID-7739: [Java Broker] [AMQP 1.0] Change type of field Session_1_0#_remoteIncomingWindow to long and decrement it on every transfer


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/57e647cf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/57e647cf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/57e647cf

Branch: refs/heads/master
Commit: 57e647cf96c6a810349124f4099228c448d5c3c3
Parents: ddc1538
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 6 17:03:12 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 6 17:09:00 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/protocol/v1_0/Session_1_0.java  | 85 ++++++++++----------
 1 file changed, 42 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/57e647cf/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index fdcbefa..07a228e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -155,7 +155,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     private final UnsignedInteger _incomingWindow;
     private SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
     private final UnsignedInteger _outgoingWindow = UnsignedInteger.valueOf(DEFAULT_SESSION_BUFFER_SIZE);
-    private UnsignedInteger _remoteIncomingWindow;
+    private volatile long _remoteIncomingWindow;
     private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
     private UnsignedInteger _lastSentIncomingLimit;
 
@@ -269,7 +269,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public boolean hasCreditToSend()
     {
-        boolean b = _remoteIncomingWindow != null && _remoteIncomingWindow.intValue() > 0;
+        boolean b = _remoteIncomingWindow > 0;
         boolean b1 = getOutgoingWindow() != null && getOutgoingWindow().compareTo(UnsignedInteger.ZERO) > 0;
         return b && b1;
     }
@@ -292,7 +292,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
             {
                 final Delivery delivery = new Delivery(xfr, endpoint);
                 _outgoingUnsettled.put(deliveryId, delivery);
-                _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
                 endpoint.addUnsettled(delivery);
             }
         }
@@ -305,18 +304,16 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
                 if (!settled)
                 {
                     delivery.addTransfer(xfr);
-                    _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
                 }
                 else
                 {
-                    _remoteIncomingWindow = _remoteIncomingWindow.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
                     endpoint.settle(delivery.getDeliveryTag());
                     _outgoingUnsettled.remove(deliveryId);
                 }
             }
         }
         xfr.setDeliveryId(deliveryId);
-
+        _remoteIncomingWindow--;
         try
         {
             List<QpidByteBuffer> payload = xfr.getPayload();
@@ -414,49 +411,56 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public void receiveFlow(final Flow flow)
     {
-        UnsignedInteger handle = flow.getHandle();
-        final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint =
-                handle == null ? null : _inputHandleToEndpoint.get(handle);
-
-        final UnsignedInteger nextOutgoingId =
-                flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
-        long limit = (nextOutgoingId.longValue() + flow.getIncomingWindow().longValue());
-        _remoteIncomingWindow = UnsignedInteger.valueOf(limit - _nextOutgoingId.longValue());
-
-        _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
-        _remoteOutgoingWindow = flow.getOutgoingWindow();
-
-        if (endpoint != null)
+        final SequenceNumber flowNextIncomingId = new SequenceNumber(flow.getNextIncomingId() == null
+                                                                             ? _initialOutgoingId.intValue()
+                                                                             : flow.getNextIncomingId().intValue());
+        if (flowNextIncomingId.compareTo(_nextOutgoingId) > 0)
         {
-            endpoint.receiveFlow(flow);
-
-            if (Boolean.TRUE.equals(flow.getEcho()))
-            {
-                endpoint.sendFlow();
-            }
+            final End end = new End();
+            end.setError(new Error(SessionError.WINDOW_VIOLATION,
+                                   String.format("Next incoming id '%d' exceeds next outgoing id '%d'",
+                                                 flowNextIncomingId,
+                                                 _nextOutgoingId)));
+            end(end);
         }
         else
         {
-            final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
-                    _inputHandleToEndpoint.values();
-            for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
+            _remoteIncomingWindow = flowNextIncomingId.longValue() + flow.getIncomingWindow().longValue()
+                                    - _nextOutgoingId.longValue();
+
+            _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
+            _remoteOutgoingWindow = flow.getOutgoingWindow();
+
+            UnsignedInteger handle = flow.getHandle();
+            final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint =
+                    handle == null ? null : _inputHandleToEndpoint.get(handle);
+
+            if (endpoint != null)
             {
-                le.flowStateChanged();
-            }
+                endpoint.receiveFlow(flow);
 
-            if (Boolean.TRUE.equals(flow.getEcho()))
+                if (Boolean.TRUE.equals(flow.getEcho()))
+                {
+                    endpoint.sendFlow();
+                }
+            }
+            else
             {
-                sendFlow();
+                final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
+                        _inputHandleToEndpoint.values();
+                for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
+                {
+                    le.flowStateChanged();
+                }
+
+                if (Boolean.TRUE.equals(flow.getEcho()))
+                {
+                    sendFlow();
+                }
             }
         }
     }
 
-    public void setNextIncomingId(final UnsignedInteger nextIncomingId)
-    {
-        _nextIncomingId = new SequenceNumber(nextIncomingId.intValue());
-
-    }
-
     public void receiveDisposition(final Disposition disposition)
     {
         Role dispositionRole = disposition.getRole();
@@ -532,11 +536,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         send(flow);
     }
 
-    public void setRemoteIncomingWindow(final UnsignedInteger remoteIncomingWindow)
-    {
-        _remoteIncomingWindow = remoteIncomingWindow;
-    }
-
     public void receiveDetach(final Detach detach)
     {
         UnsignedInteger handle = detach.getHandle();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org