You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/11 13:12:57 UTC

[2/2] qpid-broker-j git commit: Revert: QPID-7634: [Java Broker] Ensure flow is sent after receiving Flow.drain=true

Revert: QPID-7634: [Java Broker] Ensure flow is sent after receiving Flow.drain=true

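Keep the test that accompanied the original QPID-7634 change; only the main-source changes are affected by this commit.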


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/12f92edd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/12f92edd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/12f92edd

Branch: refs/heads/master
Commit: 12f92eddc99c92001103431d379af078eb6f0010
Parents: 17ce2e3
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu May 11 11:08:07 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Thu May 11 13:56:14 2017 +0100

----------------------------------------------------------------------
 .../protocol/v1_0/ConsumerTarget_1_0.java       |  5 +++-
 .../protocol/v1_0/SendingLinkEndpoint.java      | 24 ++++++++++++++++----
 2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/12f92edd/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 2f98340..41a3fbc 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -309,7 +309,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
 
     public void queueEmpty()
     {
-
+        if(_linkEndpoint.drained())
+        {
+            updateNotifyWorkDesired();
+        }
     }
 
     public void flowStateChanged()

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/12f92edd/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 0f6c7b4..d89b637 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -48,6 +48,7 @@ import org.apache.qpid.server.model.NamedAddressSpace;
 import org.apache.qpid.server.model.NotFoundException;
 import org.apache.qpid.server.model.Queue;
 import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
 import org.apache.qpid.server.protocol.v1_0.type.Binary;
 import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
 import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -88,6 +89,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     private Integer _priority;
     private final List<Binary> _resumeAcceptedTransfers = new ArrayList<>();
     private final List<MessageInstance> _resumeFullTransfers = new ArrayList<>();
+    private volatile boolean _draining = false;
     private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<>();
     private SendingDestination _destination;
     private EnumSet<ConsumerOption> _consumerOptions;
@@ -411,6 +413,23 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
         return true;
     }
 
+
+    public boolean drained()
+    {
+        if (_draining)
+        {
+            setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+            setLinkCredit(UnsignedInteger.ZERO);
+            sendFlow();
+            _draining = false;
+            return true;
+        }
+        else
+        {
+            return false;
+        }
+    }
+
     @Override
     public void receiveFlow(final Flow flow)
     {
@@ -449,11 +468,8 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
     {
         if(Boolean.TRUE.equals(getDrain()) && getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0)
         {
+            _draining = true;
             getConsumerTarget().flush();
-            setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
-            setLinkCredit(UnsignedInteger.ZERO);
-            sendFlow();
-            getConsumerTarget().updateNotifyWorkDesired();
         }
 
         while(!_resumeAcceptedTransfers.isEmpty() && hasCreditToSend())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org