You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2018/01/15 16:18:17 UTC

qpid-broker-j git commit: QPID-8032: [Broker-J][AMQP 1.0] Coalesce the dispositions for non-transactional transfers

Repository: qpid-broker-j
Updated Branches:
  refs/heads/master bce2424df -> 3b92c3063


QPID-8032: [Broker-J][AMQP 1.0] Coalesce the dispositions for non-transactional transfers


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/3b92c306
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/3b92c306
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/3b92c306

Branch: refs/heads/master
Commit: 3b92c3063ee213a9e4c4527acd139d40a4237d56
Parents: bce2424
Author: Keith Wall <kw...@apache.org>
Authored: Mon Jan 15 15:14:03 2018 +0000
Committer: Keith Wall <kw...@apache.org>
Committed: Mon Jan 15 16:17:00 2018 +0000

----------------------------------------------------------------------
 .../v1_0/AbstractReceivingLinkEndpoint.java     |  55 +++++++---
 .../qpid/server/protocol/v1_0/Session_1_0.java  |  37 +++++++
 .../v1_0/StandardReceivingLinkEndpoint.java     | 102 +++++++++++++++----
 3 files changed, 161 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3b92c306/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
index aec18ad..0872eae 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AbstractReceivingLinkEndpoint.java
@@ -22,8 +22,10 @@
 package org.apache.qpid.server.protocol.v1_0;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.qpid.server.protocol.v1_0.delivery.UnsettledDelivery;
 import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder;
@@ -295,7 +297,19 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
                            final DeliveryState state,
                            final boolean settled)
     {
-        if (_unsettled.containsKey(deliveryTag))
+        updateDispositions(Collections.singleton(deliveryTag), state, settled);
+    }
+
+    void updateDispositions(final Set<Binary> deliveryTags,
+                           final DeliveryState state,
+                           final boolean settled)
+    {
+
+        final Set<Binary> unsettledKeys = new HashSet<>(_unsettled.keySet());
+        unsettledKeys.retainAll(deliveryTags);
+        final int settledDeliveryCount = deliveryTags.size() - unsettledKeys.size();
+
+        if (!unsettledKeys.isEmpty())
         {
             boolean outcomeUpdate = false;
             Outcome outcome = null;
@@ -310,36 +324,49 @@ public abstract class AbstractReceivingLinkEndpoint<T extends BaseTarget> extend
 
             if (outcome != null)
             {
-                if (!(_unsettled.get(deliveryTag) instanceof Outcome))
+                for (final Binary deliveryTag : unsettledKeys)
                 {
-                    Object oldOutcome = _unsettled.put(deliveryTag, outcome);
-                    outcomeUpdate = !outcome.equals(oldOutcome);
+                    if (!(_unsettled.get(deliveryTag) instanceof Outcome))
+                    {
+                        Object oldOutcome = _unsettled.put(deliveryTag, outcome);
+                        outcomeUpdate = outcomeUpdate || !outcome.equals(oldOutcome);
+                    }
                 }
             }
 
             if (outcomeUpdate || settled)
             {
-                getSession().updateDisposition(getRole(), deliveryTag, state, settled);
+                getSession().updateDisposition(getRole(), unsettledKeys, state, settled);
             }
 
             if (settled)
             {
 
-                if (settled(deliveryTag))
+                int credit = 0;
+                for (final Binary deliveryTag : unsettledKeys)
                 {
-                    if (!isDetached() && _creditWindow)
+                    if (settled(deliveryTag))
                     {
-                        setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
-                        sendFlowConditional();
-                    }
-                    else
-                    {
-                        getSession().sendFlowConditional();
+                        if (!isDetached() && _creditWindow)
+                        {
+                            credit++;
+                        }
                     }
                 }
+
+                if (credit > 0)
+                {
+                    setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(credit)));
+                    sendFlowConditional();
+                }
+                else
+                {
+                    getSession().sendFlowConditional();
+                }
             }
         }
-        else if (_creditWindow)
+
+        if (settledDeliveryCount > 0 && _creditWindow)
         {
             setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE));
             sendFlowConditional();

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3b92c306/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 50b681d..62045a6 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -33,12 +33,16 @@ import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 
 import javax.security.auth.Subject;
 
@@ -257,6 +261,39 @@ public class Session_1_0 extends AbstractAMQPSession<Session_1_0, ConsumerTarget
         updateDisposition(role, deliveryId, deliveryId, state, settled);
     }
 
+    void updateDisposition(final Role role,
+                           final Set<Binary> deliveryTags,
+                           final DeliveryState state,
+                           final boolean settled)
+    {
+        // Map each tag to its delivery id; the TreeSet keeps the ids sorted so runs are adjacent.
+        final DeliveryRegistry registry = role == Role.RECEIVER ? _incomingDeliveryRegistry : _outgoingDeliveryRegistry;
+        final SortedSet<UnsignedInteger> sortedIds = deliveryTags.stream()
+                                                                 .map(registry::getDeliveryIdByTag)
+                                                                 .collect(Collectors.toCollection(TreeSet::new));
+        final Iterator<UnsignedInteger> ids = sortedIds.iterator();
+        if (!ids.hasNext())
+        {
+            return;
+        }
+
+        // Emit one disposition per run of consecutive delivery ids [rangeStart, rangeEnd].
+        UnsignedInteger rangeStart = ids.next();
+        UnsignedInteger rangeEnd = rangeStart;
+        while (ids.hasNext())
+        {
+            final UnsignedInteger next = ids.next();
+            if (!rangeEnd.add(UnsignedInteger.ONE).equals(next))
+            {
+                // Gap found: flush the finished run before starting a new one at this id.
+                updateDisposition(role, rangeStart, rangeEnd, state, settled);
+                rangeStart = next;
+            }
+            rangeEnd = next;
+        }
+        updateDisposition(role, rangeStart, rangeEnd, state, settled);
+    }
+
     public boolean hasCreditToSend()
     {
         boolean b = _remoteIncomingWindow > 0;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/3b92c306/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
index bc18bfb..fe5c4db 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
@@ -25,12 +25,16 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -80,6 +84,8 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
 
     private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<>();
 
+    private final Set<PendingDispositionHolder> _pendingDispositions = new LinkedHashSet<>();
+
     private final PublishingLink _publishingLink = new PublishingLink()
     {
         @Override
@@ -306,24 +312,7 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
 
                     if (transaction instanceof AsyncAutoCommitTransaction)
                     {
-                        recordFuture(Futures.immediateFuture(null), new ServerTransaction.Action()
-                        {
-                            @Override
-                            public void postCommit()
-                            {
-                                updateDisposition(delivery.getDeliveryTag(), resultantState, settled);
-                            }
-
-                            @Override
-                            public void onRollback()
-                            {
-                                //TODO: if reject is not supported, check spec behaviour
-                                Rejected rejected = new Rejected();
-                                rejected.setError(new Error(AmqpError.ILLEGAL_STATE, "Store transaction unexpectedly rolled-back"));
-                                DeliveryState state = sourceSupportedOutcomes.contains(Rejected.REJECTED_SYMBOL) ? rejected : resultantState;
-                                updateDisposition(delivery.getDeliveryTag(), state, settled);
-                            }
-                        });
+                        _pendingDispositions.add(new PendingDispositionHolder(delivery.getDeliveryTag(), resultantState, settled));
                     }
                     else
                     {
@@ -571,6 +560,49 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         {
             cmd.complete();
         }
+
+        processPendingDispositions();
+    }
+
+    /**
+     * Sends the queued dispositions and then empties the queue.
+     * Each run of consecutive entries sharing the same resultant state and settled flag is
+     * coalesced into a single updateDispositions call.
+     */
+    private void processPendingDispositions()
+    {
+        Iterator<PendingDispositionHolder> itr = _pendingDispositions.isEmpty() ? Collections.emptyIterator() : _pendingDispositions.iterator();
+        if (itr.hasNext())
+        {
+            try
+            {
+                PendingDispositionHolder current = itr.next();
+                final Set<Binary> deliveryTags = new HashSet<>();
+                deliveryTags.add(current.getDeliveryTag());
+
+                while (itr.hasNext())
+                {
+                    final PendingDispositionHolder disposition = itr.next();
+                    if (current.isSettled() != disposition.isSettled()
+                        || !Objects.equals(current.getResultantState(), disposition.getResultantState()))
+                    {
+                        // Run ended: flush it and start a new run headed by this disposition.
+                        updateDispositions(deliveryTags, current.getResultantState(), current.isSettled());
+                        deliveryTags.clear();
+                        current = disposition;
+                    }
+                    // Record the tag in every case, so the entry that opens a new run is not dropped.
+                    deliveryTags.add(disposition.getDeliveryTag());
+                }
+                // deliveryTags always holds at least the head of the last run here.
+                updateDispositions(deliveryTags, current.getResultantState(), current.isSettled());
+            }
+            finally
+            {
+                // Empty the queue even when an update throws.
+                _pendingDispositions.clear();
+            }
+        }
+    }
 
     private static class AsyncCommand
@@ -627,4 +659,36 @@ public class StandardReceivingLinkEndpoint extends AbstractReceivingLinkEndpoint
         }
     }
 
+    /** Immutable holder describing one disposition that is queued for a later, coalesced update. */
+    private static final class PendingDispositionHolder
+    {
+        private final Binary _deliveryTag;
+        private final DeliveryState _resultantState;
+        private final boolean _settled;
+
+        PendingDispositionHolder(final Binary tag, final DeliveryState state, final boolean isSettled)
+        {
+            _deliveryTag = tag;
+            _resultantState = state;
+            _settled = isSettled;
+        }
+
+        /** @return the tag identifying the delivery this disposition refers to */
+        Binary getDeliveryTag()
+        {
+            return _deliveryTag;
+        }
+
+        /** @return the delivery state to report for the delivery */
+        DeliveryState getResultantState()
+        {
+            return _resultantState;
+        }
+
+        /** @return the settled flag to report for the delivery */
+        boolean isSettled()
+        {
+            return _settled;
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org