You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/06/06 16:39:33 UTC
[1/3] qpid-broker-j git commit: QPID-7739: [Java Broker] [AMQP 1.0]
Change type of field AbstractLinkEndpoint#_deliveryCount to SequenceNumber
Repository: qpid-broker-j
Updated Branches:
refs/heads/master 5a360f8ba -> 57e647cf9
QPID-7739: [Java Broker] [AMQP 1.0] Change type of field AbstractLinkEndpoint#_deliveryCount to SequenceNumber
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/268338ab
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/268338ab
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/268338ab
Branch: refs/heads/master
Commit: 268338aba0764f7e554573417699d3b2fc9ffa5f
Parents: 5a360f8
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 6 12:04:39 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 6 17:07:47 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AbstractLinkEndpoint.java | 16 +++++------
.../v1_0/AbstractReceivingLinkEndpoint.java | 4 +--
.../protocol/v1_0/SendingLinkEndpoint.java | 21 +++++++-------
.../server/protocol/v1_0/SequenceNumber.java | 29 +++++++-------------
.../v1_0/StandardReceivingLinkEndpoint.java | 2 +-
.../TxnCoordinatorReceivingLinkEndpoint.java | 2 +-
6 files changed, 33 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
index 0c99258..87dc899 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractLinkEndpoint.java
@@ -60,7 +60,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
private volatile boolean _stopped;
private volatile boolean _stoppedUpdated;
private Symbol[] _capabilities;
- private UnsignedInteger _deliveryCount;
+ private SequenceNumber _deliveryCount;
private UnsignedInteger _linkCredit;
private UnsignedInteger _available;
private Boolean _drain;
@@ -174,7 +174,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
return getSession().getConnection().getAddressSpace();
}
- public void setDeliveryCount(final UnsignedInteger deliveryCount)
+ protected void setDeliveryCount(final SequenceNumber deliveryCount)
{
_deliveryCount = deliveryCount;
}
@@ -194,7 +194,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
_drain = drain;
}
- public UnsignedInteger getDeliveryCount()
+ protected SequenceNumber getDeliveryCount()
{
return _deliveryCount;
}
@@ -303,7 +303,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
if (getRole() == Role.SENDER)
{
- attachToSend.setInitialDeliveryCount(_deliveryCount);
+ attachToSend.setInitialDeliveryCount(_deliveryCount.unsignedIntegerValue());
}
switch (_state)
@@ -402,7 +402,7 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
}
else
{
- UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount);
+ UnsignedInteger clientsCredit = _lastSentCreditLimit.subtract(_deliveryCount.unsignedIntegerValue());
// client has used up over half their credit allowance ?
boolean sendFlow = _linkCredit.subtract(clientsCredit).compareTo(clientsCredit) >= 0;
@@ -445,18 +445,18 @@ public abstract class AbstractLinkEndpoint<S extends BaseSource, T extends BaseT
if(_state == State.ATTACHED || _state == State.ATTACH_SENT)
{
Flow flow = new Flow();
- flow.setDeliveryCount(_deliveryCount);
+ flow.setDeliveryCount(_deliveryCount.unsignedIntegerValue());
flow.setEcho(echo);
if(_stopped)
{
flow.setLinkCredit(UnsignedInteger.ZERO);
flow.setDrain(true);
- _lastSentCreditLimit = _deliveryCount;
+ _lastSentCreditLimit = _deliveryCount.unsignedIntegerValue();
}
else
{
flow.setLinkCredit(_linkCredit);
- _lastSentCreditLimit = _linkCredit.add(_deliveryCount);
+ _lastSentCreditLimit = _linkCredit.add(_deliveryCount.unsignedIntegerValue());
flow.setDrain(_drain);
}
flow.setAvailable(_available);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index abc82a9..22c8b3f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -129,7 +129,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
}
_unsettledIds.put(deliveryTag, transientState);
setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
- setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE));
+ getDeliveryCount().incr();
}
else
{
@@ -159,7 +159,7 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
@Override public void receiveFlow(final Flow flow)
{
setAvailable(flow.getAvailable());
- setDeliveryCount(flow.getDeliveryCount());
+ setDeliveryCount(new SequenceNumber(flow.getDeliveryCount().intValue()));
}
public boolean settled(final Binary deliveryTag)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index b9f1e7b..1f346afd 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -100,7 +100,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
public SendingLinkEndpoint(final Session_1_0 session, final LinkImpl<Source, Target> link)
{
super(session, link);
- setDeliveryCount(UnsignedInteger.valueOf(0));
+ setDeliveryCount(new SequenceNumber(0));
setAvailable(UnsignedInteger.valueOf(0));
setCapabilities(Arrays.asList(AMQPConnection_1_0.SHARED_SUBSCRIPTIONS));
}
@@ -390,7 +390,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE));
}
- setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + 1)));
+ getDeliveryCount().incr();
xfr.setHandle(getLocalHandle());
@@ -418,7 +418,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
if (_draining)
{
- setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+ getDeliveryCount().add(getLinkCredit().intValue());
setLinkCredit(UnsignedInteger.ZERO);
sendFlow();
_draining = false;
@@ -433,8 +433,8 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
@Override
public void receiveFlow(final Flow flow)
{
- UnsignedInteger t = flow.getDeliveryCount();
- UnsignedInteger c = flow.getLinkCredit();
+ UnsignedInteger receiverDeliveryCount = flow.getDeliveryCount();
+ UnsignedInteger receiverLinkCredit = flow.getLinkCredit();
setDrain(flow.getDrain());
Map options;
@@ -443,20 +443,21 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
_transactionId = (Binary) options.get(Symbol.valueOf("txn-id"));
}
- if(t == null)
+ if(receiverDeliveryCount == null)
{
- setLinkCredit(c);
+ setLinkCredit(receiverLinkCredit);
}
else
{
- UnsignedInteger limit = t.add(c);
- if(limit.compareTo(getDeliveryCount())<=0)
+ // 2.6.7 Flow Control : link_credit_snd := delivery_count_rcv + link_credit_rcv - delivery_count_snd
+ UnsignedInteger limit = receiverDeliveryCount.add(receiverLinkCredit);
+ if(limit.compareTo(getDeliveryCount().unsignedIntegerValue())<=0)
{
setLinkCredit(UnsignedInteger.valueOf(0));
}
else
{
- setLinkCredit(limit.subtract(getDeliveryCount()));
+ setLinkCredit(limit.subtract(getDeliveryCount().unsignedIntegerValue()));
}
}
flowStateChanged();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
index 6abe4e3..8bb01ce 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java
@@ -21,9 +21,11 @@
package org.apache.qpid.server.protocol.v1_0;
-public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
+import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger;
+
+public class SequenceNumber implements Comparable<SequenceNumber>
{
- private int _seqNo;
+ private volatile int _seqNo;
public SequenceNumber(int seqNo)
{
@@ -42,17 +44,7 @@ public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
return this;
}
- public static SequenceNumber add(SequenceNumber a, int i)
- {
- return a.clone().add(i);
- }
-
- public static SequenceNumber subtract(SequenceNumber a, int i)
- {
- return a.clone().add(-i);
- }
-
- private SequenceNumber add(int i)
+ public SequenceNumber add(int i)
{
_seqNo+=i;
return this;
@@ -92,12 +84,6 @@ public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
}
@Override
- public SequenceNumber clone()
- {
- return new SequenceNumber(_seqNo);
- }
-
- @Override
public String toString()
{
return "SN{" + _seqNo + '}';
@@ -112,4 +98,9 @@ public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable
{
return ((long) _seqNo) & 0xFFFFFFFFL;
}
+
+ public UnsignedInteger unsignedIntegerValue()
+ {
+ return UnsignedInteger.valueOf(_seqNo);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index 92f0eb4..7d837d5 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -467,7 +467,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
Target target = new Target();
Target attachTarget = (Target) attach.getTarget();
- setDeliveryCount(attach.getInitialDeliveryCount());
+ setDeliveryCount(new SequenceNumber(attach.getInitialDeliveryCount().intValue()));
target.setAddress(attachTarget.getAddress());
target.setDynamic(attachTarget.getDynamic());
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/268338ab/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
index 9bb3406..d6a7be6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/TxnCoordinatorReceivingLinkEndpoint.java
@@ -260,7 +260,7 @@ public class TxnCoordinatorReceivingLinkEndpoint extends AbstractReceivingLinkEn
public void attachReceived(final Attach attach) throws AmqpErrorException
{
super.attachReceived(attach);
- setDeliveryCount(attach.getInitialDeliveryCount());
+ setDeliveryCount(new SequenceNumber(attach.getInitialDeliveryCount().intValue()));
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/3] qpid-broker-j git commit: QPID-7739: [Java Broker] [AMQP 1.0]
Change type of field Session_1_0#_remoteIncomingWindow to long and decrement
it on every transfer
Posted by or...@apache.org.
QPID-7739: [Java Broker] [AMQP 1.0] Change type of field Session_1_0#_remoteIncomingWindow to long and decrement it on every transfer
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/57e647cf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/57e647cf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/57e647cf
Branch: refs/heads/master
Commit: 57e647cf96c6a810349124f4099228c448d5c3c3
Parents: ddc1538
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 6 17:03:12 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 6 17:09:00 2017 +0100
----------------------------------------------------------------------
.../qpid/server/protocol/v1_0/Session_1_0.java | 85 ++++++++++----------
1 file changed, 42 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/57e647cf/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index fdcbefa..07a228e 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -155,7 +155,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
private final UnsignedInteger _incomingWindow;
private SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
private final UnsignedInteger _outgoingWindow = UnsignedInteger.valueOf(DEFAULT_SESSION_BUFFER_SIZE);
- private UnsignedInteger _remoteIncomingWindow;
+ private volatile long _remoteIncomingWindow;
private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
private UnsignedInteger _lastSentIncomingLimit;
@@ -269,7 +269,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public boolean hasCreditToSend()
{
- boolean b = _remoteIncomingWindow != null && _remoteIncomingWindow.intValue() > 0;
+ boolean b = _remoteIncomingWindow > 0;
boolean b1 = getOutgoingWindow() != null && getOutgoingWindow().compareTo(UnsignedInteger.ZERO) > 0;
return b && b1;
}
@@ -292,7 +292,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
{
final Delivery delivery = new Delivery(xfr, endpoint);
_outgoingUnsettled.put(deliveryId, delivery);
- _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
endpoint.addUnsettled(delivery);
}
}
@@ -305,18 +304,16 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
if (!settled)
{
delivery.addTransfer(xfr);
- _remoteIncomingWindow = _remoteIncomingWindow.subtract(UnsignedInteger.ONE);
}
else
{
- _remoteIncomingWindow = _remoteIncomingWindow.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
endpoint.settle(delivery.getDeliveryTag());
_outgoingUnsettled.remove(deliveryId);
}
}
}
xfr.setDeliveryId(deliveryId);
-
+ _remoteIncomingWindow--;
try
{
List<QpidByteBuffer> payload = xfr.getPayload();
@@ -414,49 +411,56 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public void receiveFlow(final Flow flow)
{
- UnsignedInteger handle = flow.getHandle();
- final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint =
- handle == null ? null : _inputHandleToEndpoint.get(handle);
-
- final UnsignedInteger nextOutgoingId =
- flow.getNextIncomingId() == null ? _initialOutgoingId : flow.getNextIncomingId();
- long limit = (nextOutgoingId.longValue() + flow.getIncomingWindow().longValue());
- _remoteIncomingWindow = UnsignedInteger.valueOf(limit - _nextOutgoingId.longValue());
-
- _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
- _remoteOutgoingWindow = flow.getOutgoingWindow();
-
- if (endpoint != null)
+ final SequenceNumber flowNextIncomingId = new SequenceNumber(flow.getNextIncomingId() == null
+ ? _initialOutgoingId.intValue()
+ : flow.getNextIncomingId().intValue());
+ if (flowNextIncomingId.compareTo(_nextOutgoingId) > 0)
{
- endpoint.receiveFlow(flow);
-
- if (Boolean.TRUE.equals(flow.getEcho()))
- {
- endpoint.sendFlow();
- }
+ final End end = new End();
+ end.setError(new Error(SessionError.WINDOW_VIOLATION,
+ String.format("Next incoming id '%d' exceeds next outgoing id '%d'",
+ flowNextIncomingId,
+ _nextOutgoingId)));
+ end(end);
}
else
{
- final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
- _inputHandleToEndpoint.values();
- for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
+ _remoteIncomingWindow = flowNextIncomingId.longValue() + flow.getIncomingWindow().longValue()
+ - _nextOutgoingId.longValue();
+
+ _nextIncomingId = new SequenceNumber(flow.getNextOutgoingId().intValue());
+ _remoteOutgoingWindow = flow.getOutgoingWindow();
+
+ UnsignedInteger handle = flow.getHandle();
+ final LinkEndpoint<? extends BaseSource, ? extends BaseTarget> endpoint =
+ handle == null ? null : _inputHandleToEndpoint.get(handle);
+
+ if (endpoint != null)
{
- le.flowStateChanged();
- }
+ endpoint.receiveFlow(flow);
- if (Boolean.TRUE.equals(flow.getEcho()))
+ if (Boolean.TRUE.equals(flow.getEcho()))
+ {
+ endpoint.sendFlow();
+ }
+ }
+ else
{
- sendFlow();
+ final Collection<LinkEndpoint<? extends BaseSource, ? extends BaseTarget>> allLinkEndpoints =
+ _inputHandleToEndpoint.values();
+ for (LinkEndpoint<? extends BaseSource, ? extends BaseTarget> le : allLinkEndpoints)
+ {
+ le.flowStateChanged();
+ }
+
+ if (Boolean.TRUE.equals(flow.getEcho()))
+ {
+ sendFlow();
+ }
}
}
}
- public void setNextIncomingId(final UnsignedInteger nextIncomingId)
- {
- _nextIncomingId = new SequenceNumber(nextIncomingId.intValue());
-
- }
-
public void receiveDisposition(final Disposition disposition)
{
Role dispositionRole = disposition.getRole();
@@ -532,11 +536,6 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
send(flow);
}
- public void setRemoteIncomingWindow(final UnsignedInteger remoteIncomingWindow)
- {
- _remoteIncomingWindow = remoteIncomingWindow;
- }
-
public void receiveDetach(final Detach detach)
{
UnsignedInteger handle = detach.getHandle();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/3] qpid-broker-j git commit: QPID-7739: [Java Broker] [AMQP 1.0]
Change type of fields Session_1_0#_incomingWindow and
Session_1_0#_outgoingWindow to UnsignedInteger
Posted by or...@apache.org.
QPID-7739: [Java Broker] [AMQP 1.0] Change type of fields Session_1_0#_incomingWindow and Session_1_0#_outgoingWindow to UnsignedInteger
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/ddc15384
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/ddc15384
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/ddc15384
Branch: refs/heads/master
Commit: ddc15384b464aa9a23ec51dfabe9a1e48ce35c37
Parents: 268338a
Author: Alex Rudyy <or...@apache.org>
Authored: Tue Jun 6 13:50:29 2017 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Tue Jun 6 17:08:34 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/AMQPConnection_1_0Impl.java | 2 +-
.../qpid/server/protocol/v1_0/Session_1_0.java | 23 ++++++++++----------
2 files changed, 12 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ddc15384/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
index bcae0f6..977dc8f 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java
@@ -616,7 +616,7 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio
begin,
sendingChannelId,
receivingChannelId,
- getContextValue(Integer.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
+ getContextValue(Long.class, AMQPConnection_1_0.CONNECTION_SESSION_CREDIT_WINDOW_SIZE));
session.create();
_receivingSessions[receivingChannelId] = session;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/ddc15384/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 3ec200b..fdcbefa 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -152,9 +152,9 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
private UnsignedInteger _initialOutgoingId = UnsignedInteger.ZERO;
private SequenceNumber _nextIncomingId;
- private final int _incomingWindow;
+ private final UnsignedInteger _incomingWindow;
private SequenceNumber _nextOutgoingId = new SequenceNumber(_initialOutgoingId.intValue());
- private int _outgoingWindow = DEFAULT_SESSION_BUFFER_SIZE;
+ private final UnsignedInteger _outgoingWindow = UnsignedInteger.valueOf(DEFAULT_SESSION_BUFFER_SIZE);
private UnsignedInteger _remoteIncomingWindow;
private UnsignedInteger _remoteOutgoingWindow = UnsignedInteger.ZERO;
private UnsignedInteger _lastSentIncomingLimit;
@@ -178,7 +178,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
Begin begin,
int sendingChannelId,
int receivingChannelId,
- int incomingWindow)
+ long incomingWindow)
{
super(connection, sendingChannelId);
_sendingChannel = sendingChannelId;
@@ -187,7 +187,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
_nextIncomingId = new SequenceNumber(begin.getNextOutgoingId().intValue());
_connection = connection;
_primaryDomain = getPrimaryDomain();
- _incomingWindow = incomingWindow;
+ _incomingWindow = UnsignedInteger.valueOf(incomingWindow);
AccessController.doPrivileged((new PrivilegedAction<Object>()
{
@@ -399,7 +399,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
// TODO - we should use a better metric here, and/or manage session credit across the whole connection
// send a flow if the window is at least half used up
- if (UnsignedInteger.valueOf(_incomingWindow).subtract(clientsCredit).compareTo(clientsCredit) >= 0)
+ if (_incomingWindow.subtract(clientsCredit).compareTo(clientsCredit) >= 0)
{
sendFlow();
}
@@ -409,7 +409,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
public UnsignedInteger getOutgoingWindow()
{
- return UnsignedInteger.valueOf(_outgoingWindow);
+ return _outgoingWindow;
}
public void receiveFlow(final Flow flow)
@@ -522,14 +522,13 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
{
if(_nextIncomingId != null)
{
- final long nextIncomingId = _nextIncomingId.longValue();
- flow.setNextIncomingId(UnsignedInteger.valueOf(nextIncomingId));
- _lastSentIncomingLimit = UnsignedInteger.valueOf(nextIncomingId + _incomingWindow);
+ flow.setNextIncomingId(_nextIncomingId.unsignedIntegerValue());
+ _lastSentIncomingLimit = _incomingWindow.add(_nextIncomingId.unsignedIntegerValue());
}
- flow.setIncomingWindow(UnsignedInteger.valueOf(_incomingWindow));
+ flow.setIncomingWindow(_incomingWindow);
flow.setNextOutgoingId(UnsignedInteger.valueOf(_nextOutgoingId.intValue()));
- flow.setOutgoingWindow(UnsignedInteger.valueOf(_outgoingWindow));
+ flow.setOutgoingWindow(_outgoingWindow);
send(flow);
}
@@ -679,7 +678,7 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
UnsignedInteger getIncomingWindow()
{
- return UnsignedInteger.valueOf(_incomingWindow);
+ return _incomingWindow;
}
AccessControlContext getAccessControllerContext()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org