You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/15 16:18:17 UTC
qpid-broker-j git commit: QPID-8032: [Broker-J][AMQP 1.0] Coalesce
the dispositions for non-transactional transfers
Repository: qpid-broker-j
Updated Branches:
refs/heads/master bce2424df -> 3b92c3063
QPID-8032: [Broker-J][AMQP 1.0] Coalesce the dispositions for non-transactional transfers
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/3b92c306
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/3b92c306
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/3b92c306
Branch: refs/heads/master
Commit: 3b92c3063ee213a9e4c4527acd139d40a4237d56
Parents: bce2424
Author: Keith Wall <kw...@apache.org>
Authored: Mon Jan 15 15:14:03 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Jan 15 16:17:00 2018 +0000
----------------------------------------------------------------------
.../v1_0/AbstractReceivingLinkEndpoint.java | 55 +++++++---
.../qpid/server/protocol/v1_0/Session_1_0.java | 37 +++++++
.../v1_0/StandardReceivingLinkEndpoint.java | 102 +++++++++++++++----
3 files changed, 161 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3b92c306/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index aec18ad..0872eae 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -22,8 +22,10 @@
package org.apache.qpid.server.protocol.v1_0;
import java.util.Collections;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.qpid.server.protocol.v1_0.delivery.UnsettledDelivery;
import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
@@ -295,7 +297,19 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
final DeliveryState state,
final boolean settled)
{
- if (_unsettled.containsKey(deliveryTag))
+ updateDispositions(Collections.singleton(deliveryTag), state, settled);
+ }
+
+ void updateDispositions(final Set<Binary> deliveryTags,
+ final DeliveryState state,
+ final boolean settled)
+ {
+
+ final Set<Binary> unsettledKeys = new HashSet<>(_unsettled.keySet());
+ unsettledKeys.retainAll(deliveryTags);
+ final int settledDeliveryCount = deliveryTags.size() - unsettledKeys.size();
+
+ if (!unsettledKeys.isEmpty())
{
boolean outcomeUpdate = false;
Outcome outcome = null;
@@ -310,36 +324,49 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
if (outcome != null)
{
- if (!(_unsettled.get(deliveryTag) instanceof Outcome))
+ for (final Binary deliveryTag : unsettledKeys)
{
- Object oldOutcome = _unsettled.put(deliveryTag, outcome);
- outcomeUpdate = !outcome.equals(oldOutcome);
+ if (!(_unsettled.get(deliveryTag) instanceof Outcome))
+ {
+ Object oldOutcome = _unsettled.put(deliveryTag, outcome);
+ outcomeUpdate = outcomeUpdate || !outcome.equals(oldOutcome);
+ }
}
}
if (outcomeUpdate || settled)
{
- getSession().updateDisposition(getRole(), deliveryTag, state, settled);
+ getSession().updateDisposition(getRole(), unsettledKeys, state, settled);
}
if (settled)
{
- if (settled(deliveryTag))
+ int credit = 0;
+ for (final Binary deliveryTag : unsettledKeys)
{
- if (!isDetached() && _creditWindow)
+ if (settled(deliveryTag))
{
- setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
- sendFlowConditional();
- }
- else
- {
- getSession().sendFlowConditional();
+ if (!isDetached() && _creditWindow)
+ {
+ credit++;
+ }
}
}
+
+ if (credit > 0)
+ {
+ setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(credit)));
+ sendFlowConditional();
+ }
+ else
+ {
+ getSession().sendFlowConditional();
+ }
}
}
- else if (_creditWindow)
+
+ if (settledDeliveryCount > 0 && _creditWindow)
{
setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
sendFlowConditional();
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3b92c306/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 50b681d..62045a6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -33,12 +33,16 @@ import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import javax.security.auth.Subject;
@@ -257,6 +261,39 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
updateDisposition(role, deliveryId, deliveryId, state, settled);
}
+ void updateDisposition(final Role role,
+ final Set<Binary> deliveryTags,
+ final DeliveryState state,
+ final boolean settled)
+ {
+ final DeliveryRegistry deliveryRegistry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
+ SortedSet<UnsignedInteger> deliveryIds = deliveryTags.stream()
+ .map(deliveryRegistry::getDeliveryIdByTag)
+ .collect(Collectors.toCollection(TreeSet::new));
+
+ final Iterator<UnsignedInteger> iterator = deliveryIds.iterator();
+ if (iterator.hasNext())
+ {
+ UnsignedInteger begin = iterator.next();
+ UnsignedInteger end = begin;
+ while (iterator.hasNext())
+ {
+ final UnsignedInteger deliveryId = iterator.next();
+ if (!end.add(UnsignedInteger.ONE).equals(deliveryId))
+ {
+ updateDisposition(role, begin, end, state, settled);
+ begin = deliveryId;
+ end = begin;
+ }
+ else
+ {
+ end = deliveryId;
+ }
+ }
+ updateDisposition(role, begin, end, state, settled);
+ }
+ }
+
public boolean hasCreditToSend()
{
boolean b = _remoteIncomingWindow > 0;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3b92c306/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index bc18bfb..fe5c4db 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -25,12 +25,16 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
import java.util.concurrent.ExecutionException;
-import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -80,6 +84,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<>();
+ private final Set<PendingDispositionHolder> _pendingDispositions = new LinkedHashSet<>();
+
private final PublishingLink _publishingLink = new PublishingLink()
{
@Override
@@ -306,24 +312,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
if (transaction instanceof AsyncAutoCommitTransaction)
{
- recordFuture(Futures.immediateFuture(null), new ServerTransaction.Action()
- {
- @Override
- public void postCommit()
- {
- updateDisposition(delivery.getDeliveryTag(), resultantState, settled);
- }
-
- @Override
- public void onRollback()
- {
- //TODO: if reject is not supported, check spec behaviour
- Rejected rejected = new Rejected();
- rejected.setError(new Error(AmqpError.ILLEGAL_STATE, "Store transaction unexpectedly rolled-back"));
- DeliveryState state = sourceSupportedOutcomes.contains(Rejected.REJECTED_SYMBOL) ? rejected : resultantState;
- updateDisposition(delivery.getDeliveryTag(), state, settled);
- }
- });
+ _pendingDispositions.add(new PendingDispositionHolder(delivery.getDeliveryTag(), resultantState, settled));
}
else
{
@@ -571,6 +560,49 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
{
cmd.complete();
}
+
+ processPendingDispositions();
+ }
+
+ private void processPendingDispositions()
+ {
+ Iterator<PendingDispositionHolder> itr = _pendingDispositions.isEmpty() ? Collections.emptyIterator() : _pendingDispositions.iterator();
+ if (itr.hasNext())
+ {
+ try
+ {
+ PendingDispositionHolder disposition = itr.next();
+ PendingDispositionHolder current = disposition;
+
+ Set<Binary> deliveryTags = new HashSet<>();
+ deliveryTags.add(disposition.getDeliveryTag());
+
+ while (itr.hasNext())
+ {
+ disposition = itr.next();
+
+ if (current.isSettled() == disposition.isSettled() &&
+ Objects.equals(current.getResultantState(), disposition.getResultantState()))
+ {
+ deliveryTags.add(disposition.getDeliveryTag());
+ }
+ else
+ {
+ updateDispositions(deliveryTags, current.getResultantState(), current.isSettled());
+ deliveryTags.clear();
+ current = disposition;
+ }
+ }
+ if (!deliveryTags.isEmpty())
+ {
+ updateDispositions(deliveryTags, current.getResultantState(), current.isSettled());
+ }
+ }
+ finally
+ {
+ _pendingDispositions.clear();
+ }
+ }
}
private static class AsyncCommand
@@ -627,4 +659,36 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
}
}
+ private static class PendingDispositionHolder
+ {
+ private final Binary _deliveryTag;
+ private final DeliveryState _resultantState;
+ private final boolean _settled;
+
+ PendingDispositionHolder(final Binary deliveryTag,
+ final DeliveryState resultantState,
+ final boolean settled)
+ {
+ _deliveryTag = deliveryTag;
+ _resultantState = resultantState;
+ _settled = settled;
+ }
+
+ Binary getDeliveryTag()
+ {
+ return _deliveryTag;
+ }
+
+ DeliveryState getResultantState()
+ {
+ return _resultantState;
+ }
+
+ boolean isSettled()
+ {
+ return _settled;
+ }
+
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org