You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/12/20 18:50:40 UTC
svn commit: r1775336 - in
/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0:
Delivery.java ReceivingLinkEndpoint.java Session_1_0.java
Author: kwall
Date: Tue Dec 20 18:50:40 2016
New Revision: 1775336
URL: http://svn.apache.org/viewvc?rev=1775336&view=rev
Log:
QPID-7592: [Java Broker] Remove settled entries from the outgoing unsettled map.
Also removed dead code from ReceivingLinkEndpoint
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java?rev=1775336&r1=1775335&r2=1775336&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java Tue Dec 20 18:50:40 2016
@@ -25,11 +25,12 @@ import org.apache.qpid.server.protocol.v
public class Delivery
{
- private boolean _complete;
- private boolean _settled;
private final UnsignedInteger _deliveryId;
private final Binary _deliveryTag;
private final LinkEndpoint _linkEndpoint;
+ private boolean _complete;
+ private boolean _settled;
+ private int _numberOfTransfers = 0;
public Delivery(Transfer transfer, final LinkEndpoint endpoint)
{
@@ -60,8 +61,9 @@ public class Delivery
_settled = settled;
}
- public void addTransfer(Transfer transfer)
+ public final void addTransfer(Transfer transfer)
{
+ _numberOfTransfers++;
if(Boolean.TRUE.equals(transfer.getAborted()) || !Boolean.TRUE.equals(transfer.getMore()))
{
setComplete(true);
@@ -86,4 +88,9 @@ public class Delivery
{
return _deliveryTag;
}
+
+ public int getNumberOfTransfers()
+ {
+ return _numberOfTransfers;
+ }
}
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?rev=1775336&r1=1775335&r2=1775336&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Tue Dec 20 18:50:40 2016
@@ -21,14 +21,9 @@
package org.apache.qpid.server.protocol.v1_0;
-import java.util.ArrayList;
import java.util.Collections;
-import java.util.Iterator;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
-import java.util.SortedMap;
-import java.util.TreeMap;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
@@ -90,7 +85,6 @@ public class ReceivingLinkEndpoint exten
private Map<Binary, TransientState> _unsettledIds = new LinkedHashMap<Binary, TransientState>();
private boolean _creditWindow;
private boolean _remoteDrain;
- private UnsignedInteger _remoteTransferCount;
private UnsignedInteger _drainLimit;
@@ -262,14 +256,6 @@ public class ReceivingLinkEndpoint exten
sendFlowConditional();
}
- public void drain()
- {
- setDrain(true);
- _creditWindow = false;
- _drainLimit = getDeliveryCount().add(getLinkCredit());
- sendFlowWithEcho();
- }
-
@Override
public void receiveDeliveryState(final Delivery unsettled, final DeliveryState state, final Boolean settled)
{
@@ -284,102 +270,9 @@ public class ReceivingLinkEndpoint exten
}
}
- public void requestTransactionalSend(Object txnId)
- {
- setDrain(true);
- _creditWindow = false;
- setTransactionId(txnId);
- sendFlow();
- }
-
- private void sendFlow(final Object transactionId)
- {
- sendFlow();
- }
-
-
- public void clearDrain()
- {
- setDrain(false);
- sendFlow();
- }
-
- public void updateAllDisposition(Binary deliveryTag, DeliveryState deliveryState, boolean settled)
- {
- if(!_unsettledIds.isEmpty())
- {
- Binary firstTag = _unsettledIds.keySet().iterator().next();
- Binary lastTag = deliveryTag;
- updateDispositions(firstTag, lastTag, deliveryState, settled);
- }
- }
-
- private void updateDispositions(Binary firstTag, Binary lastTag, DeliveryState state, boolean settled)
- {
- SortedMap<UnsignedInteger, UnsignedInteger> ranges = new TreeMap<UnsignedInteger, UnsignedInteger>();
-
- Iterator<Binary> iter = _unsettledIds.keySet().iterator();
- List<Binary> tagsToUpdate = new ArrayList<Binary>();
- Binary tag = null;
- while (iter.hasNext() && !(tag = iter.next()).equals(firstTag)) ;
- if (firstTag.equals(tag))
- {
- tagsToUpdate.add(tag);
-
- UnsignedInteger deliveryId = _unsettledIds.get(firstTag).getDeliveryId();
-
- UnsignedInteger first = deliveryId;
- UnsignedInteger last = first;
-
- if (!firstTag.equals(lastTag))
- {
- while (iter.hasNext())
- {
- tag = iter.next();
- tagsToUpdate.add(tag);
- deliveryId = _unsettledIds.get(tag).getDeliveryId();
-
- if (deliveryId.equals(last.add(UnsignedInteger.ONE)))
- {
- last = deliveryId;
- }
- else
- {
- ranges.put(first, last);
- first = last = deliveryId;
- }
-
- if (tag.equals(lastTag))
- {
- break;
- }
- }
- }
- ranges.put(first, last);
- }
-
- if (settled)
- {
-
- for (Binary deliveryTag : tagsToUpdate)
- {
- if (settled(deliveryTag) && _creditWindow)
- {
- setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
- }
- }
- sendFlowConditional();
- }
-
-
- for (Map.Entry<UnsignedInteger, UnsignedInteger> range : ranges.entrySet())
- {
- getSession().updateDisposition(getRole(), range.getKey(), range.getValue(), state, settled);
- }
- }
@Override
public void settle(Binary deliveryTag)
Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1775336&r1=1775335&r2=1775336&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Tue Dec 20 18:50:40 2016
@@ -142,7 +142,7 @@ public class Session_1_0 implements AMQS
private final Set<ConsumerTarget_1_0> _consumersWithPendingWork = new ScheduledConsumerTargetSet<>();
private Iterator<ConsumerTarget_1_0> _processPendingIterator;
- private SessionState _state ;
+ private SessionState _state;
private final Map<String, SendingLinkEndpoint> _sendingLinkMap = new HashMap<>();
private final Map<String, ReceivingLinkEndpoint> _receivingLinkMap = new HashMap<>();
@@ -315,23 +315,16 @@ public class Session_1_0 implements AMQS
disposition.setState(state);
-
- if(settled)
+ if (settled)
{
- if(role == Role.RECEIVER)
+ final LinkedHashMap<UnsignedInteger, Delivery> unsettled =
+ role == Role.RECEIVER ? _incomingUnsettled : _outgoingUnsettled;
+ SequenceNumber pos = new SequenceNumber(first.intValue());
+ SequenceNumber end = new SequenceNumber(last.intValue());
+ while (pos.compareTo(end) <= 0)
{
- SequenceNumber pos = new SequenceNumber(first.intValue());
- SequenceNumber end = new SequenceNumber(last.intValue());
- while(pos.compareTo(end)<=0)
- {
- Delivery d = _incomingUnsettled.remove(new UnsignedInteger(pos.intValue()));
-
-/*
- _availableIncomingCredit += d.getTransfers().size();
-*/
-
- pos.incr();
- }
+ unsettled.remove(new UnsignedInteger(pos.intValue()));
+ pos.incr();
}
}
@@ -355,35 +348,39 @@ public class Session_1_0 implements AMQS
{
_nextOutgoingTransferId.incr();
UnsignedInteger deliveryId;
- if(newDelivery)
+ final boolean settled = Boolean.TRUE.equals(xfr.getSettled());
+ if (newDelivery)
{
deliveryId = UnsignedInteger.valueOf(_nextOutgoingDeliveryId++);
endpoint.setLastDeliveryId(deliveryId);
+ if (!settled)
+ {
+ final Delivery delivery = new Delivery(xfr, endpoint);
+ _outgoingUnsettled.put(deliveryId, delivery);
+ _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+ endpoint.addUnsettled(delivery);
+ }
}
else
{
deliveryId = endpoint.getLastDeliveryId();
- }
- xfr.setDeliveryId(deliveryId);
-
- if(!Boolean.TRUE.equals(xfr.getSettled()))
- {
- Delivery delivery;
- if((delivery = _outgoingUnsettled.get(deliveryId))== null)
+ final Delivery delivery = _outgoingUnsettled.get(deliveryId);
+ if (delivery != null)
{
- delivery = new Delivery(xfr, endpoint);
- _outgoingUnsettled.put(deliveryId, delivery);
-
- }
- else
- {
- delivery.addTransfer(xfr);
+ if (!settled)
+ {
+ delivery.addTransfer(xfr);
+ _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
+ }
+ else
+ {
+ _outgoingSessionCredit = _outgoingSessionCredit.add(new UnsignedInteger(delivery.getNumberOfTransfers()));
+ endpoint.settle(delivery.getDeliveryTag());
+ _outgoingUnsettled.remove(deliveryId);
+ }
}
- _outgoingSessionCredit = _outgoingSessionCredit.subtract(UnsignedInteger.ONE);
- endpoint.addUnsettled(delivery);
-
}
-
+ xfr.setDeliveryId(deliveryId);
try
{
@@ -395,31 +392,29 @@ public class Session_1_0 implements AMQS
{
// TODO - should make this iterative and not recursive
+ Transfer secondTransfer = new Transfer();
- Transfer secondTransfer = new Transfer();
-
- secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
- secondTransfer.setHandle(xfr.getHandle());
- secondTransfer.setSettled(xfr.getSettled());
- secondTransfer.setState(xfr.getState());
- secondTransfer.setMessageFormat(xfr.getMessageFormat());
- secondTransfer.setPayload(payload);
-
- sendTransfer(secondTransfer, endpoint, false);
+ secondTransfer.setDeliveryTag(xfr.getDeliveryTag());
+ secondTransfer.setHandle(xfr.getHandle());
+ secondTransfer.setSettled(xfr.getSettled());
+ secondTransfer.setState(xfr.getState());
+ secondTransfer.setMessageFormat(xfr.getMessageFormat());
+ secondTransfer.setPayload(payload);
- secondTransfer.dispose();
+ sendTransfer(secondTransfer, endpoint, false);
+ secondTransfer.dispose();
}
- if(payload != null)
+ if (payload != null)
{
- for(QpidByteBuffer buf : payload)
+ for (QpidByteBuffer buf : payload)
{
buf.dispose();
}
}
}
- catch(OversizeFrameException e)
+ catch (OversizeFrameException e)
{
throw new ConnectionScopedRuntimeException(e);
}
@@ -453,10 +448,7 @@ public class Session_1_0 implements AMQS
reply.setError(error);
_connection.sendEnd(sendChannel, reply, true);
break;
-
-
}
-
}
public UnsignedInteger getNextOutgoingId()
@@ -545,8 +537,12 @@ public class Session_1_0 implements AMQS
if(delivery != null)
{
delivery.getLinkEndpoint().receiveDeliveryState(delivery,
- disposition.getState(),
- disposition.getSettled());
+ disposition.getState(),
+ disposition.getSettled());
+ if (Boolean.TRUE.equals(disposition.getSettled()))
+ {
+ unsettledTransfers.remove(deliveryId);
+ }
}
deliveryId = deliveryId.add(UnsignedInteger.ONE);
}
@@ -1179,7 +1175,7 @@ public class Session_1_0 implements AMQS
LifetimePolicy lifetimePolicy = properties == null
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
- Map<String,Object> attributes = new HashMap<String,Object>();
+ Map<String,Object> attributes = new HashMap<>();
attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, queueName);
attributes.put(Queue.DURABLE, false);
@@ -1262,7 +1258,9 @@ public class Session_1_0 implements AMQS
byte[] data = txnId.getArray();
if(data.length > 4)
+ {
throw new IllegalArgumentException();
+ }
int id = 0;
for(int i = 0; i < data.length; i++)
@@ -1283,7 +1281,6 @@ public class Session_1_0 implements AMQS
data[1] = (byte) ((txnId & 0xff0000) >> 16);
data[0] = (byte) ((txnId & 0xff000000) >> 24);
return new Binary(data);
-
}
@Override
@@ -1394,9 +1391,9 @@ public class Session_1_0 implements AMQS
private boolean isQueueDestinationForLink(final Queue<?> queue, final ReceivingDestination recvDest)
{
- return (recvDest instanceof NodeReceivingDestination && queue == ((NodeReceivingDestination) recvDest).getDestination())
- || recvDest instanceof QueueDestination && queue == ((QueueDestination) recvDest).getQueue();
-
+ return (recvDest instanceof NodeReceivingDestination
+ && queue == ((NodeReceivingDestination) recvDest).getDestination())
+ || recvDest instanceof QueueDestination && queue == ((QueueDestination) recvDest).getQueue();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org