You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/10/27 13:29:09 UTC
[2/2] qpid-broker-j git commit: QPID-7962: [Broker-J,
AMQP 1.0] In some circumstances Broker fails to send Flow back when
Flow with drain flag set is received from client.
QPID-7962: [Broker-J, AMQP 1.0] In some circumstances Broker fails to send Flow back when Flow with drain flag set is received from client.
* When a message is stolen from a consumer, add it to the queue's list of interested consumers
* Rename QueueConsumer#queueEmpty and ConsumerTarget#queueEmpty to #noMessagesAvailable.
The renamed method is now always called whenever messages currently cannot be delivered to this consumer.
This can occur when:
* there is no message (the queue is empty)
* existing messages do not pass through the consumer's filters
* the consumer has no credit
* there is a different consumer with higher priority and credit blocking delivery to this lower priority consumer
The effect of this method is a potential auto-close on AMQP 0-8 and potential credit draining on AMQP 1.0.
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0c6bc977
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0c6bc977
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0c6bc977
Branch: refs/heads/master
Commit: 0c6bc977321a94d1c44441e91d00c74f53200d40
Parents: 67f78d3
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu Oct 26 13:12:58 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Oct 27 14:23:19 2017 +0100
----------------------------------------------------------------------
.../qpid/server/consumer/ConsumerTarget.java | 2 +-
.../qpid/server/message/MessageContainer.java | 13 +---
.../apache/qpid/server/queue/AbstractQueue.java | 63 +++++++-------------
.../apache/qpid/server/queue/QueueConsumer.java | 2 +-
.../qpid/server/queue/QueueConsumerImpl.java | 4 +-
.../security/TrustStoreMessageSource.java | 2 +-
.../AbstractSystemMessageSource.java | 3 +-
.../virtualhost/VirtualHostPropertiesNode.java | 2 +-
.../consumer/AbstractConsumerTargetTest.java | 4 +-
.../server/consumer/TestConsumerTarget.java | 2 +-
.../protocol/v0_10/ConsumerTarget_0_10.java | 2 +-
.../protocol/v0_8/ConsumerTarget_0_8.java | 2 +-
.../protocol/v1_0/ConsumerTarget_1_0.java | 2 +-
.../management/amqp/ManagementNodeConsumer.java | 4 +-
.../management/amqp/ProxyMessageSource.java | 4 +-
15 files changed, 42 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 1fa7fb5..d766a89 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -66,7 +66,7 @@ public interface ConsumerTarget<T extends ConsumerTarget<T>>
void flushBatched();
- void queueEmpty();
+ void noMessagesAvailable();
boolean allocateCredit(ServerMessage msg);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
index d0c01c6..31e3d7b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
@@ -24,19 +24,17 @@ public final class MessageContainer
{
private final MessageInstance _messageInstance;
private final MessageReference<?> _messageReference;
- private final boolean _hasNoAvailableMessages;
- public MessageContainer(final boolean hasNoAvailableMessages)
+ public MessageContainer()
{
- this(null, null, hasNoAvailableMessages);
+ this(null, null);
}
public MessageContainer(final MessageInstance messageInstance,
- final MessageReference<?> messageReference, final boolean hasNoAvailableMessages)
+ final MessageReference<?> messageReference)
{
_messageInstance = messageInstance;
_messageReference = messageReference;
- _hasNoAvailableMessages = hasNoAvailableMessages;
}
public MessageInstance getMessageInstance()
@@ -48,9 +46,4 @@ public final class MessageContainer
{
return _messageReference;
}
-
- public boolean hasNoAvailableMessages()
- {
- return _hasNoAvailableMessages;
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 5bebdb2..45ada19 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -833,7 +833,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
target.consumerAdded(queueConsumer);
if(isEmpty())
{
- target.queueEmpty();
+ target.noMessagesAvailable();
}
target.updateNotifyWorkDesired();
target.notifyWork();
@@ -1954,11 +1954,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
if(messageContainer.getMessageInstance() == null)
{
- if (messageContainer.hasNoAvailableMessages())
- {
- queueEmpty = true;
- }
-
if (consumer.acquires())
{
if (hasAvailableMessages())
@@ -1966,6 +1961,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
notifyOtherConsumers(consumer);
}
}
+
+ consumer.noMessagesAvailable();
messageContainer = null;
}
else
@@ -1981,11 +1978,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
}
finally
{
- if(queueEmpty)
- {
- consumer.queueEmpty();
- }
-
consumer.flushBatched();
}
@@ -1997,8 +1989,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
return _queueStatistics.getAvailableCount() != 0;
}
- private static final MessageContainer NO_MESSAGES = new MessageContainer(true);
- private static final MessageContainer HAS_MESSAGES = new MessageContainer(false);
+ private static final MessageContainer NO_MESSAGES = new MessageContainer();
/**
* Attempt delivery for the given consumer.
@@ -2011,12 +2002,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
*/
private MessageContainer attemptDelivery(QueueConsumer<?,?> sub)
{
- MessageContainer messageContainer;
// avoid referring old deleted queue entry in sub._queueContext._lastSeen
- QueueEntry node = getNextAvailableEntry(sub);
+ QueueEntry node = getNextAvailableEntry(sub);
boolean subActive = sub.isActive() && !sub.isSuspended();
- if (node != null && subActive && (sub.getPriority() == Integer.MAX_VALUE || noHigherPriorityWithCredit(sub, node)))
+ if (node != null && subActive
+ && (sub.getPriority() == Integer.MAX_VALUE || noHigherPriorityWithCredit(sub, node)))
{
if (_virtualHost.getState() != State.ACTIVE)
@@ -2024,41 +2015,33 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
throw new ConnectionScopedRuntimeException("Delivery halted owing to " +
"virtualhost state " + _virtualHost.getState());
}
- messageContainer = HAS_MESSAGES;
- if (node.isAvailable())
+ if (node.isAvailable() && mightAssign(sub, node))
{
- if (sub.hasInterest(node) && mightAssign(sub, node))
+ if (sub.allocateCredit(node))
{
- if (sub.allocateCredit(node))
+ MessageReference messageReference = null;
+ if ((sub.acquires() && !assign(sub, node))
+ || (!sub.acquires() && (messageReference = node.newMessageReference()) == null))
{
- MessageReference messageReference = null;
- if ((sub.acquires() && !assign(sub, node))
- || (!sub.acquires() && (messageReference = node.newMessageReference()) == null))
- {
- // restore credit here that would have been taken away by allocateCredit since we didn't manage
- // to acquire the entry for this consumer
- sub.restoreCredit(node);
-
- }
- else
- {
- setLastSeenEntry(sub, node);
- messageContainer = new MessageContainer(node, messageReference, false);
- }
+ // restore credit here that would have been taken away by allocateCredit since we didn't manage
+ // to acquire the entry for this consumer
+ sub.restoreCredit(node);
}
else
{
- sub.awaitCredit(node);
+ setLastSeenEntry(sub, node);
+ return new MessageContainer(node, messageReference);
}
}
+ else
+ {
+ sub.awaitCredit(node);
+ }
}
}
- else
- {
- messageContainer = node == null ? NO_MESSAGES : HAS_MESSAGES;
- }
- return messageContainer;
+
+ return NO_MESSAGES;
}
private boolean noHigherPriorityWithCredit(final QueueConsumer<?,?> sub, final QueueEntry queueEntry)
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index cc060f5..5fab62a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -31,7 +31,7 @@ public interface QueueConsumer<X extends QueueConsumer<X,T>, T extends ConsumerT
{
void flushBatched();
- void queueEmpty();
+ void noMessagesAvailable();
boolean hasInterest(QueueEntry node);
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 5da8c7d..0972a1a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -308,9 +308,9 @@ class QueueConsumerImpl<T extends ConsumerTarget>
}
@Override
- public void queueEmpty()
+ public void noMessagesAvailable()
{
- _target.queueEmpty();
+ _target.noMessagesAvailable();
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
index 4643eb5..d976006 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
@@ -100,7 +100,7 @@ public class TrustStoreMessageSource extends AbstractSystemMessageSource
{
final Consumer<T> consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
consumer.send(createMessage());
- target.queueEmpty();
+ target.noMessagesAvailable();
return consumer;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index 3693c70..76d91af 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -162,8 +162,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
{
_queue.remove(0);
return new MessageContainer(propertiesMessageInstance,
- propertiesMessageInstance.getMessageReference(),
- false);
+ propertiesMessageInstance.getMessageReference());
}
}
return null;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
index 190f020..e8f33b1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
@@ -57,7 +57,7 @@ public class VirtualHostPropertiesNode extends AbstractSystemMessageSource
{
final Consumer<T> consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
consumer.send(createMessage());
- target.queueEmpty();
+ target.noMessagesAvailable();
return consumer;
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
index 8e41a9c..37b6871 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -71,7 +71,7 @@ public class AbstractConsumerTargetTest extends QpidTestCase
_messageInstance = mock(MessageInstance.class);
when(_messageInstance.getOwningResource()).thenReturn(_messageSource);
final MessageContainer messageContainer =
- new MessageContainer(_messageInstance, mock(MessageReference.class), false);
+ new MessageContainer(_messageInstance, mock(MessageReference.class));
when(_consumer.pullMessage()).thenReturn(messageContainer);
_consumerTarget = new TestAbstractConsumerTarget();
_consumerTarget.consumerAdded(_consumer);
@@ -263,7 +263,7 @@ public class AbstractConsumerTargetTest extends QpidTestCase
}
@Override
- public void queueEmpty()
+ public void noMessagesAvailable()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
index 4e05b84..96d24f0 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
@@ -197,7 +197,7 @@ public class TestConsumerTarget implements ConsumerTarget<TestConsumerTarget>
@Override
- public void queueEmpty()
+ public void noMessagesAvailable()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index ce5551f..21156de 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -577,7 +577,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
}
@Override
- public void queueEmpty()
+ public void noMessagesAvailable()
{
}
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 23d8c0e..b4f06da 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -381,7 +381,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
}
@Override
- public void queueEmpty()
+ public void noMessagesAvailable()
{
if(isAutoClose() && getState() != State.CLOSED)
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 0c0cb8b..fec4c6d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -357,7 +357,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
}
@Override
- public void queueEmpty()
+ public void noMessagesAvailable()
{
if(_linkEndpoint.drained())
{
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 7035210..7a272fa 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -90,9 +90,7 @@ class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanc
if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
{
_queue.remove(0);
- return new MessageContainer(managementResponse,
- managementResponse.getMessageReference(),
- false);
+ return new MessageContainer(managementResponse, managementResponse.getMessageReference());
}
}
return null;
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
index 0e546f1..d66dec3 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
@@ -322,9 +322,9 @@ public class ProxyMessageSource implements MessageSource, MessageDestination
}
@Override
- public void queueEmpty()
+ public void noMessagesAvailable()
{
- _underlying.queueEmpty();
+ _underlying.noMessagesAvailable();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org