You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/11 13:22:59 UTC
qpid-broker-j git commit: Revert "QPID-7748: [Java Broker] Remove
superfluous SequenceNumber"
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 01beffa6b -> c94d60e31
Revert "QPID-7748: [Java Broker] Remove superfluous SequenceNumber"
This reverts commit 5017eb060c30a1b6d265defa17ad7b798c2f2f12.
The difference in the compareTo method between SequenceNumber and
UnsignedInteger prohibits the removal of the former.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/c94d60e3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/c94d60e3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/c94d60e3
Branch: refs/heads/master
Commit: c94d60e31817a68413905b398f03a4d4cc9adec5
Parents: 01beffa
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu May 11 14:22:43 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Thu May 11 14:22:43 2017 +0100
----------------------------------------------------------------------
.../server/protocol/v1_0/SequenceNumber.java | 115 +++++++++++++++++++
.../qpid/server/protocol/v1_0/Session_1_0.java | 39 ++++---
.../protocol/v1_0/type/UnsignedInteger.java | 10 --
3 files changed, 138 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c94d60e3/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
new file mode 100644
index 0000000..6abe4e3
--- /dev/null
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
@@ -0,0 +1,115 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.v1_0;
+/**
+ * Mutable holder for a 32-bit sequence counter stored in a Java {@code int}.
+ *
+ * {@link #incr()} and {@link #decr()} modify this instance in place and return it,
+ * while the static {@link #add(SequenceNumber, int)} and
+ * {@link #subtract(SequenceNumber, int)} helpers work on a copy and leave their
+ * argument untouched. All arithmetic is plain {@code int} arithmetic, so values
+ * wrap around on overflow.
+ *
+ * Ordering ({@link #compareTo(SequenceNumber)}) is based on the wrapped signed
+ * difference of the two values, not on their plain signed magnitude.
+ * NOTE(review): this looks like RFC 1982 style serial-number comparison - confirm
+ * against the protocol specification before changing it.
+ *
+ * No synchronization is performed by this class.
+ */
+public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
+{
+ private int _seqNo; // current counter value; mutated in place by incr(), decr() and add(int)
+ /**
+ * Creates a sequence number holding the given raw 32-bit value.
+ *
+ * @param seqNo initial counter value
+ */
+ public SequenceNumber(int seqNo)
+ {
+ _seqNo = seqNo;
+ }
+ /**
+ * Advances this sequence number by one, in place (wrapping on int overflow).
+ *
+ * @return this same instance, to allow chaining
+ */
+ public SequenceNumber incr()
+ {
+ _seqNo++;
+ return this;
+ }
+ /**
+ * Moves this sequence number back by one, in place (wrapping on int underflow).
+ *
+ * @return this same instance, to allow chaining
+ */
+ public SequenceNumber decr()
+ {
+ _seqNo--;
+ return this;
+ }
+ /**
+ * Returns a new sequence number equal to {@code a} advanced by {@code i}.
+ * The argument {@code a} is not modified because the addition is applied to a copy.
+ *
+ * @param a the starting sequence number
+ * @param i the amount to add (may be negative)
+ * @return a new instance holding the sum
+ */
+ public static SequenceNumber add(SequenceNumber a, int i)
+ {
+ return a.clone().add(i);
+ }
+ /**
+ * Returns a new sequence number equal to {@code a} moved back by {@code i}.
+ * The argument {@code a} is not modified because the operation is applied to a copy.
+ *
+ * @param a the starting sequence number
+ * @param i the amount to subtract (may be negative)
+ * @return a new instance holding the difference
+ */
+ public static SequenceNumber subtract(SequenceNumber a, int i)
+ {
+ return a.clone().add(-i);
+ }
+ /**
+ * Adds {@code i} to this instance in place; kept private so that the public
+ * static helpers can apply it to a copy.
+ *
+ * @param i the amount to add
+ * @return this same instance
+ */
+ private SequenceNumber add(int i)
+ {
+ _seqNo+=i;
+ return this;
+ }
+ /**
+ * Two sequence numbers are equal when they have exactly the same runtime class
+ * and hold the same int value.
+ *
+ * @param o the object to compare with
+ * @return {@code true} if {@code o} is an equal sequence number
+ */
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ SequenceNumber that = (SequenceNumber) o;
+
+ if (_seqNo != that._seqNo)
+ {
+ return false;
+ }
+
+ return true;
+ }
+ /**
+ * Uses the raw int value as the hash code, consistent with {@link #equals(Object)}.
+ * Because the value is mutable, the hash code changes when the value changes.
+ *
+ * @return the current int value
+ */
+ @Override
+ public int hashCode()
+ {
+ return _seqNo;
+ }
+ /**
+ * Compares by the 32-bit signed difference {@code _seqNo - o._seqNo}.
+ *
+ * Because int subtraction wraps on overflow, the sign of the result follows the
+ * wrapped distance between the two values rather than their plain signed order;
+ * for example a value of {@code Integer.MIN_VALUE} compared with
+ * {@code Integer.MAX_VALUE} yields {@code 1}. The result is zero only when both
+ * values are equal.
+ *
+ * NOTE(review): the subtraction appears deliberate (wrap-around ordering); do not
+ * replace it with {@code Integer.compare} without confirming the callers' needs.
+ *
+ * @param o the sequence number to compare with
+ * @return the wrapped difference between this value and {@code o}'s value
+ */
+ public int compareTo(SequenceNumber o)
+ {
+ return _seqNo - o._seqNo;
+ }
+ /**
+ * Returns an independent copy holding the same value. The copy is created through
+ * the constructor rather than {@code super.clone()}.
+ *
+ * @return a new {@code SequenceNumber} with the same int value
+ */
+ @Override
+ public SequenceNumber clone()
+ {
+ return new SequenceNumber(_seqNo);
+ }
+ /**
+ * Renders the value as {@code SN{<value>}}, where the value is printed as a
+ * signed decimal int.
+ */
+ @Override
+ public String toString()
+ {
+ return "SN{" + _seqNo + '}';
+ }
+ /**
+ * @return the raw value as a (signed) int
+ */
+ public int intValue()
+ {
+ return _seqNo;
+ }
+ /**
+ * @return the value masked to its low 32 bits, i.e. interpreted as an unsigned
+ * number in the range 0 to 2^32 - 1
+ */
+ public long longValue()
+ {
+ return ((long) _seqNo) & 0xFFFFFFFFL;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c94d60e3/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index e570fa6..c203e50 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -146,14 +146,14 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
private int _nextOutgoingDeliveryId;
- private final UnsignedInteger _initialOutgoingId = UnsignedInteger.ZERO;
- private volatile UnsignedInteger _nextIncomingId;
+ private UnsignedInteger _initialOutgoingId = UnsignedInteger.ZERO;
+ private SequenceNumber _nextIncomingId;
private final int _incomingWindow;
- private volatile UnsignedInteger _nextOutgoingId = _initialOutgoingId;
- private final int _outgoingWindow = DEFAULT_SESSION_BUFFER_SIZE;
- private volatile UnsignedInteger _remoteIncomingWindow;
- private volatile UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
- private volatile UnsignedInteger _lastSentIncomingLimit;
+ private SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
+ private int _outgoingWindow = DEFAULT_SESSION_BUFFER_SIZE;
+ private UnsignedInteger _remoteIncomingWindow;
+ private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
+ private UnsignedInteger _lastSentIncomingLimit;
private LinkedHashMap<UnsignedInteger,Delivery> _outgoingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
private LinkedHashMap<UnsignedInteger,Delivery> _incomingUnsettled = new LinkedHashMap<>(DEFAULT_SESSION_BUFFER_SIZE);
@@ -180,7 +180,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
_sendingChannel = sendingChannelId;
_receivingChannel = receivingChannelId;
_sessionState = SessionState.ACTIVE;
- _nextIncomingId = begin.getNextOutgoingId();
+ _nextIncomingId = new SequenceNumber(begin.getNextOutgoingId().intValue());
_connection = connection;
_primaryDomain = getPrimaryDomain();
_incomingWindow = incomingWindow;
@@ -250,11 +250,12 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
{
final LinkedHashMap<UnsignedInteger, Delivery> unsettled =
role == Role.RECEIVER ? _incomingUnsettled : _outgoingUnsettled;
- UnsignedInteger pos = first;
- while (pos.compareTo(last) <= 0)
+ SequenceNumber pos = new SequenceNumber(first.intValue());
+ SequenceNumber end = new SequenceNumber(last.intValue());
+ while (pos.compareTo(end) <= 0)
{
- unsettled.remove(pos);
- pos = pos.increment();
+ unsettled.remove(new UnsignedInteger(pos.intValue()));
+ pos.incr();
}
}
@@ -276,7 +277,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public void sendTransfer(final Transfer xfr, final SendingLinkEndpoint endpoint, final boolean newDelivery)
{
- _nextOutgoingId = _nextOutgoingId.increment();
+ _nextOutgoingId.incr();
UnsignedInteger deliveryId;
final boolean settled = Boolean.TRUE.equals(xfr.getSettled());
if (newDelivery)
@@ -418,7 +419,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
long limit = (nextOutgoingId.longValue() + flow.getIncomingWindow().longValue());
_remoteIncomingWindow = UnsignedInteger.valueOf(limit - _nextOutgoingId.longValue());
- _nextIncomingId = new UnsignedInteger(flow.getNextOutgoingId().intValue());
+ _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
_remoteOutgoingWindow = flow.getOutgoingWindow();
if (endpoint != null)
@@ -446,6 +447,12 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
}
}
+ public void setNextIncomingId(final UnsignedInteger nextIncomingId)
+ {
+ _nextIncomingId = new SequenceNumber(nextIncomingId.intValue());
+
+ }
+
public void receiveDisposition(final Disposition disposition)
{
Role dispositionRole = disposition.getRole();
@@ -574,8 +581,8 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public void receiveTransfer(final Transfer transfer)
{
- _nextIncomingId = _nextIncomingId.increment();
- _remoteOutgoingWindow = _remoteOutgoingWindow.decrement();
+ _nextIncomingId.incr();
+ _remoteOutgoingWindow = _remoteOutgoingWindow.subtract(UnsignedInteger.ONE);
UnsignedInteger inputHandle = transfer.getHandle();
LinkEndpoint<? extends BaseSource, ? extends BaseTarget> linkEndpoint = _inputHandleToEndpoint.get(inputHandle);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/c94d60e3/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedInteger.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedInteger.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedInteger.java
index 1a75a07..81b9e98 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedInteger.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/UnsignedInteger.java
@@ -133,16 +133,6 @@ public final class UnsignedInteger extends Number implements Comparable<Unsigned
return UnsignedInteger.valueOf(val);
}
- public UnsignedInteger increment()
- {
- return add(UnsignedInteger.ONE);
- }
-
- public UnsignedInteger decrement()
- {
- return subtract(UnsignedInteger.ONE);
- }
-
public static UnsignedInteger valueOf(final String value)
{
long longVal = Long.parseLong(value);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org