You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/06 16:39:33 UTC

[1/3] qpid-broker-j git commit: QPID-7739: [Java Broker] [AMQP 1.0] Change type of field AbstractLinkEndpoint#_deliveryCount to SequenceNumber

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 5a360f8ba -> 57e647cf9


QPID-7739: [Java Broker] [AMQP 1.0] Change type of field AbstractLinkEndpoint#_deliveryCount to SequenceNumber


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/268338ab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/268338ab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/268338ab

Branch: refs/heads/master
Commit: 268338aba0764f7e554573417699d3b2fc9ffa5f
Parents: 5a360f8
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 6 12:04:39 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 6 17:07:47 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AbstractLinkEndpoint.java     | 16 +++++------
 .../v1_0/AbstractReceivingLinkEndpoint.java     |  4 +--
 .../protocol/v1_0/SendingLinkEndpoint.java      | 21 +++++++-------
 .../server/protocol/v1_0/SequenceNumber.java    | 29 +++++++-------------
 .../v1_0/StandardReceivingLinkEndpoint.java     |  2 +-
 .../TxnCoordinatorReceivingLinkEndpoint.java    |  2 +-
 6 files changed, 33 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 0c99258..87dc899 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -60,7 +60,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
     private volatile boolean _stopped;
     private volatile boolean _stoppedUpdated;
     private Symbol[] _capabilities;
-    private UnsignedInteger _deliveryCount;
+    private SequenceNumber _deliveryCount;
     private UnsignedInteger _linkCredit;
     private UnsignedInteger _available;
     private Boolean _drain;
@@ -174,7 +174,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         return getSession().getConnection().getAddressSpace();
     }
 
-    public void setDeliveryCount(final UnsignedInteger deliveryCount)
+    protected void setDeliveryCount(final SequenceNumber deliveryCount)
     {
         _deliveryCount = deliveryCount;
     }
@@ -194,7 +194,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         _drain = drain;
     }
 
-    public UnsignedInteger getDeliveryCount()
+    protected SequenceNumber getDeliveryCount()
     {
         return _deliveryCount;
     }
@@ -303,7 +303,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
 
         if (getRole() == Role.SENDER)
         {
-            attachToSend.setInitialDeliveryCount(_deliveryCount);
+            attachToSend.setInitialDeliveryCount(_deliveryCount.unsignedIntegerValue());
         }
 
         switch (_state)
@@ -402,7 +402,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
             }
             else
             {
-                UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount);
+                UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount.unsignedIntegerValue());
 
                 // client has used up over half their credit allowance ?
                 boolean sendFlow = _linkCredit.subtract(clientsCredit).compareTo(clientsCredit) >= 0;
@@ -445,18 +445,18 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
         if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
         {
             Flow flow = new Flow();
-            flow.setDeliveryCount(_deliveryCount);
+            flow.setDeliveryCount(_deliveryCount.unsignedIntegerValue());
             flow.setEcho(echo);
             if(_stopped)
             {
                 flow.setLinkCredit(UnsignedInteger.ZERO);
                 flow.setDrain(true);
-                _lastSentCreditLimit = _deliveryCount;
+                _lastSentCreditLimit = _deliveryCount.unsignedIntegerValue();
             }
             else
             {
                 flow.setLinkCredit(_linkCredit);
-                _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
+                _lastSentCreditLimit = _linkCredit.add(_deliveryCount.unsignedIntegerValue());
                 flow.setDrain(_drain);
             }
             flow.setAvailable(_available);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index abc82a9..22c8b3f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -129,7 +129,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
                 }
                 _unsettledIds.put(deliveryTag, transientState);
                 setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
-                setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
+                getDeliveryCount().incr();
             }
             else
             {
@@ -159,7 +159,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
     @Override public void receiveFlow(final Flow flow)
     {
         setAvailable(flow.getAvailable());
-        setDeliveryCount(flow.getDeliveryCount());
+        setDeliveryCount(new SequenceNumber(flow.getDeliveryCount().intValue()));
     }
 
     public boolean settled(final Binary deliveryTag)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index b9f1e7b..1f346afd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -100,7 +100,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl<Source, Target> link)
     {
         super(session, link);
-        setDeliveryCount(UnsignedInteger.valueOf(0));
+        setDeliveryCount(new SequenceNumber(0));
         setAvailable(UnsignedInteger.valueOf(0));
         setCapabilities(Arrays.asList(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS));
     }
@@ -390,7 +390,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
             setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
         }
 
-        setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + 1)));
+        getDeliveryCount().incr();
 
         xfr.setHandle(getLocalHandle());
 
@@ -418,7 +418,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     {
         if (_draining)
         {
-            setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+            getDeliveryCount().add(getLinkCredit().intValue());
             setLinkCredit(UnsignedInteger.ZERO);
             sendFlow();
             _draining = false;
@@ -433,8 +433,8 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     @Override
     public void receiveFlow(final Flow flow)
     {
-        UnsignedInteger t = flow.getDeliveryCount();
-        UnsignedInteger c = flow.getLinkCredit();
+        UnsignedInteger receiverDeliveryCount = flow.getDeliveryCount();
+        UnsignedInteger receiverLinkCredit = flow.getLinkCredit();
         setDrain(flow.getDrain());
 
         Map options;
@@ -443,20 +443,21 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
              _transactionId = (Binary) options.get(Symbol.valueOf("txn-id"));
         }
 
-        if(t == null)
+        if(receiverDeliveryCount == null)
         {
-            setLinkCredit(c);
+            setLinkCredit(receiverLinkCredit);
         }
         else
         {
-            UnsignedInteger limit = t.add(c);
-            if(limit.compareTo(getDeliveryCount())<=0)
+            // 2.6.7 Flow Control : link_credit_snd := delivery_count_rcv + link_credit_rcv - delivery_count_snd
+            UnsignedInteger limit = receiverDeliveryCount.add(receiverLinkCredit);
+            if(limit.compareTo(getDeliveryCount().unsignedIntegerValue())<=0)
             {
                 setLinkCredit(UnsignedInteger.valueOf(0));
             }
             else
             {
-                setLinkCredit(limit.subtract(getDeliveryCount()));
+                setLinkCredit(limit.subtract(getDeliveryCount().unsignedIntegerValue()));
             }
         }
         flowStateChanged();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
index 6abe4e3..8bb01ce 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
@@ -21,9 +21,11 @@
 
 package org.apache.qpid.server.protocol.v1_0;
 
-public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public class SequenceNumber implements Comparable<SequenceNumber>
 {
-    private int _seqNo;
+    private volatile int _seqNo;
 
     public SequenceNumber(int seqNo)
     {
@@ -42,17 +44,7 @@ public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
         return this;
     }
 
-    public static SequenceNumber add(SequenceNumber a, int i)
-    {
-        return a.clone().add(i);
-    }
-
-    public static SequenceNumber subtract(SequenceNumber a, int i)
-    {
-        return a.clone().add(-i);
-    }
-
-    private SequenceNumber add(int i)
+    public SequenceNumber add(int i)
     {
         _seqNo+=i;
         return this;
@@ -92,12 +84,6 @@ public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
     }
 
     @Override
-    public SequenceNumber clone()
-    {
-        return new SequenceNumber(_seqNo);
-    }
-
-    @Override
     public String toString()
     {
         return "SN{" + _seqNo + '}';
@@ -112,4 +98,9 @@ public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
     {
         return  ((long) _seqNo) & 0xFFFFFFFFL;
     }
+
+    public UnsignedInteger unsignedIntegerValue()
+    {
+        return UnsignedInteger.valueOf(_seqNo);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 92f0eb4..7d837d5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -467,7 +467,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         Target target = new Target();
         Target attachTarget = (Target) attach.getTarget();
 
-        setDeliveryCount(attach.getInitialDeliveryCount());
+        setDeliveryCount(new SequenceNumber(attach.getInitialDeliveryCount().intValue()));
 
         target.setAddress(attachTarget.getAddress());
         target.setDynamic(attachTarget.getDynamic());

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 9bb3406..d6a7be6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -260,7 +260,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
     public void attachReceived(final Attach attach) throws AmqpErrorException
     {
         super.attachReceived(attach);
-        setDeliveryCount(attach.getInitialDeliveryCount());
+        setDeliveryCount(new SequenceNumber(attach.getInitialDeliveryCount().intValue()));
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/3] qpid-broker-j git commit: QPID-7739: [Java Broker] [AMQP 1.0] Change type of field Session_1_0#_remoteIncomingWindow to long and decrement it on every transfer

Posted by or...@apache.org.
QPID-7739: [Java Broker] [AMQP 1.0] Change type of field Session_1_0#_remoteIncomingWindow to long and decrement it on every transfer


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/57e647cf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/57e647cf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/57e647cf

Branch: refs/heads/master
Commit: 57e647cf96c6a810349124f4099228c448d5c3c3
Parents: ddc1538
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 6 17:03:12 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 6 17:09:00 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/protocol/v1_0/Session_1_0.java  | 85 ++++++++++----------
 1 file changed, 42 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/57e647cf/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index fdcbefa..07a228e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -155,7 +155,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     private final UnsignedInteger _incomingWindow;
     private SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
     private final UnsignedInteger _outgoingWindow = UnsignedInteger.valueOf(DEFAULT_SESSION_BUFFER_SIZE);
-    private UnsignedInteger _remoteIncomingWindow;
+    private volatile long _remoteIncomingWindow;
     private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
     private UnsignedInteger _lastSentIncomingLimit;
 
@@ -269,7 +269,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public boolean hasCreditToSend()
     {
-        boolean b = _remoteIncomingWindow != null && _remoteIncomingWindow.intValue() > 0;
+        boolean b = _remoteIncomingWindow > 0;
         boolean b1 = getOutgoingWindow() != null && getOutgoingWindow().compareTo(UnsignedInteger.ZERO) > 0;
         return b && b1;
     }
@@ -292,7 +292,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
             {
                 final Delivery delivery = new Delivery(xfr, endpoint);
                 _outgoingUnsettled.put(deliveryId, delivery);
-                _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
                 endpoint.addUnsettled(delivery);
             }
         }
@@ -305,18 +304,16 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
                 if (!settled)
                 {
                     delivery.addTransfer(xfr);
-                    _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
                 }
                 else
                 {
-                    _remoteIncomingWindow = _remoteIncomingWindow.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
                     endpoint.settle(delivery.getDeliveryTag());
                     _outgoingUnsettled.remove(deliveryId);
                 }
             }
         }
         xfr.setDeliveryId(deliveryId);
-
+        _remoteIncomingWindow--;
         try
         {
             List<QpidByteBuffer> payload = xfr.getPayload();
@@ -414,49 +411,56 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public void receiveFlow(final Flow flow)
     {
-        UnsignedInteger handle = flow.getHandle();
-        final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint =
-                handle == null ? null : _inputHandleToEndpoint.get(handle);
-
-        final UnsignedInteger nextOutgoingId =
-                flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
-        long limit = (nextOutgoingId.longValue() + flow.getIncomingWindow().longValue());
-        _remoteIncomingWindow = UnsignedInteger.valueOf(limit - _nextOutgoingId.longValue());
-
-        _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
-        _remoteOutgoingWindow = flow.getOutgoingWindow();
-
-        if (endpoint != null)
+        final SequenceNumber flowNextIncomingId = new SequenceNumber(flow.getNextIncomingId() == null
+                                                                             ? _initialOutgoingId.intValue()
+                                                                             : flow.getNextIncomingId().intValue());
+        if (flowNextIncomingId.compareTo(_nextOutgoingId) > 0)
         {
-            endpoint.receiveFlow(flow);
-
-            if (Boolean.TRUE.equals(flow.getEcho()))
-            {
-                endpoint.sendFlow();
-            }
+            final End end = new End();
+            end.setError(new Error(SessionError.WINDOW_VIOLATION,
+                                   String.format("Next incoming id '%d' exceeds next outgoing id '%d'",
+                                                 flowNextIncomingId,
+                                                 _nextOutgoingId)));
+            end(end);
         }
         else
         {
-            final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
-                    _inputHandleToEndpoint.values();
-            for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
+            _remoteIncomingWindow = flowNextIncomingId.longValue() + flow.getIncomingWindow().longValue()
+                                    - _nextOutgoingId.longValue();
+
+            _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
+            _remoteOutgoingWindow = flow.getOutgoingWindow();
+
+            UnsignedInteger handle = flow.getHandle();
+            final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint =
+                    handle == null ? null : _inputHandleToEndpoint.get(handle);
+
+            if (endpoint != null)
             {
-                le.flowStateChanged();
-            }
+                endpoint.receiveFlow(flow);
 
-            if (Boolean.TRUE.equals(flow.getEcho()))
+                if (Boolean.TRUE.equals(flow.getEcho()))
+                {
+                    endpoint.sendFlow();
+                }
+            }
+            else
             {
-                sendFlow();
+                final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
+                        _inputHandleToEndpoint.values();
+                for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
+                {
+                    le.flowStateChanged();
+                }
+
+                if (Boolean.TRUE.equals(flow.getEcho()))
+                {
+                    sendFlow();
+                }
             }
         }
     }
 
-    public void setNextIncomingId(final UnsignedInteger nextIncomingId)
-    {
-        _nextIncomingId = new SequenceNumber(nextIncomingId.intValue());
-
-    }
-
     public void receiveDisposition(final Disposition disposition)
     {
         Role dispositionRole = disposition.getRole();
@@ -532,11 +536,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         send(flow);
     }
 
-    public void setRemoteIncomingWindow(final UnsignedInteger remoteIncomingWindow)
-    {
-        _remoteIncomingWindow = remoteIncomingWindow;
-    }
-
     public void receiveDetach(final Detach detach)
     {
         UnsignedInteger handle = detach.getHandle();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/3] qpid-broker-j git commit: QPID-7739: [Java Broker] [AMQP 1.0] Change type of fields Session_1_0#_incomingWindow and Session_1_0#_outgoingWindow to UnsignedInteger

Posted by or...@apache.org.
QPID-7739: [Java Broker] [AMQP 1.0] Change type of fields Session_1_0#_incomingWindow and Session_1_0#_outgoingWindow to UnsignedInteger


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ddc15384
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ddc15384
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ddc15384

Branch: refs/heads/master
Commit: ddc15384b464aa9a23ec51dfabe9a1e48ce35c37
Parents: 268338a
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 6 13:50:29 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 6 17:08:34 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/AMQPConnection_1_0Impl.java   |  2 +-
 .../qpid/server/protocol/v1_0/Session_1_0.java  | 23 ++++++++++----------
 2 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ddc15384/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index bcae0f6..977dc8f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -616,7 +616,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
                                                           begin,
                                                           sendingChannelId,
                                                           receivingChannelId,
-                                                          getContextValue(Integer.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
+                                                          getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
                     session.create();
 
                     _receivingSessions[receivingChannelId] = session;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ddc15384/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 3ec200b..fdcbefa 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -152,9 +152,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     private UnsignedInteger _initialOutgoingId = UnsignedInteger.ZERO;
     private SequenceNumber _nextIncomingId;
-    private final int _incomingWindow;
+    private final UnsignedInteger _incomingWindow;
     private SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
-    private int _outgoingWindow = DEFAULT_SESSION_BUFFER_SIZE;
+    private final UnsignedInteger _outgoingWindow = UnsignedInteger.valueOf(DEFAULT_SESSION_BUFFER_SIZE);
     private UnsignedInteger _remoteIncomingWindow;
     private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
     private UnsignedInteger _lastSentIncomingLimit;
@@ -178,7 +178,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
                        Begin begin,
                        int sendingChannelId,
                        int receivingChannelId,
-                       int incomingWindow)
+                       long incomingWindow)
     {
         super(connection, sendingChannelId);
         _sendingChannel = sendingChannelId;
@@ -187,7 +187,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         _nextIncomingId = new SequenceNumber(begin.getNextOutgoingId().intValue());
         _connection = connection;
         _primaryDomain = getPrimaryDomain();
-        _incomingWindow = incomingWindow;
+        _incomingWindow = UnsignedInteger.valueOf(incomingWindow);
 
         AccessController.doPrivileged((new PrivilegedAction<Object>()
         {
@@ -399,7 +399,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
             // TODO - we should use a better metric here, and/or manage session credit across the whole connection
             // send a flow if the window is at least half used up
-            if (UnsignedInteger.valueOf(_incomingWindow).subtract(clientsCredit).compareTo(clientsCredit) >= 0)
+            if (_incomingWindow.subtract(clientsCredit).compareTo(clientsCredit) >= 0)
             {
                 sendFlow();
             }
@@ -409,7 +409,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public UnsignedInteger getOutgoingWindow()
     {
-        return UnsignedInteger.valueOf(_outgoingWindow);
+        return _outgoingWindow;
     }
 
     public void receiveFlow(final Flow flow)
@@ -522,14 +522,13 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
     {
         if(_nextIncomingId != null)
         {
-            final long nextIncomingId = _nextIncomingId.longValue();
-            flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
-            _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _incomingWindow);
+            flow.setNextIncomingId(_nextIncomingId.unsignedIntegerValue());
+            _lastSentIncomingLimit = _incomingWindow.add(_nextIncomingId.unsignedIntegerValue());
         }
-        flow.setIncomingWindow(UnsignedInteger.valueOf(_incomingWindow));
+        flow.setIncomingWindow(_incomingWindow);
 
         flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingId.intValue()));
-        flow.setOutgoingWindow(UnsignedInteger.valueOf(_outgoingWindow));
+        flow.setOutgoingWindow(_outgoingWindow);
         send(flow);
     }
 
@@ -679,7 +678,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     UnsignedInteger getIncomingWindow()
     {
-        return UnsignedInteger.valueOf(_incomingWindow);
+        return _incomingWindow;
     }
 
     AccessControlContext getAccessControllerContext()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org