You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/10/27 13:29:09 UTC

[2/2] qpid-broker-j git commit: QPID-7962: [Broker-J, AMQP 1.0] In some circumstances Broker fails to send Flow back when Flow with drain flag set is received from client.

QPID-7962: [Broker-J, AMQP 1.0] In some circumstances Broker fails to send Flow back when Flow with drain flag set is received from client.

 * When a message is stolen from a consumer, add that consumer to the queue's list of interested consumers
 * Rename QueueConsumer#queueEmpty and ConsumerTarget#queueEmpty to #noMessagesAvailable.
   The method is now always called when we currently cannot deliver messages to this consumer.
   This can occur when:
    * there is no message (the queue is empty)
    * existing messages do not pass through the consumer's filters
    * the consumer has no credit
    * a different consumer with higher priority and credit is blocking delivery to this lower-priority consumer
   The effect of calling this method is a potential auto-close on AMQP 0-8 and potential credit draining on AMQP 1.0.


Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/0c6bc977
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/0c6bc977
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/0c6bc977

Branch: refs/heads/master
Commit: 0c6bc977321a94d1c44441e91d00c74f53200d40
Parents: 67f78d3
Author: Lorenz Quack <lq...@apache.org>
Authored: Thu Oct 26 13:12:58 2017 +0100
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Oct 27 14:23:19 2017 +0100

----------------------------------------------------------------------
 .../qpid/server/consumer/ConsumerTarget.java    |  2 +-
 .../qpid/server/message/MessageContainer.java   | 13 +---
 .../apache/qpid/server/queue/AbstractQueue.java | 63 +++++++-------------
 .../apache/qpid/server/queue/QueueConsumer.java |  2 +-
 .../qpid/server/queue/QueueConsumerImpl.java    |  4 +-
 .../security/TrustStoreMessageSource.java       |  2 +-
 .../AbstractSystemMessageSource.java            |  3 +-
 .../virtualhost/VirtualHostPropertiesNode.java  |  2 +-
 .../consumer/AbstractConsumerTargetTest.java    |  4 +-
 .../server/consumer/TestConsumerTarget.java     |  2 +-
 .../protocol/v0_10/ConsumerTarget_0_10.java     |  2 +-
 .../protocol/v0_8/ConsumerTarget_0_8.java       |  2 +-
 .../protocol/v1_0/ConsumerTarget_1_0.java       |  2 +-
 .../management/amqp/ManagementNodeConsumer.java |  4 +-
 .../management/amqp/ProxyMessageSource.java     |  4 +-
 15 files changed, 42 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
index 1fa7fb5..d766a89 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java
@@ -66,7 +66,7 @@ public interface ConsumerTarget<T extends ConsumerTarget<T>>
 
     void flushBatched();
 
-    void queueEmpty();
+    void noMessagesAvailable();
 
     boolean allocateCredit(ServerMessage msg);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java b/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
index d0c01c6..31e3d7b 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/message/MessageContainer.java
@@ -24,19 +24,17 @@ public final class MessageContainer
 {
     private final MessageInstance _messageInstance;
     private final MessageReference<?> _messageReference;
-    private final boolean _hasNoAvailableMessages;
 
-    public MessageContainer(final boolean hasNoAvailableMessages)
+    public MessageContainer()
     {
-        this(null, null, hasNoAvailableMessages);
+        this(null, null);
     }
 
     public MessageContainer(final MessageInstance messageInstance,
-                            final MessageReference<?> messageReference, final boolean hasNoAvailableMessages)
+                            final MessageReference<?> messageReference)
     {
         _messageInstance = messageInstance;
         _messageReference = messageReference;
-        _hasNoAvailableMessages = hasNoAvailableMessages;
     }
 
     public MessageInstance getMessageInstance()
@@ -48,9 +46,4 @@ public final class MessageContainer
     {
         return _messageReference;
     }
-
-    public boolean hasNoAvailableMessages()
-    {
-        return _hasNoAvailableMessages;
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index 5bebdb2..45ada19 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -833,7 +833,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
             target.consumerAdded(queueConsumer);
             if(isEmpty())
             {
-                target.queueEmpty();
+                target.noMessagesAvailable();
             }
             target.updateNotifyWorkDesired();
             target.notifyWork();
@@ -1954,11 +1954,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
 
                 if(messageContainer.getMessageInstance() == null)
                 {
-                    if (messageContainer.hasNoAvailableMessages())
-                    {
-                        queueEmpty = true;
-                    }
-
                     if (consumer.acquires())
                     {
                         if (hasAvailableMessages())
@@ -1966,6 +1961,8 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                             notifyOtherConsumers(consumer);
                         }
                     }
+
+                    consumer.noMessagesAvailable();
                     messageContainer = null;
                 }
                 else
@@ -1981,11 +1978,6 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         }
         finally
         {
-            if(queueEmpty)
-            {
-                consumer.queueEmpty();
-            }
-
             consumer.flushBatched();
         }
 
@@ -1997,8 +1989,7 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
         return _queueStatistics.getAvailableCount() != 0;
     }
 
-    private static final MessageContainer NO_MESSAGES = new MessageContainer(true);
-    private static final MessageContainer HAS_MESSAGES = new MessageContainer(false);
+    private static final MessageContainer NO_MESSAGES = new MessageContainer();
 
     /**
      * Attempt delivery for the given consumer.
@@ -2011,12 +2002,12 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
      */
     private MessageContainer attemptDelivery(QueueConsumer<?,?> sub)
     {
-        MessageContainer messageContainer;
         // avoid referring old deleted queue entry in sub._queueContext._lastSeen
-        QueueEntry node  = getNextAvailableEntry(sub);
+        QueueEntry node = getNextAvailableEntry(sub);
         boolean subActive = sub.isActive() && !sub.isSuspended();
 
-        if (node != null && subActive && (sub.getPriority() == Integer.MAX_VALUE || noHigherPriorityWithCredit(sub, node)))
+        if (node != null && subActive
+            && (sub.getPriority() == Integer.MAX_VALUE || noHigherPriorityWithCredit(sub, node)))
         {
 
             if (_virtualHost.getState() != State.ACTIVE)
@@ -2024,41 +2015,33 @@ public abstract class AbstractQueue<X extends AbstractQueue<X>>
                 throw new ConnectionScopedRuntimeException("Delivery halted owing to " +
                                                            "virtualhost state " + _virtualHost.getState());
             }
-            messageContainer = HAS_MESSAGES;
 
-            if (node.isAvailable())
+            if (node.isAvailable() && mightAssign(sub, node))
             {
-                if (sub.hasInterest(node) && mightAssign(sub, node))
+                if (sub.allocateCredit(node))
                 {
-                    if (sub.allocateCredit(node))
+                    MessageReference messageReference = null;
+                    if ((sub.acquires() && !assign(sub, node))
+                        || (!sub.acquires() && (messageReference = node.newMessageReference()) == null))
                     {
-                        MessageReference messageReference = null;
-                        if ((sub.acquires() && !assign(sub, node))
-                            || (!sub.acquires() && (messageReference = node.newMessageReference()) == null))
-                        {
-                            // restore credit here that would have been taken away by allocateCredit since we didn't manage
-                            // to acquire the entry for this consumer
-                            sub.restoreCredit(node);
-
-                        }
-                        else
-                        {
-                            setLastSeenEntry(sub, node);
-                            messageContainer = new MessageContainer(node, messageReference, false);
-                        }
+                        // restore credit here that would have been taken away by allocateCredit since we didn't manage
+                        // to acquire the entry for this consumer
+                        sub.restoreCredit(node);
                     }
                     else
                     {
-                        sub.awaitCredit(node);
+                        setLastSeenEntry(sub, node);
+                        return new MessageContainer(node, messageReference);
                     }
                 }
+                else
+                {
+                    sub.awaitCredit(node);
+                }
             }
         }
-        else
-        {
-            messageContainer = node == null ? NO_MESSAGES : HAS_MESSAGES;
-        }
-        return messageContainer;
+
+        return NO_MESSAGES;
     }
 
     private boolean noHigherPriorityWithCredit(final QueueConsumer<?,?> sub, final QueueEntry queueEntry)

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
index cc060f5..5fab62a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java
@@ -31,7 +31,7 @@ public interface QueueConsumer<X extends QueueConsumer<X,T>, T extends ConsumerT
 {
     void flushBatched();
 
-    void queueEmpty();
+    void noMessagesAvailable();
 
     boolean hasInterest(QueueEntry node);
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index 5da8c7d..0972a1a 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -308,9 +308,9 @@ class QueueConsumerImpl<T extends ConsumerTarget>
     }
 
     @Override
-    public void queueEmpty()
+    public void noMessagesAvailable()
     {
-        _target.queueEmpty();
+        _target.noMessagesAvailable();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
index 4643eb5..d976006 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/security/TrustStoreMessageSource.java
@@ -100,7 +100,7 @@ public class TrustStoreMessageSource extends AbstractSystemMessageSource
     {
         final Consumer<T> consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
         consumer.send(createMessage());
-        target.queueEmpty();
+        target.noMessagesAvailable();
         return consumer;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
index 3693c70..76d91af 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractSystemMessageSource.java
@@ -162,8 +162,7 @@ public abstract class AbstractSystemMessageSource implements MessageSource
                 {
                     _queue.remove(0);
                     return new MessageContainer(propertiesMessageInstance,
-                                                propertiesMessageInstance.getMessageReference(),
-                                                false);
+                                                propertiesMessageInstance.getMessageReference());
                 }
             }
             return null;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
----------------------------------------------------------------------
diff --git a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
index 190f020..e8f33b1 100644
--- a/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
+++ b/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostPropertiesNode.java
@@ -57,7 +57,7 @@ public class VirtualHostPropertiesNode extends AbstractSystemMessageSource
     {
         final Consumer<T> consumer = super.addConsumer(target, filters, messageClass, consumerName, options, priority);
         consumer.send(createMessage());
-        target.queueEmpty();
+        target.noMessagesAvailable();
         return consumer;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
index 8e41a9c..37b6871 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/AbstractConsumerTargetTest.java
@@ -71,7 +71,7 @@ public class AbstractConsumerTargetTest extends QpidTestCase
         _messageInstance = mock(MessageInstance.class);
         when(_messageInstance.getOwningResource()).thenReturn(_messageSource);
         final MessageContainer messageContainer =
-                new MessageContainer(_messageInstance, mock(MessageReference.class), false);
+                new MessageContainer(_messageInstance, mock(MessageReference.class));
         when(_consumer.pullMessage()).thenReturn(messageContainer);
         _consumerTarget = new TestAbstractConsumerTarget();
         _consumerTarget.consumerAdded(_consumer);
@@ -263,7 +263,7 @@ public class AbstractConsumerTargetTest extends QpidTestCase
         }
 
         @Override
-        public void queueEmpty()
+        public void noMessagesAvailable()
         {
 
         }

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
----------------------------------------------------------------------
diff --git a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
index 4e05b84..96d24f0 100644
--- a/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
+++ b/broker-core/src/test/java/org/apache/qpid/server/consumer/TestConsumerTarget.java
@@ -197,7 +197,7 @@ public class TestConsumerTarget implements ConsumerTarget<TestConsumerTarget>
 
 
     @Override
-    public void queueEmpty()
+    public void noMessagesAvailable()
     {
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index ce5551f..21156de 100644
--- a/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -577,7 +577,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget<ConsumerTarget_0
     }
 
     @Override
-    public void queueEmpty()
+    public void noMessagesAvailable()
     {
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
index 23d8c0e..b4f06da 100644
--- a/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
+++ b/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
@@ -381,7 +381,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget<Consumer
     }
 
     @Override
-    public void queueEmpty()
+    public void noMessagesAvailable()
     {
         if(isAutoClose() && getState() != State.CLOSED)
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
----------------------------------------------------------------------
diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
index 0c0cb8b..fec4c6d 100644
--- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
+++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java
@@ -357,7 +357,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget<ConsumerTarget_1_0>
     }
 
     @Override
-    public void queueEmpty()
+    public void noMessagesAvailable()
     {
         if(_linkEndpoint.drained())
         {

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
index 7035210..7a272fa 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java
@@ -90,9 +90,7 @@ class ManagementNodeConsumer<T extends ConsumerTarget> implements MessageInstanc
             if (!_target.isSuspended() && _target.allocateCredit(managementResponse.getMessage()))
             {
                 _queue.remove(0);
-                return new MessageContainer(managementResponse,
-                                            managementResponse.getMessageReference(),
-                                            false);
+                return new MessageContainer(managementResponse, managementResponse.getMessageReference());
             }
         }
         return null;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/0c6bc977/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
----------------------------------------------------------------------
diff --git a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
index 0e546f1..d66dec3 100644
--- a/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
+++ b/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ProxyMessageSource.java
@@ -322,9 +322,9 @@ public class ProxyMessageSource implements MessageSource, MessageDestination
         }
 
         @Override
-        public void queueEmpty()
+        public void noMessagesAvailable()
         {
-            _underlying.queueEmpty();
+            _underlying.noMessagesAvailable();
         }
 
         @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org