You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/05/11 13:12:57 UTC
[2/2] qpid-broker-j git commit: Revert: QPID-7634: [Java Broker]
Ensure flow is sent after receiving Flow.drain=true
Revert: QPID-7634: [Java Broker] Ensure flow is sent after receiving Flow.drain=true
Keep the test
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/12f92edd
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/12f92edd
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/12f92edd
Branch: refs/heads/master
Commit: 12f92eddc99c92001103431d379af078eb6f0010
Parents: 17ce2e3
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu May 11 11:08:07 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Thu May 11 13:56:14 2017 +0100
----------------------------------------------------------------------
.../protocol/v1_0/ConsumerTarget_1_0.java | 5 +++-
.../protocol/v1_0/SendingLinkEndpoint.java | 24 ++++++++++++++++----
2 files changed, 24 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/12f92edd/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 2f98340..41a3fbc 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -309,7 +309,10 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
public void queueEmpty()
{
-
+ if(_linkEndpoint.drained())
+ {
+ updateNotifyWorkDesired();
+ }
}
public void flowStateChanged()
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/12f92edd/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
index 0f6c7b4..d89b637 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java
@@ -48,6 +48,7 @@ import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.NotFoundException;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException;
+import org.apache.qpid.server.protocol.v1_0.type.BaseTarget;
import org.apache.qpid.server.protocol.v1_0.type.Binary;
import org.apache.qpid.server.protocol.v1_0.type.DeliveryState;
import org.apache.qpid.server.protocol.v1_0.type.Outcome;
@@ -88,6 +89,7 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
private Integer _priority;
private final List<Binary> _resumeAcceptedTransfers = new ArrayList<>();
private final List<MessageInstance> _resumeFullTransfers = new ArrayList<>();
+ private volatile boolean _draining = false;
private final ConcurrentMap<Binary, UnsettledAction> _unsettledActionMap = new ConcurrentHashMap<>();
private SendingDestination _destination;
private EnumSet<ConsumerOption> _consumerOptions;
@@ -411,6 +413,23 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
return true;
}
+
+ public boolean drained()
+ {
+ if (_draining)
+ {
+ setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
+ setLinkCredit(UnsignedInteger.ZERO);
+ sendFlow();
+ _draining = false;
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
@Override
public void receiveFlow(final Flow flow)
{
@@ -449,11 +468,8 @@ public class SendingLinkEndpoint extends AbstractLinkEndpoint<Source, Target>
{
if(Boolean.TRUE.equals(getDrain()) && getLinkCredit().compareTo(UnsignedInteger.ZERO) > 0)
{
+ _draining = true;
getConsumerTarget().flush();
- setDeliveryCount(getDeliveryCount().add(getLinkCredit()));
- setLinkCredit(UnsignedInteger.ZERO);
- sendFlow();
- getConsumerTarget().updateNotifyWorkDesired();
}
while(!_resumeAcceptedTransfers.isEmpty() && hasCreditToSend())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org