You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/11 13:22:59 UTC

qpid-broker-j git commit: Revert "QPID-7748: [Java Broker] Remove superfluous SequenceNumber"

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master 01beffa6b -> c94d60e31


Revert "QPID-7748: [Java Broker] Remove superfluous SequenceNumber"

This reverts commit 5017eb060c30a1b6d265defa17ad7b798c2f2f12.
The difference in the compareTo method between SequenceNumber and
UnsignedInteger prohibits the removal of the former.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c94d60e3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c94d60e3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c94d60e3

Branch: refs/heads/master
Commit: c94d60e31817a68413905b398f03a4d4cc9adec5
Parents: 01beffa
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu May 11 14:22:43 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Thu May 11 14:22:43 2017 +0100

----------------------------------------------------------------------
 .../server/protocol/v1_0/SequenceNumber.java    | 115 +++++++++++++++++++
 .../qpid/server/protocol/v1_0/Session_1_0.java  |  39 ++++---
 .../protocol/v1_0/type/UnsignedInteger.java     |  10 --
 3 files changed, 138 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c94d60e3/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
new file mode 100644
index 0000000..6abe4e3
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+
+public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
+{   // Mutable wrapper around an int sequence number; incr/decr/add change the instance in place.
+    private int _seqNo; // current value; Java int arithmetic wraps silently on overflow
+    /** Creates a sequence number holding the raw int {@code seqNo}. */
+    public SequenceNumber(int seqNo)
+    {
+        _seqNo = seqNo;
+    }
+    /** Increments this instance in place and returns {@code this} (not a copy). */
+    public SequenceNumber incr()
+    {
+        _seqNo++;
+        return this;
+    }
+    /** Decrements this instance in place and returns {@code this} (not a copy). */
+    public SequenceNumber decr()
+    {
+        _seqNo--;
+        return this;
+    }
+    /** Returns a new instance equal to {@code a} advanced by {@code i}; {@code a} itself is not modified. */
+    public static SequenceNumber add(SequenceNumber a, int i)
+    {
+        return a.clone().add(i);
+    }
+    /** Returns a new instance equal to {@code a} moved back by {@code i}; {@code a} itself is not modified. */
+    public static SequenceNumber subtract(SequenceNumber a, int i)
+    {
+        return a.clone().add(-i);
+    }
+    /** Adds {@code i} to this instance in place; the static helpers above call this on a clone. */
+    private SequenceNumber add(int i)
+    {
+        _seqNo+=i;
+        return this;
+    }
+    /** Two sequence numbers are equal when they are of the same class and hold the same int value. */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o)
+        {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass())
+        {
+            return false;
+        }
+
+        SequenceNumber that = (SequenceNumber) o;
+
+        if (_seqNo != that._seqNo)
+        {
+            return false;
+        }
+
+        return true;
+    }
+    /** Hash is the raw int value, so it changes whenever the instance is mutated. */
+    @Override
+    public int hashCode()
+    {
+        return _seqNo;
+    }
+    /** Returns the int difference of the two values: negative, zero or positive. */
+    public int compareTo(SequenceNumber o)
+    {
+        return _seqNo - o._seqNo; // wraps on overflow, so NOT an unsigned numeric compare. NOTE(review): presumably deliberate wrap-around ordering - confirm before replacing with Integer.compare
+    }
+    /** Returns an independent copy built via the constructor (does not call {@code super.clone()}). */
+    @Override
+    public SequenceNumber clone()
+    {
+        return new SequenceNumber(_seqNo);
+    }
+    /** Renders the signed int value wrapped as "SN{...}". */
+    @Override
+    public String toString()
+    {
+        return "SN{" + _seqNo + '}';
+    }
+    /** Returns the raw (signed) int value. */
+    public int intValue()
+    {
+        return _seqNo;
+    }
+    /** Returns the value as a non-negative long by masking to the low 32 bits (unsigned interpretation). */
+    public long longValue()
+    {
+        return  ((long) _seqNo) & 0xFFFFFFFFL;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c94d60e3/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index e570fa6..c203e50 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -146,14 +146,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     private int _nextOutgoingDeliveryId;
 
-    private final UnsignedInteger _initialOutgoingId = UnsignedInteger.ZERO;
-    private volatile UnsignedInteger _nextIncomingId;
+    private UnsignedInteger _initialOutgoingId = UnsignedInteger.ZERO;
+    private SequenceNumber _nextIncomingId;
     private final int _incomingWindow;
-    private volatile UnsignedInteger _nextOutgoingId = _initialOutgoingId;
-    private final int _outgoingWindow = DEFAULT_SESSION_BUFFER_SIZE;
-    private volatile UnsignedInteger _remoteIncomingWindow;
-    private volatile UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
-    private volatile UnsignedInteger _lastSentIncomingLimit;
+    private SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
+    private int _outgoingWindow = DEFAULT_SESSION_BUFFER_SIZE;
+    private UnsignedInteger _remoteIncomingWindow;
+    private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
+    private UnsignedInteger _lastSentIncomingLimit;
 
     private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
     private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
@@ -180,7 +180,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         _sendingChannel = sendingChannelId;
         _receivingChannel = receivingChannelId;
         _sessionState = SessionState.ACTIVE;
-        _nextIncomingId = begin.getNextOutgoingId();
+        _nextIncomingId = new SequenceNumber(begin.getNextOutgoingId().intValue());
         _connection = connection;
         _primaryDomain = getPrimaryDomain();
         _incomingWindow = incomingWindow;
@@ -250,11 +250,12 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         {
             final LinkedHashMap<UnsignedInteger, Delivery> unsettled =
                     role == Role.RECEIVER ? _incomingUnsettled : _outgoingUnsettled;
-            UnsignedInteger pos = first;
-            while (pos.compareTo(last) <= 0)
+            SequenceNumber pos = new SequenceNumber(first.intValue());
+            SequenceNumber end = new SequenceNumber(last.intValue());
+            while (pos.compareTo(end) <= 0)
             {
-                unsettled.remove(pos);
-                pos = pos.increment();
+                unsettled.remove(new UnsignedInteger(pos.intValue()));
+                pos.incr();
             }
         }
 
@@ -276,7 +277,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint, final boolean newDelivery)
     {
-        _nextOutgoingId = _nextOutgoingId.increment();
+        _nextOutgoingId.incr();
         UnsignedInteger deliveryId;
         final boolean settled = Boolean.TRUE.equals(xfr.getSettled());
         if (newDelivery)
@@ -418,7 +419,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         long limit = (nextOutgoingId.longValue() + flow.getIncomingWindow().longValue());
         _remoteIncomingWindow = UnsignedInteger.valueOf(limit - _nextOutgoingId.longValue());
 
-        _nextIncomingId = new UnsignedInteger(flow.getNextOutgoingId().intValue());
+        _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
         _remoteOutgoingWindow = flow.getOutgoingWindow();
 
         if (endpoint != null)
@@ -446,6 +447,12 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         }
     }
 
+    public void setNextIncomingId(final UnsignedInteger nextIncomingId)
+    {
+        _nextIncomingId = new SequenceNumber(nextIncomingId.intValue());
+        // The previously held SequenceNumber object is replaced by a fresh one, not mutated.
+    }
+
     public void receiveDisposition(final Disposition disposition)
     {
         Role dispositionRole = disposition.getRole();
@@ -574,8 +581,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
 
     public void receiveTransfer(final Transfer transfer)
     {
-        _nextIncomingId = _nextIncomingId.increment();
-        _remoteOutgoingWindow = _remoteOutgoingWindow.decrement();
+        _nextIncomingId.incr();
+        _remoteOutgoingWindow = _remoteOutgoingWindow.subtract(UnsignedInteger.ONE);
 
         UnsignedInteger inputHandle = transfer.getHandle();
         LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint = _inputHandleToEndpoint.get(inputHandle);

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c94d60e3/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedInteger.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedInteger.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedInteger.java
index 1a75a07..81b9e98 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedInteger.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedInteger.java
@@ -133,16 +133,6 @@ public final class UnsignedInteger extends Number implements Comparable<Unsigned
         return UnsignedInteger.valueOf(val);
     }
 
-    public UnsignedInteger increment()
-    {
-        return add(UnsignedInteger.ONE);
-    }
-
-    public UnsignedInteger decrement()
-    {
-        return subtract(UnsignedInteger.ONE);
-    }
-
     public static UnsignedInteger valueOf(final String value)
     {
         long longVal = Long.parseLong(value);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org