You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/23 17:11:05 UTC

svn commit: r1784178 [1/3] - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/logging/messages/ broker-core/src/main/java/org/apache/qpid/server/message/ broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/...

Author: orudyy
Date: Thu Feb 23 17:11:04 2017
New Revision: 1784178

URL: http://svn.apache.org/viewvc?rev=1784178&view=rev
Log:
QPID-7618: [Java Broker] Introduce overflow policy for producer flow control and move existing producer flow control functionality into overflow policy handler

Added:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
      - copied, changed from r1784177, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
    qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
    qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/editQueue.html
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js
    qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html
    qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/acl/QueueRestACLTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
    qpid/java/trunk/test-profiles/Java10Excludes

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java Thu Feb 23 17:11:04 2017
@@ -64,6 +64,7 @@ public class QueueMessages
 
     public static final String QUEUE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue";
     public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
+    public static final String DROPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
     public static final String OVERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
     public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
     public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_active";
@@ -75,6 +76,7 @@ public class QueueMessages
     {
         LoggerFactory.getLogger(QUEUE_LOG_HIERARCHY);
         LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
+        LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
         LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
         LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
         LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
@@ -208,16 +210,74 @@ public class QueueMessages
 
     /**
      * Log a Queue message of the Format:
-     * <pre>QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}</pre>
+     * <pre>QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage OVERFULL(Number param1, Number param2)
+    public static LogMessage DROPPED(Number param1, Number param2, Number param3, Number param4, Number param5)
+    {
+        String rawMessage = _messages.getString("DROPPED");
+
+        final Object[] messageArguments = {param1, param2, param3, param4, param5};
+        // Create a new MessageFormat to ensure thread safety.
+        // Sharing a MessageFormat and using applyPattern is not thread safe
+        MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+        final String message = formatter.format(messageArguments);
+
+        return new LogMessage()
+        {
+            public String toString()
+            {
+                return message;
+            }
+
+            public String getLogHierarchy()
+            {
+                return DROPPED_LOG_HIERARCHY;
+            }
+
+            @Override
+            public boolean equals(final Object o)
+            {
+                if (this == o)
+                {
+                    return true;
+                }
+                if (o == null || getClass() != o.getClass())
+                {
+                    return false;
+                }
+
+                final LogMessage that = (LogMessage) o;
+
+                return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+            }
+
+            @Override
+            public int hashCode()
+            {
+                int result = toString().hashCode();
+                result = 31 * result + getLogHierarchy().hashCode();
+                return result;
+            }
+        };
+    }
+
+    /**
+     * Log a Queue message of the Format:
+     * <pre>QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}</pre>
+     * Optional values are contained in [square brackets] and are numbered
+     * sequentially in the method call.
+     *
+     */
+    public static LogMessage OVERFULL(Number param1, Number param2, Number param3, Number param4)
     {
         String rawMessage = _messages.getString("OVERFULL");
 
-        final Object[] messageArguments = {param1, param2};
+        final Object[] messageArguments = {param1, param2, param3, param4};
         // Create a new MessageFormat to ensure thread safety.
         // Sharing a MessageFormat and using applyPattern is not thread safe
         MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -382,16 +442,16 @@ public class QueueMessages
 
     /**
      * Log a Queue message of the Format:
-     * <pre>QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}</pre>
+     * <pre>QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}</pre>
      * Optional values are contained in [square brackets] and are numbered
      * sequentially in the method call.
      *
      */
-    public static LogMessage UNDERFULL(Number param1, Number param2)
+    public static LogMessage UNDERFULL(Number param1, Number param2, Number param3, Number param4)
     {
         String rawMessage = _messages.getString("UNDERFULL");
 
-        final Object[] messageArguments = {param1, param2};
+        final Object[] messageArguments = {param1, param2, param3, param4};
         // Create a new MessageFormat to ensure thread safety.
         // Sharing a MessageFormat and using applyPattern is not thread safe
         MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties Thu Feb 23 17:11:04 2017
@@ -23,8 +23,9 @@
 # 3 - priority
 CREATED = QUE-1001 : Create : ID: {0}[ Owner: {1}][ AutoDelete][ Durable][ Transient][ Priority: {2,number,#}]
 DELETED = QUE-1002 : Deleted : ID: {0}
-OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}
-UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
+OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
+UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
+DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages
 
 # use similar number to the broker for similar topic
 FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active :  Message memory use {0,number,#} kB exceeds threshold {1,number,#.##} kB

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java Thu Feb 23 17:11:04 2017
@@ -22,7 +22,9 @@ package org.apache.qpid.server.message;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 import java.util.Set;
 
 import org.slf4j.Logger;
@@ -41,14 +43,7 @@ public class RoutingResult<M extends Ser
     private final M _message;
 
     private final Set<BaseQueue> _queues = new HashSet<>();
-
-    private int _errorCodeAmqp_0_10;
-
-    private String _errorCodeAmqp_1_0;
-
-    private String _errorMessage;
-
-    private boolean _routingFailure = false;
+    private final Map<BaseQueue, CharSequence> _notAcceptingRoutableQueues = new HashMap<>();
 
     public RoutingResult(final M message)
     {
@@ -67,7 +62,7 @@ public class RoutingResult<M extends Ser
         }
     }
 
-    public void addQueues(Collection<? extends BaseQueue> queues)
+    private void addQueues(Collection<? extends BaseQueue> queues)
     {
         boolean deletedQueues = false;
         for(BaseQueue q : queues)
@@ -90,16 +85,13 @@ public class RoutingResult<M extends Ser
 
     public void add(RoutingResult<M> result)
     {
-        if (result.isRoutingFailure())
-        {
-            _routingFailure = result._routingFailure;
-            _errorCodeAmqp_0_10 = result._errorCodeAmqp_0_10;
-            _errorCodeAmqp_1_0 = result._errorCodeAmqp_1_0;
-            _errorMessage = result._errorMessage;
-        }
-        else
+        addQueues(result._queues);
+        for (Map.Entry<BaseQueue, CharSequence> e : result._notAcceptingRoutableQueues.entrySet())
         {
-            addQueues(result._queues);
+            if (!e.getKey().isDeleted())
+            {
+                _notAcceptingRoutableQueues.put(e.getKey(), e.getValue());
+            }
         }
     }
 
@@ -163,31 +155,27 @@ public class RoutingResult<M extends Ser
         return !_queues.isEmpty();
     }
 
-    public void addRoutingFailure(int errorCodeAmqp_0_10, String errorCodeAmqp_1_0, String reason)
+    public void addNotAcceptingRoutableQueue(BaseQueue q, CharSequence reason)
     {
-        _routingFailure = true;
-        this._errorCodeAmqp_0_10 = errorCodeAmqp_0_10;
-        this._errorCodeAmqp_1_0 = errorCodeAmqp_1_0;
-        _errorMessage = reason;
+        _notAcceptingRoutableQueues.put(q, reason);
     }
 
-    public String getErrorMessage()
+    public boolean hasNotAcceptingRoutableQueue()
     {
-        return _errorMessage;
+        return !_notAcceptingRoutableQueues.isEmpty();
     }
 
-    public int getErrorCodeAmqp_0_10()
+    public String getUnacceptanceCause()
     {
-        return _errorCodeAmqp_0_10;
-    }
-
-    public String getErrorCodeAmqp_1_0()
-    {
-        return _errorCodeAmqp_1_0;
-    }
-
-    public boolean isRoutingFailure()
-    {
-        return _routingFailure;
+        StringBuilder refusalMessages = new StringBuilder();
+        for (CharSequence message : _notAcceptingRoutableQueues.values())
+        {
+            if (refusalMessages.length() > 0)
+            {
+                refusalMessages.append(";");
+            }
+            refusalMessages.append(message);
+        }
+        return refusalMessages.toString();
     }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java Thu Feb 23 17:11:04 2017
@@ -23,5 +23,6 @@ package org.apache.qpid.server.model;
 public enum OverflowPolicy
 {
     NONE,
-    RING
+    RING,
+    PRODUCER_FLOW_CONTROL
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Thu Feb 23 17:11:04 2017
@@ -33,7 +33,6 @@ import org.apache.qpid.server.message.Me
 import org.apache.qpid.server.message.MessageInfo;
 import org.apache.qpid.server.message.MessageSource;
 import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.CapacityChecker;
 import org.apache.qpid.server.queue.BaseQueue;
 import org.apache.qpid.server.queue.NotificationCheck;
 import org.apache.qpid.server.queue.QueueConsumer;
@@ -50,7 +49,6 @@ public interface Queue<X extends Queue<X
                                                    Comparable<X>, ExchangeReferrer,
                                                    BaseQueue,
                                                    MessageSource,
-                                                   CapacityChecker,
                                                    MessageDestination,
                                                    Deletable<X>
 {
@@ -74,8 +72,7 @@ public interface Queue<X extends Queue<X
     String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
     String NO_LOCAL = "noLocal";
     String OWNER = "owner";
-    String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
-    String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
+
     String QUEUE_FLOW_STOPPED = "queueFlowStopped";
     String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
     String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
@@ -83,8 +80,8 @@ public interface Queue<X extends Queue<X
     String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
     String HOLD_ON_PUBLISH_ENABLED = "holdOnPublishEnabled";
     String OVERFLOW_POLICY = "overflowPolicy";
-    String MAX_COUNT = "maxCount";
-    String MAX_SIZE = "maxSize";
+    String MAXIMUM_QUEUE_DEPTH_MESSAGES = "maximumQueueDepthMessages";
+    String MAXIMUM_QUEUE_DEPTH_BYTES = "maximumQueueDepthBytes";
 
     String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
     @SuppressWarnings("unused")
@@ -170,20 +167,12 @@ public interface Queue<X extends Queue<X
     @ManagedAttribute( defaultValue = "${queue.maximumDeliveryAttempts}")
     int getMaximumDeliveryAttempts();
 
+    String QUEUE_FLOW_RESUME_LIMIT = "queue.queueFlowResumeLimit";
     @SuppressWarnings("unused")
-    @ManagedContextDefault( name = "queue.queueFlowControlSizeBytes")
-    long DEFAULT_FLOW_CONTROL_SIZE_BYTES = 0L;
-
-    @ManagedAttribute( defaultValue = "${queue.queueFlowControlSizeBytes}")
-    long getQueueFlowControlSizeBytes();
-
-    @SuppressWarnings("unused")
-    @ManagedContextDefault( name = "queue.queueFlowResumeSizeBytes")
-    long DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES = 0L;
-
-    @ManagedAttribute( defaultValue = "${queue.queueFlowResumeSizeBytes}")
-    long getQueueFlowResumeSizeBytes();
-
+    @ManagedContextDefault( name = QUEUE_FLOW_RESUME_LIMIT,
+            description = "Percentage used to evaluate flow resume limit based on the values of attributes"
+                          + " 'maximumQueueDepthBytes' and 'maximumQueueDepthMessages'.")
+    double DEFAULT_FLOW_CONTROL_RESUME_LIMIT = 80.0;
 
     @SuppressWarnings("unused")
     @DerivedAttribute
@@ -261,25 +250,39 @@ public interface Queue<X extends Queue<X
                                      + "visible may depend on how frequently the virtual host housekeeping thread runs.")
     boolean isHoldOnPublishEnabled();
 
-    @ManagedContextDefault( name = "queue.defaultMaxCount", description = "Maximum count of messages in queue when policy_type set to Ring")
-    long DEFAULT_MAX_COUNT = 10;
-
-    @ManagedAttribute( defaultValue = "${queue.defaultMaxCount}", description = "Maximum count of messages in queue, when policy_type set to Ring")
-    long getMaxCount();
-
-    @ManagedContextDefault( name = "queue.defaultMaxSize", description = "Maximum size of messages in queue in bytes, when policy_type set to Ring")
-    long DEFAULT_MAX_SIZE = 1024;
-
-    @ManagedAttribute( defaultValue = "${queue.defaultMaxSize}", description = "Maximum size of messages in queue in bytes, when policy_type set to Ring")
-    long getMaxSize();
-
-    @SuppressWarnings("unused")
-    @ManagedContextDefault( name = "queue.defaultOverflowPolicy")
-    OverflowPolicy DEFAULT_POLICY_TYPE = OverflowPolicy.NONE;
-
-    @ManagedAttribute( defaultValue = "${queue.defaultOverflowPolicy}", description = "Queue overflow policy. Current options are Ring and None." +
-            " Ring overflow policy - when queue message count or size of messages in queue exceeds maximum, oldest message(s) is discarded." +
-            " None overflow policy - maximum size and maximum count properties are not applied.")
+    @ManagedContextDefault(name = "queue.defaultMaximumQueueDepthMessages",
+            description = "Maximum number of messages on queue allowed by overflow policy.")
+    long DEFAULT_MAXIMUM_QUEUE_DEPTH_MESSAGES = -1;
+
+    @ManagedAttribute(defaultValue = "${queue.defaultMaximumQueueDepthMessages}",
+            description = "Maximum number of messages on queue allowed by overflow policy."
+                          + " Negative value indicates that queue depth is unlimited. Default, -1.")
+    long getMaximumQueueDepthMessages();
+
+    @ManagedContextDefault(name = "queue.defaultMaximumQueueDepthBytes",
+            description = "Maximum number of bytes on queue allowed by overflow policy." )
+    long DEFAULT_MAXIMUM_QUEUE_DEPTH_BYTES = -1;
+
+    @ManagedAttribute(defaultValue = "${queue.defaultMaximumQueueDepthBytes}",
+            description = "Maximum number of bytes on queue allowed by overflow policy."
+                          + "  Negative value indicates that queue depth is unlimited. Default, -1.")
+    long getMaximumQueueDepthBytes();
+
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = "queue.defaultOverflowPolicy",
+            description = "Specifies the default value for queue overflow policy. ")
+    OverflowPolicy DEFAULT_OVERFLOW_POLICY = OverflowPolicy.NONE;
+
+    @ManagedAttribute(defaultValue = "${queue.defaultOverflowPolicy}",
+            description = "Queue overflow policy."
+                          + " Current options are ProducerFlowControl, Ring and None."
+                          + " ProducerFlowControl overflow policy - when queue message count or size of messages"
+                          + " in queue exceeds maximum, the producer is blocked until queue depth falls below "
+                          + " the resume threshold."
+                          + " Ring overflow policy - when queue message count or size of messages in queue exceeds"
+                          + " maximum, oldest messages are discarded."
+                          + " None overflow policy - maximum size and maximum count properties are not applied.",
+            mandatory = true)
     OverflowPolicy getOverflowPolicy();
 
     @ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
@@ -297,7 +300,6 @@ public interface Queue<X extends Queue<X
 
     int deleteAndReturnCount();
 
-    void deleteEntry(QueueEntry node);
 
     void setNotificationListener(QueueNotificationListener listener);
 
@@ -483,4 +485,9 @@ public interface Queue<X extends Queue<X
     boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
 
     void checkCapacity();
+
+    void deleteEntry(QueueEntry entry);
+
+    QueueEntry getLesserOldestEntry();
+
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Feb 23 17:11:04 2017
@@ -182,18 +182,9 @@ public abstract class AbstractQueue<X ex
     private long _alertRepeatGap;
 
     @ManagedAttributeField
-    private long _queueFlowControlSizeBytes;
-
-    @ManagedAttributeField( afterSet = "checkCapacity" )
-    private long _queueFlowResumeSizeBytes;
-
-    @ManagedAttributeField
     private ExclusivityPolicy _exclusive;
 
     @ManagedAttributeField
-    private OverflowPolicy _overflowPolicy;
-
-    @ManagedAttributeField
     private MessageDurability _messageDurability;
 
     @ManagedAttributeField
@@ -211,9 +202,6 @@ public abstract class AbstractQueue<X ex
 
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
-    private final Set<AMQPSession<?, ?>> _blockedSessions =
-            Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?, ?>, Boolean>());
-
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
     private final SettableFuture<Integer> _deleteFuture = SettableFuture.create();
 
@@ -226,7 +214,6 @@ public abstract class AbstractQueue<X ex
     @ManagedAttributeField
     private boolean _noLocal;
 
-    private final AtomicBoolean _overfull = new AtomicBoolean(false);
     private final FlowToDiskChecker _flowToDiskChecker = new FlowToDiskChecker();
     private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<>();
     private Map<String, Object> _arguments;
@@ -259,10 +246,13 @@ public abstract class AbstractQueue<X ex
     private boolean _ensureNondestructiveConsumers;
     @ManagedAttributeField
     private volatile boolean _holdOnPublishEnabled;
+
+    @ManagedAttributeField
+    private OverflowPolicy _overflowPolicy;
     @ManagedAttributeField
-    private long _maxCount;
+    private long _maximumQueueDepthMessages;
     @ManagedAttributeField
-    private long _maxSize;
+    private long _maximumQueueDepthBytes;
 
     private static final int RECOVERING = 1;
     private static final int COMPLETING_RECOVERY = 2;
@@ -279,6 +269,7 @@ public abstract class AbstractQueue<X ex
     private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
     private AdvanceConsumersTask _queueHouseKeepingTask;
     private volatile int _bindingCount;
+    private volatile OverflowPolicyHandler _overflowPolicyHandler;
 
     private interface HoldMethod
     {
@@ -336,9 +327,10 @@ public abstract class AbstractQueue<X ex
     public void onValidate()
     {
         super.onValidate();
-        if (_queueFlowResumeSizeBytes > _queueFlowControlSizeBytes)
+        Double flowResumeLimit = getContextValue(Double.class, QUEUE_FLOW_RESUME_LIMIT);
+        if (flowResumeLimit != null && (flowResumeLimit < 0.0 || flowResumeLimit > 100.0))
         {
-            throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
+            throw new IllegalConfigurationException("Flow resume limit value cannot be greater than 100 or lower than 0");
         }
     }
 
@@ -361,6 +353,8 @@ public abstract class AbstractQueue<X ex
         _estimatedMinimumMemoryFootprint = getContextValue(Long.class, QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT);
         _estimatedMessageMemoryOverhead = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
 
+        _overflowPolicyHandler = createOverflowPolicyHandler(getOverflowPolicy());
+
         _queueHouseKeepingTask = new AdvanceConsumersTask();
         Subject activeSubject = Subject.getSubject(AccessController.getContext());
         Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
@@ -680,15 +674,15 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public long getMaxCount()
+    public long getMaximumQueueDepthMessages()
     {
-        return _maxCount;
+        return _maximumQueueDepthMessages;
     }
 
     @Override
-    public long getMaxSize()
+    public long getMaximumQueueDepthBytes()
     {
-        return _maxSize;
+        return _maximumQueueDepthBytes;
     }
 
     @Override
@@ -1158,7 +1152,6 @@ public abstract class AbstractQueue<X ex
 
     protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
     {
-        applyOverflowPolicy(message);
         final QueueEntry entry = getEntries().add(message, enqueueRecord);
         updateExpiration(entry);
 
@@ -1178,35 +1171,11 @@ public abstract class AbstractQueue<X ex
             {
                 action.performAction(entry);
             }
+            _overflowPolicyHandler.checkOverflow();
         }
 
     }
 
-    private void applyOverflowPolicy(final ServerMessage message)
-    {
-        if (getOverflowPolicy() != null)
-        {
-            switch (getOverflowPolicy())
-            {
-                case RING:
-                    while (getQueueDepthMessages() == getMaxCount())
-                    {
-                        QueueEntry entry = getEntries().getOldestEntry();
-                        deleteEntry(entry);
-                    }
-                    while (getQueueDepthBytesIncludingHeader() + message.getSizeIncludingHeader() > getMaxSize())
-                    {
-                        QueueEntry entry = getEntries().getOldestEntry();
-                        deleteEntry(entry);
-                    }
-                    break;
-                case NONE:
-                default:
-                    break;
-            }
-        }
-    }
-
     private void updateExpiration(final QueueEntry entry)
     {
         long expiration = entry.getMessage().getExpiration();
@@ -1671,6 +1640,7 @@ public abstract class AbstractQueue<X ex
                     });
     }
 
+    @Override
     public void deleteEntry(final QueueEntry entry)
     {
         boolean acquiredForDequeueing = entry.acquireOrSteal(new Runnable()
@@ -1684,8 +1654,7 @@ public abstract class AbstractQueue<X ex
 
         if(acquiredForDequeueing)
         {
-            _logger.debug("Dequeuing expired node {}", entry);
-            // Then dequeue it.
+            _logger.debug("Dequeuing node {}", entry);
             dequeueEntry(entry);
         }
     }
@@ -1804,64 +1773,9 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public void checkCapacity(AMQPSession<?,?> session)
-    {
-        if(_queueFlowControlSizeBytes != 0L)
-        {
-            if(_queueStatistics.getQueueSize() > _queueFlowControlSizeBytes)
-            {
-                _overfull.set(true);
-                //Overfull log message
-                getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_queueStatistics.getQueueSize(),
-                                                                             _queueFlowControlSizeBytes));
-
-                _blockedSessions.add(session);
-
-                session.block(this);
-
-                if(_queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
-                {
-
-                    //Underfull log message
-                    getEventLogger().message(_logSubject,
-                                             QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(), _queueFlowResumeSizeBytes));
-
-                   session.unblock(this);
-                   _blockedSessions.remove(session);
-
-                }
-            }
-
-
-
-        }
-    }
-
-    @Override
     public void checkCapacity()
     {
-        if(getEntries() != null)
-        {
-            if (_queueFlowControlSizeBytes != 0L)
-            {
-                if (_overfull.get() && _queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
-                {
-                    if (_overfull.compareAndSet(true, false))
-                    {
-                        //Underfull log message
-                        getEventLogger().message(_logSubject,
-                                                 QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(),
-                                                                         _queueFlowResumeSizeBytes));
-                    }
-
-                    for (final AMQPSession<?,?> blockedSession : _blockedSessions)
-                    {
-                        blockedSession.unblock(this);
-                        _blockedSessions.remove(blockedSession);
-                    }
-                }
-            }
-        }
+        _overflowPolicyHandler.checkOverflow();
     }
 
     void notifyConsumers(QueueEntry entry)
@@ -2289,16 +2203,6 @@ public abstract class AbstractQueue<X ex
         return _alertThresholdMessageSize;
     }
 
-    public long getQueueFlowControlSizeBytes()
-    {
-        return _queueFlowControlSizeBytes;
-    }
-
-    public long getQueueFlowResumeSizeBytes()
-    {
-        return _queueFlowResumeSizeBytes;
-    }
-
     public Set<NotificationCheck> getNotificationChecks()
     {
         return _notificationChecks;
@@ -2636,32 +2540,15 @@ public abstract class AbstractQueue<X ex
             throw new VirtualHostUnavailableException(this._virtualHost);
         }
         RoutingResult<M> result = new RoutingResult<>(message);
-
-        if(_overflowPolicy == OverflowPolicy.RING && message.getSizeIncludingHeader() > getMaxSize())
+        if (!message.isResourceAcceptable(this))
         {
-            result.addRoutingFailure(506, "amqp:resource-limit-exceeded", "Message larger than configured maximum depth on " + this.getName()
-                    + ": size=" + message.getSizeIncludingHeader() + ", max-size=" + getMaxSize());
+            result.addNotAcceptingRoutableQueue(this, String.format("Not accepted by queue '%s'", getName()));
         }
-        else if(_overflowPolicy == OverflowPolicy.RING && _maxCount == 0)
+        else if (message.isReferenced(this))
         {
-            result.addRoutingFailure(506, "amqp:resource-limit-exceeded", "Queue '" + getName() + "' maximum count (0) prevents this message from being accepted");
+            result.addNotAcceptingRoutableQueue(this, String.format("Already enqueued on queue '%s'", getName()));
         }
-        else if (this instanceof PriorityQueue && message.isResourceAcceptable(this) && !message.isReferenced(this))
-        {
-            if (getMaxCount() == getQueueDepthMessages())
-            {
-                QueueEntry entry = getEntries().getOldestEntry();
-                if(entry.getMessage().getMessageHeader().getPriority() <= message.getMessageHeader().getPriority())
-                {
-                    result.addQueue(this);
-                }
-            }
-            else
-            {
-                result.addQueue(this);
-            }
-        }
-        else if(message.isResourceAcceptable(this) && !message.isReferenced(this))
+        else
         {
             result.addQueue(this);
         }
@@ -2992,7 +2879,11 @@ public abstract class AbstractQueue<X ex
     @Override
     public boolean isQueueFlowStopped()
     {
-        return _overfull.get();
+        if (_overflowPolicyHandler instanceof ProducerFlowControlOverflowPolicyHandler)
+        {
+            return ((ProducerFlowControlOverflowPolicyHandler)_overflowPolicyHandler).isQueueFlowStopped();
+        }
+        return false;
     }
 
     @Override
@@ -3008,7 +2899,7 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public boolean changeAttribute(String name, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
+    protected boolean changeAttribute(String name, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
     {
         if(EXCLUSIVE.equals(name))
         {
@@ -3037,14 +2928,39 @@ public abstract class AbstractQueue<X ex
 
     }
 
+    @Override
+    protected void changeAttributes(Map<String,Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+    {
+        OverflowPolicy oldOverflowPolicy = getOverflowPolicy();
+        super.changeAttributes(attributes);
+
+        OverflowPolicy newOverflowPolicy = getOverflowPolicy();
+        if (oldOverflowPolicy != newOverflowPolicy)
+        {
+            _overflowPolicyHandler = createOverflowPolicyHandler(newOverflowPolicy);
+            _overflowPolicyHandler.checkOverflow();
+        }
+    }
+
+    private OverflowPolicyHandler createOverflowPolicyHandler(final OverflowPolicy overflowPolicy)
+    {
+        OverflowPolicyHandlerFactory factory =
+                new QpidServiceLoader().getInstancesByType(OverflowPolicyHandlerFactory.class)
+                                       .get(String.valueOf(overflowPolicy));
+        if (factory == null)
+        {
+            throw new IllegalStateException(String.format("Factory for overflow policy '%s' is not found",
+                                                          overflowPolicy.name()));
+        }
+        return factory.create(this, getEventLogger());
+    }
+
     private static final String[] NON_NEGATIVE_NUMBERS = {
         ALERT_REPEAT_GAP,
         ALERT_THRESHOLD_MESSAGE_AGE,
         ALERT_THRESHOLD_MESSAGE_SIZE,
         ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
         ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
-        QUEUE_FLOW_CONTROL_SIZE_BYTES,
-        QUEUE_FLOW_RESUME_SIZE_BYTES,
         MAXIMUM_DELIVERY_ATTEMPTS
     };
 
@@ -3053,12 +2969,6 @@ public abstract class AbstractQueue<X ex
     {
         super.validateChange(proxyForValidation, changedAttributes);
         Queue<?> queue = (Queue) proxyForValidation;
-        long queueFlowControlSize = queue.getQueueFlowControlSizeBytes();
-        long queueFlowControlResumeSize = queue.getQueueFlowResumeSizeBytes();
-        if (queueFlowControlResumeSize > queueFlowControlSize)
-        {
-            throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
-        }
 
         for (String attrName : NON_NEGATIVE_NUMBERS)
         {
@@ -3255,6 +3165,12 @@ public abstract class AbstractQueue<X ex
         return messageFinder.getMessageInfo();
     }
 
+    @Override
+    public QueueEntry getLesserOldestEntry()
+    {
+        return getEntries().getLesserOldestEntry();
+    }
+
     private class MessageFinder implements QueueEntryVisitor
     {
         private final long _messageNumber;

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Thu Feb 23 17:11:04 2017
@@ -146,6 +146,12 @@ public class LastValueQueueList extends
         return addedEntry;
     }
 
+    @Override
+    public QueueEntry getLesserOldestEntry()
+    {
+        return getOldestEntry();
+    }
+
     /**
      * Returns:
      *

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,30 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+public class NoneOverflowPolicyHandler implements OverflowPolicyHandler
+{
+    @Override
+    public void checkOverflow()
+    {
+        // noop
+    }
+
+}

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public class NoneOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
+{
+
+    @Override
+    public String getType()
+    {
+        return OverflowPolicy.NONE.name();
+    }
+
+    @Override
+    public OverflowPolicyHandler create(final Queue<?> queue,
+                                        final EventLogger eventLogger)
+    {
+        return new NoneOverflowPolicyHandler();
+    }
+}

Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java (from r1784177, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java&r1=1784177&r2=1784178&rev=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java Thu Feb 23 17:11:04 2017
@@ -18,11 +18,10 @@
  * under the License.
  *
  */
-package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.server.session.AMQPSession;
+package org.apache.qpid.server.queue;
 
-public interface CapacityChecker
+public interface OverflowPolicyHandler
 {
-    void checkCapacity(AMQPSession<?,?> channel);
+    void checkOverflow();
 }

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,29 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.Pluggable;
+
+public interface OverflowPolicyHandlerFactory extends Pluggable
+{
+    OverflowPolicyHandler create(Queue<?> queue, final EventLogger eventLogger);
+}

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Thu Feb 23 17:11:04 2017
@@ -207,13 +207,27 @@ abstract public class PriorityQueueList
             for(PriorityQueueEntrySubList subList : _priorityLists)
             {
                 QueueEntry subListOldest = subList.getOldestEntry();
-                if (subListOldest != null)
+                if(oldest == null || (subListOldest != null && subListOldest.getMessage().getMessageNumber() < oldest.getMessage().getMessageNumber()))
                 {
-                    return subListOldest;
+                    oldest = subListOldest;
                 }
             }
             return oldest;
         }
+
+        @Override
+        public QueueEntry getLesserOldestEntry()
+        {
+            for(PriorityQueueEntrySubList subList : _priorityLists)
+            {
+                QueueEntry subListLast = subList.getLesserOldestEntry();
+                if(subListLast != null)
+                {
+                    return subListLast;
+                }
+            }
+            return null;
+        }
     }
 
     static class PriorityQueueEntrySubList extends PriorityQueueList
@@ -245,6 +259,12 @@ abstract public class PriorityQueueList
         {
             return _listPriority;
         }
+
+        @Override
+        public QueueEntry getLesserOldestEntry()
+        {
+            return getOldestEntry();
+        }
     }
 
     static class PriorityQueueEntry extends OrderedQueueEntry

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,243 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.security.AccessController;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.auth.Subject;
+
+import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.session.AMQPSession;
+
+public class ProducerFlowControlOverflowPolicyHandler implements OverflowPolicyHandler
+{
+    private final Handler _handler;
+
+    ProducerFlowControlOverflowPolicyHandler(Queue<?> queue, EventLogger eventLogger)
+    {
+        _handler = new Handler(queue, eventLogger);
+        queue.addChangeListener(_handler);
+    }
+
+    boolean isQueueFlowStopped()
+    {
+        return _handler.isQueueFlowStopped();
+    }
+
+    @Override
+    public void checkOverflow()
+    {
+        _handler.checkOverflow();
+    }
+
+    private static class Handler extends AbstractConfigurationChangeListener implements OverflowPolicyHandler
+    {
+        private final Queue<?> _queue;
+        private final EventLogger _eventLogger;
+        private final AtomicBoolean _overfull = new AtomicBoolean(false);
+        private final Set<AMQPSession<?, ?>> _blockedSessions =
+                Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?, ?>, Boolean>());
+        private volatile double _queueFlowResumeLimit;
+        private boolean _checkCapacity;
+
+        private Handler(final Queue<?> queue, final EventLogger eventLogger)
+        {
+            _queue = queue;
+            _eventLogger = eventLogger;
+            Double value = _queue.getContextValue(Double.class, Queue.QUEUE_FLOW_RESUME_LIMIT);
+            if (value != null)
+            {
+                _queueFlowResumeLimit = value;
+            }
+        }
+
+        @Override
+        public void checkOverflow()
+        {
+            long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
+            long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
+            if (maximumQueueDepthBytes >= 0L || maximumQueueDepthMessages >= 0L)
+            {
+                checkOverfull(maximumQueueDepthBytes, maximumQueueDepthMessages);
+            }
+
+            checkUnderfull(maximumQueueDepthBytes, maximumQueueDepthMessages);
+        }
+
+        @Override
+        public void attributeSet(final ConfiguredObject<?> object,
+                                 final String attributeName,
+                                 final Object oldAttributeValue,
+                                 final Object newAttributeValue)
+        {
+            super.attributeSet(object, attributeName, oldAttributeValue, newAttributeValue);
+            if (Queue.CONTEXT.equals(attributeName))
+            {
+                Double value = _queue.getContextValue(Double.class, Queue.QUEUE_FLOW_RESUME_LIMIT);
+                double queueFlowResumePercentage = value == null ? 0 : value;
+                if (queueFlowResumePercentage != _queueFlowResumeLimit)
+                {
+                    _queueFlowResumeLimit = queueFlowResumePercentage;
+                    _checkCapacity = true;
+                }
+            }
+            if (Queue.MAXIMUM_QUEUE_DEPTH_BYTES.equals(attributeName)
+                || Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES.equals(attributeName))
+            {
+                _checkCapacity = true;
+            }
+        }
+
+        @Override
+        public void bulkChangeEnd(final ConfiguredObject<?> object)
+        {
+            super.bulkChangeEnd(object);
+            if (_queue.getOverflowPolicy() == OverflowPolicy.PRODUCER_FLOW_CONTROL)
+            {
+                if (_checkCapacity)
+                {
+                    _checkCapacity = false;
+                    checkUnderfull(_queue.getMaximumQueueDepthBytes(), _queue.getMaximumQueueDepthMessages());
+                }
+            }
+            else
+            {
+                _queue.removeChangeListener(this);
+                checkUnderfull(-1, -1);
+
+                if (_overfull.compareAndSet(true, false))
+                {
+                    _eventLogger.message(_queue.getLogSubject(),
+                                         QueueMessages.UNDERFULL(_queue.getQueueDepthBytes(),
+                                                                 getFlowResumeLimit((long) -1),
+                                                                 (long) _queue.getQueueDepthMessages(),
+                                                                 getFlowResumeLimit((long) -1)));
+                }
+
+                for (final AMQPSession<?, ?> blockedSession : _blockedSessions)
+                {
+                    blockedSession.unblock(_queue);
+                    _blockedSessions.remove(blockedSession);
+                }
+            }
+        }
+
+        boolean isQueueFlowStopped()
+        {
+            return _overfull.get();
+        }
+
+        private void checkUnderfull(long maximumQueueDepthBytes, long maximumQueueDepthMessages)
+        {
+            if (_overfull.get())
+            {
+                long queueDepthBytes = _queue.getQueueDepthBytes();
+                long queueDepthMessages = _queue.getQueueDepthMessages();
+
+                if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
+                    && isUnderfull(queueDepthMessages, maximumQueueDepthMessages))
+                {
+                    if (_overfull.compareAndSet(true, false))
+                    {
+                        _eventLogger.message(_queue.getLogSubject(),
+                                             QueueMessages.UNDERFULL(queueDepthBytes,
+                                                                     getFlowResumeLimit(maximumQueueDepthBytes),
+                                                                     queueDepthMessages,
+                                                                     getFlowResumeLimit(maximumQueueDepthMessages)));
+                    }
+
+                    for (final AMQPSession<?, ?> blockedSession : _blockedSessions)
+                    {
+                        blockedSession.unblock(_queue);
+                        _blockedSessions.remove(blockedSession);
+                    }
+                }
+            }
+        }
+
+        private void checkOverfull(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages)
+        {
+            final long queueDepthBytes = _queue.getQueueDepthBytes();
+            final long queueDepthMessages = _queue.getQueueDepthMessages();
+
+            if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) ||
+                (maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages))
+            {
+                Subject subject = Subject.getSubject(AccessController.getContext());
+                Set<SessionPrincipal> sessionPrincipals = subject.getPrincipals(SessionPrincipal.class);
+                if (!sessionPrincipals.isEmpty())
+                {
+                    SessionPrincipal sessionPrincipal = sessionPrincipals.iterator().next();
+                    if (sessionPrincipal != null)
+                    {
+
+                        if (_overfull.compareAndSet(false, true))
+                        {
+                            _eventLogger.message(_queue.getLogSubject(),
+                                                 QueueMessages.OVERFULL(queueDepthBytes,
+                                                                        maximumQueueDepthBytes,
+                                                                        queueDepthMessages,
+                                                                        maximumQueueDepthMessages));
+                        }
+
+                        final AMQPSession<?, ?> session = sessionPrincipal.getSession();
+                        _blockedSessions.add(session);
+                        session.block(_queue);
+
+                        if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
+                            && isUnderfull(queueDepthMessages, maximumQueueDepthMessages))
+                        {
+
+                            session.unblock(_queue);
+                            _blockedSessions.remove(session);
+                        }
+                    }
+                }
+            }
+        }
+
+        private boolean isUnderfull(final long queueDepth,
+                                    final long maximumQueueDepth)
+        {
+            return maximumQueueDepth < 0 || queueDepth <= getFlowResumeLimit(maximumQueueDepth);
+        }
+
+        private long getFlowResumeLimit(final long maximumQueueDepth)
+        {
+            if (maximumQueueDepth >= 0)
+            {
+                return (long) (_queueFlowResumeLimit / 100.0 * maximumQueueDepth);
+            }
+            return -1;
+        }
+    }
+
+}
+

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,42 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public class ProducerFlowControlOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
+{
+    @Override
+    public String getType()
+    {
+        return OverflowPolicy.PRODUCER_FLOW_CONTROL.name();
+    }
+
+    @Override
+    public OverflowPolicyHandler create(final Queue<?> queue,
+                                        final EventLogger eventLogger)
+    {
+        return new ProducerFlowControlOverflowPolicyHandler(queue, eventLogger);
+    }
+}

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Thu Feb 23 17:11:04 2017
@@ -27,6 +27,7 @@ import java.util.Map;
 import org.apache.qpid.server.model.ConfiguredObject;
 import org.apache.qpid.server.model.OverflowPolicy;
 import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
 
 public class QueueArgumentsConverter
@@ -92,8 +93,7 @@ public class QueueArgumentsConverter
 
         ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_DELIVERY_COUNT, Queue.MAXIMUM_DELIVERY_ATTEMPTS);
 
-        ATTRIBUTE_MAPPINGS.put(X_QPID_CAPACITY, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES);
-        ATTRIBUTE_MAPPINGS.put(X_QPID_FLOW_RESUME_CAPACITY, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES);
+        ATTRIBUTE_MAPPINGS.put(X_QPID_CAPACITY, Queue.MAXIMUM_QUEUE_DEPTH_BYTES);
 
         ATTRIBUTE_MAPPINGS.put(QPID_QUEUE_SORT_KEY, SortedQueue.SORT_KEY);
         ATTRIBUTE_MAPPINGS.put(QPID_LAST_VALUE_QUEUE_KEY, LastValueQueue.LVQ_KEY);
@@ -116,8 +116,8 @@ public class QueueArgumentsConverter
         ATTRIBUTE_MAPPINGS.put(QPID_LIFETIME_POLICY, Queue.LIFETIME_POLICY);
 
         ATTRIBUTE_MAPPINGS.put(QPID_POLICY_TYPE, Queue.OVERFLOW_POLICY);
-        ATTRIBUTE_MAPPINGS.put(QPID_MAX_COUNT, Queue.MAX_COUNT);
-        ATTRIBUTE_MAPPINGS.put(QPID_MAX_SIZE, Queue.MAX_SIZE);
+        ATTRIBUTE_MAPPINGS.put(QPID_MAX_COUNT, Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES);
+        ATTRIBUTE_MAPPINGS.put(QPID_MAX_SIZE, Queue.MAXIMUM_QUEUE_DEPTH_BYTES);
     }
 
 
@@ -141,6 +141,7 @@ public class QueueArgumentsConverter
             {
                 modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase()));
             }
+
             if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
             {
                 modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS,
@@ -156,6 +157,26 @@ public class QueueArgumentsConverter
                 modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString()));
             }
 
+            if (wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY) != null && wireArguments.get(X_QPID_CAPACITY) != null)
+            {
+                double resumeCapacity = Integer.parseInt(wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY).toString());
+                double maximumCapacity = Integer.parseInt(wireArguments.get(X_QPID_CAPACITY).toString());
+                if (resumeCapacity > maximumCapacity)
+                {
+                    throw new ConnectionScopedRuntimeException(
+                            "Flow resume size can't be greater than flow control size");
+                }
+                Map<String, String> context = (Map<String, String>) modelArguments.get(Queue.CONTEXT);
+                if (context == null)
+                {
+                    context = new HashMap<>();
+                    modelArguments.put(Queue.CONTEXT, context);
+                }
+                double ratio = resumeCapacity / maximumCapacity;
+                context.put(Queue.QUEUE_FLOW_RESUME_LIMIT, String.format("%.2f", ratio * 100.0));
+                modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
+            }
+
         }
         return modelArguments;
     }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Thu Feb 23 17:11:04 2017
@@ -46,4 +46,6 @@ interface QueueEntryList
 
     void updateStatsOnStateChange(QueueEntry entry, QueueEntry.EntryState fromState, QueueEntry.EntryState toState);
 
+    QueueEntry getLesserOldestEntry();
+
 }

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,91 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.model.Queue;
+
+public class RingOverflowPolicyHandler implements OverflowPolicyHandler
+{
+    private final Queue<?> _queue;
+    private final EventLogger _eventLogger;
+
+    RingOverflowPolicyHandler(Queue<?> queue, final EventLogger eventLogger)
+    {
+        _queue = queue;
+        _eventLogger = eventLogger;
+    }
+
+    @Override
+    public void checkOverflow()
+    {
+        final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
+        final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
+
+        boolean bytesOverflow, messagesOverflow, overflow = false;
+        int counter = 0;
+        int queueDepthMessages;
+        long queueDepthBytes;
+        do
+        {
+            queueDepthMessages = _queue.getQueueDepthMessages();
+            queueDepthBytes = _queue.getQueueDepthBytesIncludingHeader();
+
+            messagesOverflow = maximumQueueDepthMessages >= 0 && queueDepthMessages > maximumQueueDepthMessages;
+            bytesOverflow = maximumQueueDepthBytes >= 0 && queueDepthBytes > maximumQueueDepthBytes;
+
+            if (bytesOverflow || messagesOverflow)
+            {
+                if (!overflow)
+                {
+                    overflow = true;
+                }
+
+                QueueEntry entry = _queue.getLesserOldestEntry();
+
+                if (entry != null)
+                {
+                    counter++;
+                    _queue.deleteEntry(entry);
+                }
+                else
+                {
+                    queueDepthMessages = _queue.getQueueDepthMessages();
+                    queueDepthBytes = _queue.getQueueDepthBytesIncludingHeader();
+                    break;
+                }
+            }
+        }
+        while (bytesOverflow || messagesOverflow);
+
+        if (overflow)
+        {
+            _eventLogger.message(_queue.getLogSubject(),
+                                 QueueMessages.DROPPED(
+                                         counter,
+                                         queueDepthBytes,
+                                         queueDepthMessages,
+                                         maximumQueueDepthBytes,
+                                         maximumQueueDepthMessages));
+        }
+    }
+
+}

Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,43 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public class RingOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
+{
+
+    @Override
+    public String getType()
+    {
+        return OverflowPolicy.RING.name();
+    }
+
+    @Override
+    public OverflowPolicyHandler create(final Queue<?> queue,
+                                        final EventLogger eventLogger)
+    {
+        return new RingOverflowPolicyHandler(queue, eventLogger);
+    }
+}

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Thu Feb 23 17:11:04 2017
@@ -464,6 +464,28 @@ public class SortedQueueEntryList extend
         return 0;
     }
 
+    @Override
+    public QueueEntry getLesserOldestEntry()
+    {
+        SortedQueueEntry lastNode = null;
+        QueueEntryIterator iterator = iterator();
+        while (iterator.advance())
+        {
+            QueueEntry node = iterator.getNode();
+            if (node != null && !node.isDeleted())
+            {
+                SortedQueueEntry sortedQueueEntry = (SortedQueueEntry)node;
+                if (lastNode == null
+                    || (lastNode.getKey() != null && !lastNode.getKey().equals(sortedQueueEntry.getKey()))
+                    || (lastNode.getKey() == null && sortedQueueEntry.getKey() != null) )
+                {
+                    lastNode = sortedQueueEntry;
+                }
+            }
+        }
+        return lastNode;
+    }
+
     /**
      * Swaps the position of the node in the tree with it's successor
      * (that is the node with the next highest key)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Thu Feb 23 17:11:04 2017
@@ -47,4 +47,10 @@ public class StandardQueueEntryList exte
         return new StandardQueueEntry(this, message, enqueueRecord);
     }
 
+
+    @Override
+    public QueueEntry getLesserOldestEntry()
+    {
+        return getOldestEntry();
+    }
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Thu Feb 23 17:11:04 2017
@@ -607,16 +607,89 @@ public class VirtualHostStoreUpgraderAnd
             }
             else if("Queue".equals(record.getType()))
             {
-                _queues.put(record.getId(), (String) record.getAttributes().get("name"));
-                if(record.getAttributes().containsKey("bindings"))
+                Map<String, Object> attributes = new HashMap<>(record.getAttributes());
+                Object queueFlowControlSizeBytes = attributes.remove("queueFlowControlSizeBytes");
+                Object queueFlowResumeSizeBytes = attributes.remove("queueFlowResumeSizeBytes");
+                if (queueFlowControlSizeBytes != null)
                 {
-                    _queueBindings.put(String.valueOf(record.getAttributes().get("name")),
-                                       (List<Map<String, Object>>) record.getAttributes().get("bindings"));
-                    Map<String, Object> updatedAttributes = new HashMap<>(record.getAttributes());
-                    updatedAttributes.remove("bindings");
-                    getUpdateMap().put(record.getId(), new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()));
+                    long queueFlowControlSizeBytesValue = convertAttributeValueToLong("queueFlowControlSizeBytes",
+                                                                                      queueFlowControlSizeBytes);
+                    if (queueFlowControlSizeBytesValue > 0)
+                    {
+                        if (queueFlowResumeSizeBytes != null)
+                        {
+                            long queueFlowResumeSizeBytesValue =
+                                    convertAttributeValueToLong("queueFlowResumeSizeBytes", queueFlowResumeSizeBytes);
+                            double ratio = ((double) queueFlowResumeSizeBytesValue)
+                                           / ((double) queueFlowControlSizeBytesValue);
+                            String flowResumeLimit = String.format("%.2f", ratio * 100.0);
+
+                            Object context = attributes.get("context");
+                            Map<String, String> contextMap;
+                            if (context instanceof Map)
+                            {
+                                contextMap = (Map) context;
+                            }
+                            else
+                            {
+                                contextMap = new HashMap<>();
+                                attributes.put("context", contextMap);
+                            }
+                            contextMap.put("queue.queueFlowResumeLimit", flowResumeLimit);
+                        }
+                        attributes.put("overflowPolicy", "ProducerFlowControl");
+                        attributes.put("maximumQueueDepthBytes", queueFlowControlSizeBytes);
+                    }
+                }
+
+                if(attributes.containsKey("bindings"))
+                {
+                    _queueBindings.put(String.valueOf(attributes.get("name")),
+                                       (List<Map<String, Object>>) attributes.get("bindings"));
+                    attributes.remove("bindings");
+                }
+
+                _queues.put(record.getId(), (String) attributes.get("name"));
+
+                if (!attributes.equals(new HashMap<>(record.getAttributes())))
+                {
+                    getUpdateMap().put(record.getId(),
+                                       new ConfiguredObjectRecordImpl(record.getId(),
+                                                                      record.getType(),
+                                                                      attributes,
+                                                                      record.getParents()));
+                }
+            }
+        }
+
+        private long convertAttributeValueToLong(final String attributeName,
+                                                 final Object attributeValue)
+        {
+            long value;
+            if (attributeValue instanceof Number)
+            {
+                value = ((Number) attributeValue).longValue();
+            }
+            else if (attributeValue instanceof String)
+            {
+                try
+                {
+                    value = Long.parseLong((String) attributeValue);
+                }
+                catch (Exception e)
+                {
+                    throw new IllegalConfigurationException(String.format(
+                            "Cannot evaluate '%s': %s",
+                            attributeName, attributeValue));
                 }
             }
+            else
+            {
+                throw new IllegalConfigurationException(String.format("Cannot evaluate '%s': %s",
+                                                                      attributeName,
+                                                                      String.valueOf(attributeValue)));
+            }
+            return value;
         }
 
         @Override

Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Thu Feb 23 17:11:04 2017
@@ -845,8 +845,8 @@ abstract class AbstractQueueTestBase ext
     public void testNoneOverflowPolicy()
     {
         Map<String,Object> attributes = new HashMap<>(_arguments);
-        attributes.put(Queue.MAX_COUNT, 2);
-        attributes.put(Queue.MAX_SIZE, 100);
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 2);
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 100);
 
         Queue<?> queue = getQueue();
         queue.setAttributes(attributes);
@@ -865,19 +865,13 @@ abstract class AbstractQueueTestBase ext
         assertEquals("Wrong size of messages in queue",300, queue.getQueueDepthBytesIncludingHeader());
         assertEquals("Wrong oldest message", 10l,
                 ((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
-        queue.clearQueue();
-
-        attributes = new HashMap<>(_arguments);
-        attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
-        attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
-        queue.setAttributes(attributes);
     }
 
     public void testRingOverflowPolicyMaxCount()
     {
         Map<String,Object> attributes = new HashMap<>(_arguments);
         attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
-        attributes.put(Queue.MAX_COUNT, 4);
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 4);
 
         Queue<?> queue = getQueue();
         queue.setAttributes(attributes);
@@ -902,20 +896,14 @@ abstract class AbstractQueueTestBase ext
         assertEquals("Wrong size of messages in queue",80, queue.getQueueDepthBytesIncludingHeader());
         assertEquals("Wrong oldest message", 50l,
                 ((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
-        queue.clearQueue();
-
-        attributes = new HashMap<>(_arguments);
-        attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
-        attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
-        queue.setAttributes(attributes);
     }
 
     public void testRingOverflowPolicyMaxSize()
     {
         Map<String,Object> attributes = new HashMap<>(_arguments);
         attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
-        attributes.put(Queue.MAX_COUNT, 4);
-        attributes.put(Queue.MAX_SIZE, 100);
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 4);
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 100);
 
         Queue<?> queue = getQueue();
         queue.setAttributes(attributes);
@@ -944,20 +932,13 @@ abstract class AbstractQueueTestBase ext
         assertEquals("Wrong size of messages in queue",90, queue.getQueueDepthBytesIncludingHeader());
         assertEquals("Wrong oldest message", 200l,
                 ((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
-        queue.clearQueue();
-
-        attributes = new HashMap<>(_arguments);
-        attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
-        attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
-        attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
-        queue.setAttributes(attributes);
     }
 
     public void testRingOverflowPolicyMessagesRejected()
     {
         Map<String,Object> attributes = new HashMap<>(_arguments);
         attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
-        attributes.put(Queue.MAX_COUNT, 0);
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 0);
 
         Queue<?> queue = getQueue();
         queue.setAttributes(attributes);
@@ -967,26 +948,20 @@ abstract class AbstractQueueTestBase ext
 
         message = createMessage(new Long(27), 20, 10);
         result = queue.route(message, message.getInitialRoutingAddress(), null);
-        assertTrue("Result should include routing failure", result.isRoutingFailure());
+        assertTrue("Result should include not accepting route", result.hasNotAcceptingRoutableQueue());
 
         int headerSize = 20;
         int payloadSize = 10;
         int id = 28;
 
         attributes = new HashMap<>(_arguments);
-        attributes.put(Queue.MAX_COUNT, 10);
-        attributes.put(Queue.MAX_SIZE, 10);
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10);
+        attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10);
         queue.setAttributes(attributes);
 
         message = createMessage(new Long(id), headerSize, payloadSize);
         result = queue.route(message, message.getInitialRoutingAddress(), null);
-        assertTrue("Result should include routing failure", result.isRoutingFailure());
-
-        attributes = new HashMap<>(_arguments);
-        attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
-        attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
-        attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
-        queue.setAttributes(attributes);
+        assertTrue("Result should include not accepting route", result.hasNotAcceptingRoutableQueue());
     }
 
     private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org