You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2017/02/23 17:11:05 UTC
svn commit: r1784178 [1/3] - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/logging/messages/
broker-core/src/main/java/org/apache/qpid/server/message/
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/...
Author: orudyy
Date: Thu Feb 23 17:11:04 2017
New Revision: 1784178
URL: http://svn.apache.org/viewvc?rev=1784178&view=rev
Log:
QPID-7618: [Java Broker] Introduce overflow policy for producer flow control and move existing producer flow control functionality into overflow policy handler
Added:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java
- copied, changed from r1784177, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecovererTest.java
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/LastValueQueueListTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueListTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/SortedQueueEntryListTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueEntryListTest.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/type/transport/AmqpError.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/addQueue.html
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/editQueue.html
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/Queue.js
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/addQueue.js
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/js/qpid/management/editQueue.js
qpid/java/trunk/broker-plugins/management-http/src/main/java/resources/showQueue.html
qpid/java/trunk/systests/src/main/java/org/apache/qpid/systest/rest/Asserts.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/queue/ProducerFlowControlTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/server/store/VirtualHostMessageStoreTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/QueueRestTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/VirtualHostRestTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/rest/acl/QueueRestACLTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/destination/AddressBasedDestinationTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/client/queue/QueuePolicyTest.java
qpid/java/trunk/test-profiles/Java10Excludes
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/QueueMessages.java Thu Feb 23 17:11:04 2017
@@ -64,6 +64,7 @@ public class QueueMessages
public static final String QUEUE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue";
public static final String CREATED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.created";
+ public static final String DROPPED_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.dropped";
public static final String OVERFULL_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.overfull";
public static final String OPERATION_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.operation";
public static final String FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY = DEFAULT_LOG_HIERARCHY_PREFIX + "queue.flow_to_disk_active";
@@ -75,6 +76,7 @@ public class QueueMessages
{
LoggerFactory.getLogger(QUEUE_LOG_HIERARCHY);
LoggerFactory.getLogger(CREATED_LOG_HIERARCHY);
+ LoggerFactory.getLogger(DROPPED_LOG_HIERARCHY);
LoggerFactory.getLogger(OVERFULL_LOG_HIERARCHY);
LoggerFactory.getLogger(OPERATION_LOG_HIERARCHY);
LoggerFactory.getLogger(FLOW_TO_DISK_ACTIVE_LOG_HIERARCHY);
@@ -208,16 +210,74 @@ public class QueueMessages
/**
* Log a Queue message of the Format:
- * <pre>QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}</pre>
+ * <pre>QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage OVERFULL(Number param1, Number param2)
+ public static LogMessage DROPPED(Number param1, Number param2, Number param3, Number param4, Number param5)
+ {
+ String rawMessage = _messages.getString("DROPPED");
+
+ final Object[] messageArguments = {param1, param2, param3, param4, param5};
+ // Create a new MessageFormat to ensure thread safety.
+ // Sharing a MessageFormat and using applyPattern is not thread safe
+ MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
+
+ final String message = formatter.format(messageArguments);
+
+ return new LogMessage()
+ {
+ public String toString()
+ {
+ return message;
+ }
+
+ public String getLogHierarchy()
+ {
+ return DROPPED_LOG_HIERARCHY;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final LogMessage that = (LogMessage) o;
+
+ return getLogHierarchy().equals(that.getLogHierarchy()) && toString().equals(that.toString());
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = toString().hashCode();
+ result = 31 * result + getLogHierarchy().hashCode();
+ return result;
+ }
+ };
+ }
+
+ /**
+ * Log a Queue message of the Format:
+ * <pre>QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}</pre>
+ * Optional values are contained in [square brackets] and are numbered
+ * sequentially in the method call.
+ *
+ */
+ public static LogMessage OVERFULL(Number param1, Number param2, Number param3, Number param4)
{
String rawMessage = _messages.getString("OVERFULL");
- final Object[] messageArguments = {param1, param2};
+ final Object[] messageArguments = {param1, param2, param3, param4};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
@@ -382,16 +442,16 @@ public class QueueMessages
/**
* Log a Queue message of the Format:
- * <pre>QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}</pre>
+ * <pre>QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}</pre>
* Optional values are contained in [square brackets] and are numbered
* sequentially in the method call.
*
*/
- public static LogMessage UNDERFULL(Number param1, Number param2)
+ public static LogMessage UNDERFULL(Number param1, Number param2, Number param3, Number param4)
{
String rawMessage = _messages.getString("UNDERFULL");
- final Object[] messageArguments = {param1, param2};
+ final Object[] messageArguments = {param1, param2, param3, param4};
// Create a new MessageFormat to ensure thread safety.
// Sharing a MessageFormat and using applyPattern is not thread safe
MessageFormat formatter = new MessageFormat(rawMessage, _currentLocale);
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/logging/messages/Queue_logmessages.properties Thu Feb 23 17:11:04 2017
@@ -23,8 +23,9 @@
# 3 - priority
CREATED = QUE-1001 : Create : ID: {0}[ Owner: {1}][ AutoDelete][ Durable][ Transient][ Priority: {2,number,#}]
DELETED = QUE-1002 : Deleted : ID: {0}
-OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}
-UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}
+OVERFULL = QUE-1003 : Overfull : Size : {0,number} bytes, Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
+UNDERFULL = QUE-1004 : Underfull : Size : {0,number} bytes, Resume Capacity : {1,number}, Messages : {2,number}, Message Capacity : {3,number}
+DROPPED = QUE-1005 : Dropped : {0,number} messages, Depth : {1,number} bytes, {2,number} messages, Capacity : {3,number} bytes, {4,number} messages
# use similar number to the broker for similar topic
FLOW_TO_DISK_ACTIVE = QUE-1014 : Message flow to disk active : Message memory use {0,number,#} kB exceeds threshold {1,number,#.##} kB
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/message/RoutingResult.java Thu Feb 23 17:11:04 2017
@@ -22,7 +22,9 @@ package org.apache.qpid.server.message;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
@@ -41,14 +43,7 @@ public class RoutingResult<M extends Ser
private final M _message;
private final Set<BaseQueue> _queues = new HashSet<>();
-
- private int _errorCodeAmqp_0_10;
-
- private String _errorCodeAmqp_1_0;
-
- private String _errorMessage;
-
- private boolean _routingFailure = false;
+ private final Map<BaseQueue, CharSequence> _notAcceptingRoutableQueues = new HashMap<>();
public RoutingResult(final M message)
{
@@ -67,7 +62,7 @@ public class RoutingResult<M extends Ser
}
}
- public void addQueues(Collection<? extends BaseQueue> queues)
+ private void addQueues(Collection<? extends BaseQueue> queues)
{
boolean deletedQueues = false;
for(BaseQueue q : queues)
@@ -90,16 +85,13 @@ public class RoutingResult<M extends Ser
public void add(RoutingResult<M> result)
{
- if (result.isRoutingFailure())
- {
- _routingFailure = result._routingFailure;
- _errorCodeAmqp_0_10 = result._errorCodeAmqp_0_10;
- _errorCodeAmqp_1_0 = result._errorCodeAmqp_1_0;
- _errorMessage = result._errorMessage;
- }
- else
+ addQueues(result._queues);
+ for (Map.Entry<BaseQueue, CharSequence> e : result._notAcceptingRoutableQueues.entrySet())
{
- addQueues(result._queues);
+ if (!e.getKey().isDeleted())
+ {
+ _notAcceptingRoutableQueues.put(e.getKey(), e.getValue());
+ }
}
}
@@ -163,31 +155,27 @@ public class RoutingResult<M extends Ser
return !_queues.isEmpty();
}
- public void addRoutingFailure(int errorCodeAmqp_0_10, String errorCodeAmqp_1_0, String reason)
+ public void addNotAcceptingRoutableQueue(BaseQueue q, CharSequence reason)
{
- _routingFailure = true;
- this._errorCodeAmqp_0_10 = errorCodeAmqp_0_10;
- this._errorCodeAmqp_1_0 = errorCodeAmqp_1_0;
- _errorMessage = reason;
+ _notAcceptingRoutableQueues.put(q, reason);
}
- public String getErrorMessage()
+ public boolean hasNotAcceptingRoutableQueue()
{
- return _errorMessage;
+ return !_notAcceptingRoutableQueues.isEmpty();
}
- public int getErrorCodeAmqp_0_10()
+ public String getUnacceptanceCause()
{
- return _errorCodeAmqp_0_10;
- }
-
- public String getErrorCodeAmqp_1_0()
- {
- return _errorCodeAmqp_1_0;
- }
-
- public boolean isRoutingFailure()
- {
- return _routingFailure;
+ StringBuilder refusalMessages = new StringBuilder();
+ for (CharSequence message : _notAcceptingRoutableQueues.values())
+ {
+ if (refusalMessages.length() > 0)
+ {
+ refusalMessages.append(";");
+ }
+ refusalMessages.append(message);
+ }
+ return refusalMessages.toString();
}
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/OverflowPolicy.java Thu Feb 23 17:11:04 2017
@@ -23,5 +23,6 @@ package org.apache.qpid.server.model;
public enum OverflowPolicy
{
NONE,
- RING
+ RING,
+ PRODUCER_FLOW_CONTROL
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Thu Feb 23 17:11:04 2017
@@ -33,7 +33,6 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.MessageInfo;
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.protocol.CapacityChecker;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.NotificationCheck;
import org.apache.qpid.server.queue.QueueConsumer;
@@ -50,7 +49,6 @@ public interface Queue<X extends Queue<X
Comparable<X>, ExchangeReferrer,
BaseQueue,
MessageSource,
- CapacityChecker,
MessageDestination,
Deletable<X>
{
@@ -74,8 +72,7 @@ public interface Queue<X extends Queue<X
String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
String NO_LOCAL = "noLocal";
String OWNER = "owner";
- String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
- String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
+
String QUEUE_FLOW_STOPPED = "queueFlowStopped";
String MAXIMUM_MESSAGE_TTL = "maximumMessageTtl";
String MINIMUM_MESSAGE_TTL = "minimumMessageTtl";
@@ -83,8 +80,8 @@ public interface Queue<X extends Queue<X
String ENSURE_NONDESTRUCTIVE_CONSUMERS = "ensureNondestructiveConsumers";
String HOLD_ON_PUBLISH_ENABLED = "holdOnPublishEnabled";
String OVERFLOW_POLICY = "overflowPolicy";
- String MAX_COUNT = "maxCount";
- String MAX_SIZE = "maxSize";
+ String MAXIMUM_QUEUE_DEPTH_MESSAGES = "maximumQueueDepthMessages";
+ String MAXIMUM_QUEUE_DEPTH_BYTES = "maximumQueueDepthBytes";
String QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT = "queue.minimumEstimatedMemoryFootprint";
@SuppressWarnings("unused")
@@ -170,20 +167,12 @@ public interface Queue<X extends Queue<X
@ManagedAttribute( defaultValue = "${queue.maximumDeliveryAttempts}")
int getMaximumDeliveryAttempts();
+ String QUEUE_FLOW_RESUME_LIMIT = "queue.queueFlowResumeLimit";
@SuppressWarnings("unused")
- @ManagedContextDefault( name = "queue.queueFlowControlSizeBytes")
- long DEFAULT_FLOW_CONTROL_SIZE_BYTES = 0L;
-
- @ManagedAttribute( defaultValue = "${queue.queueFlowControlSizeBytes}")
- long getQueueFlowControlSizeBytes();
-
- @SuppressWarnings("unused")
- @ManagedContextDefault( name = "queue.queueFlowResumeSizeBytes")
- long DEFAULT_FLOW_CONTROL_RESUME_SIZE_BYTES = 0L;
-
- @ManagedAttribute( defaultValue = "${queue.queueFlowResumeSizeBytes}")
- long getQueueFlowResumeSizeBytes();
-
+ @ManagedContextDefault( name = QUEUE_FLOW_RESUME_LIMIT,
+ description = "Percentage used to evaluate flow resume limit based on the values of attributes"
+ + " 'maximumQueueDepthBytes' and 'maximumQueueDepthMessages'.")
+ double DEFAULT_FLOW_CONTROL_RESUME_LIMIT = 80.0;
@SuppressWarnings("unused")
@DerivedAttribute
@@ -261,25 +250,39 @@ public interface Queue<X extends Queue<X
+ "visible may depend on how frequently the virtual host housekeeping thread runs.")
boolean isHoldOnPublishEnabled();
- @ManagedContextDefault( name = "queue.defaultMaxCount", description = "Maximum count of messages in queue when policy_type set to Ring")
- long DEFAULT_MAX_COUNT = 10;
-
- @ManagedAttribute( defaultValue = "${queue.defaultMaxCount}", description = "Maximum count of messages in queue, when policy_type set to Ring")
- long getMaxCount();
-
- @ManagedContextDefault( name = "queue.defaultMaxSize", description = "Maximum size of messages in queue in bytes, when policy_type set to Ring")
- long DEFAULT_MAX_SIZE = 1024;
-
- @ManagedAttribute( defaultValue = "${queue.defaultMaxSize}", description = "Maximum size of messages in queue in bytes, when policy_type set to Ring")
- long getMaxSize();
-
- @SuppressWarnings("unused")
- @ManagedContextDefault( name = "queue.defaultOverflowPolicy")
- OverflowPolicy DEFAULT_POLICY_TYPE = OverflowPolicy.NONE;
-
- @ManagedAttribute( defaultValue = "${queue.defaultOverflowPolicy}", description = "Queue overflow policy. Current options are Ring and None." +
- " Ring overflow policy - when queue message count or size of messages in queue exceeds maximum, oldest message(s) is discarded." +
- " None overflow policy - maximum size and maximum count properties are not applied.")
+ @ManagedContextDefault(name = "queue.defaultMaximumQueueDepthMessages",
+ description = "Maximum number of messages on queue allowed by overflow policy.")
+ long DEFAULT_MAXIMUM_QUEUE_DEPTH_MESSAGES = -1;
+
+ @ManagedAttribute(defaultValue = "${queue.defaultMaximumQueueDepthMessages}",
+ description = "Maximum number of messages on queue allowed by overflow policy."
+ + " Negative value indicates that queue depth is unlimited. Default, -1.")
+ long getMaximumQueueDepthMessages();
+
+ @ManagedContextDefault(name = "queue.defaultMaximumQueueDepthBytes",
+ description = "Maximum number of bytes on queue allowed by overflow policy." )
+ long DEFAULT_MAXIMUM_QUEUE_DEPTH_BYTES = -1;
+
+ @ManagedAttribute(defaultValue = "${queue.defaultMaximumQueueDepthBytes}",
+ description = "Maximum number of bytes on queue allowed by overflow policy."
+ + " Negative value indicates that queue depth is unlimited. Default, -1.")
+ long getMaximumQueueDepthBytes();
+
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = "queue.defaultOverflowPolicy",
+ description = "Specifies the default value for queue overflow policy. ")
+ OverflowPolicy DEFAULT_OVERFLOW_POLICY = OverflowPolicy.NONE;
+
+ @ManagedAttribute(defaultValue = "${queue.defaultOverflowPolicy}",
+ description = "Queue overflow policy."
+ + " Current options are ProducerFlowControl, Ring and None."
+ + " ProducerFlowControl overflow policy - when queue message count or size of messages"
+ + " in queue exceeds maximum, the producer is blocked until queue depth falls below "
+ + " the resume threshold."
+ + " Ring overflow policy - when queue message count or size of messages in queue exceeds"
+ + " maximum, oldest messages are discarded."
+ + " None overflow policy - maximum size and maximum count properties are not applied.",
+ mandatory = true)
OverflowPolicy getOverflowPolicy();
@ManagedOperation(nonModifying = true, changesConfiguredObjectState = false)
@@ -297,7 +300,6 @@ public interface Queue<X extends Queue<X
int deleteAndReturnCount();
- void deleteEntry(QueueEntry node);
void setNotificationListener(QueueNotificationListener listener);
@@ -483,4 +485,9 @@ public interface Queue<X extends Queue<X
boolean isHeld(QueueEntry queueEntry, final long evaluationTime);
void checkCapacity();
+
+ void deleteEntry(QueueEntry entry);
+
+ QueueEntry getLesserOldestEntry();
+
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Feb 23 17:11:04 2017
@@ -182,18 +182,9 @@ public abstract class AbstractQueue<X ex
private long _alertRepeatGap;
@ManagedAttributeField
- private long _queueFlowControlSizeBytes;
-
- @ManagedAttributeField( afterSet = "checkCapacity" )
- private long _queueFlowResumeSizeBytes;
-
- @ManagedAttributeField
private ExclusivityPolicy _exclusive;
@ManagedAttributeField
- private OverflowPolicy _overflowPolicy;
-
- @ManagedAttributeField
private MessageDurability _messageDurability;
@ManagedAttributeField
@@ -211,9 +202,6 @@ public abstract class AbstractQueue<X ex
private AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Set<AMQPSession<?, ?>> _blockedSessions =
- Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?, ?>, Boolean>());
-
private final AtomicBoolean _deleted = new AtomicBoolean(false);
private final SettableFuture<Integer> _deleteFuture = SettableFuture.create();
@@ -226,7 +214,6 @@ public abstract class AbstractQueue<X ex
@ManagedAttributeField
private boolean _noLocal;
- private final AtomicBoolean _overfull = new AtomicBoolean(false);
private final FlowToDiskChecker _flowToDiskChecker = new FlowToDiskChecker();
private final CopyOnWriteArrayList<Binding> _bindings = new CopyOnWriteArrayList<>();
private Map<String, Object> _arguments;
@@ -259,10 +246,13 @@ public abstract class AbstractQueue<X ex
private boolean _ensureNondestructiveConsumers;
@ManagedAttributeField
private volatile boolean _holdOnPublishEnabled;
+
+ @ManagedAttributeField
+ private OverflowPolicy _overflowPolicy;
@ManagedAttributeField
- private long _maxCount;
+ private long _maximumQueueDepthMessages;
@ManagedAttributeField
- private long _maxSize;
+ private long _maximumQueueDepthBytes;
private static final int RECOVERING = 1;
private static final int COMPLETING_RECOVERY = 2;
@@ -279,6 +269,7 @@ public abstract class AbstractQueue<X ex
private Map<String, String> _mimeTypeToFileExtension = Collections.emptyMap();
private AdvanceConsumersTask _queueHouseKeepingTask;
private volatile int _bindingCount;
+ private volatile OverflowPolicyHandler _overflowPolicyHandler;
private interface HoldMethod
{
@@ -336,9 +327,10 @@ public abstract class AbstractQueue<X ex
public void onValidate()
{
super.onValidate();
- if (_queueFlowResumeSizeBytes > _queueFlowControlSizeBytes)
+ Double flowResumeLimit = getContextValue(Double.class, QUEUE_FLOW_RESUME_LIMIT);
+ if (flowResumeLimit != null && (flowResumeLimit < 0.0 || flowResumeLimit > 100.0))
{
- throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
+ throw new IllegalConfigurationException("Flow resume limit value cannot be greater than 100 or lower than 0");
}
}
@@ -361,6 +353,8 @@ public abstract class AbstractQueue<X ex
_estimatedMinimumMemoryFootprint = getContextValue(Long.class, QUEUE_MINIMUM_ESTIMATED_MEMORY_FOOTPRINT);
_estimatedMessageMemoryOverhead = getContextValue(Long.class, QUEUE_ESTIMATED_MESSAGE_MEMORY_OVERHEAD);
+ _overflowPolicyHandler = createOverflowPolicyHandler(getOverflowPolicy());
+
_queueHouseKeepingTask = new AdvanceConsumersTask();
Subject activeSubject = Subject.getSubject(AccessController.getContext());
Set<SessionPrincipal> sessionPrincipals = activeSubject == null ? Collections.<SessionPrincipal>emptySet() : activeSubject.getPrincipals(SessionPrincipal.class);
@@ -680,15 +674,15 @@ public abstract class AbstractQueue<X ex
}
@Override
- public long getMaxCount()
+ public long getMaximumQueueDepthMessages()
{
- return _maxCount;
+ return _maximumQueueDepthMessages;
}
@Override
- public long getMaxSize()
+ public long getMaximumQueueDepthBytes()
{
- return _maxSize;
+ return _maximumQueueDepthBytes;
}
@Override
@@ -1158,7 +1152,6 @@ public abstract class AbstractQueue<X ex
protected void doEnqueue(final ServerMessage message, final Action<? super MessageInstance> action, MessageEnqueueRecord enqueueRecord)
{
- applyOverflowPolicy(message);
final QueueEntry entry = getEntries().add(message, enqueueRecord);
updateExpiration(entry);
@@ -1178,35 +1171,11 @@ public abstract class AbstractQueue<X ex
{
action.performAction(entry);
}
+ _overflowPolicyHandler.checkOverflow();
}
}
- private void applyOverflowPolicy(final ServerMessage message)
- {
- if (getOverflowPolicy() != null)
- {
- switch (getOverflowPolicy())
- {
- case RING:
- while (getQueueDepthMessages() == getMaxCount())
- {
- QueueEntry entry = getEntries().getOldestEntry();
- deleteEntry(entry);
- }
- while (getQueueDepthBytesIncludingHeader() + message.getSizeIncludingHeader() > getMaxSize())
- {
- QueueEntry entry = getEntries().getOldestEntry();
- deleteEntry(entry);
- }
- break;
- case NONE:
- default:
- break;
- }
- }
- }
-
private void updateExpiration(final QueueEntry entry)
{
long expiration = entry.getMessage().getExpiration();
@@ -1671,6 +1640,7 @@ public abstract class AbstractQueue<X ex
});
}
+ @Override
public void deleteEntry(final QueueEntry entry)
{
boolean acquiredForDequeueing = entry.acquireOrSteal(new Runnable()
@@ -1684,8 +1654,7 @@ public abstract class AbstractQueue<X ex
if(acquiredForDequeueing)
{
- _logger.debug("Dequeuing expired node {}", entry);
- // Then dequeue it.
+ _logger.debug("Dequeuing node {}", entry);
dequeueEntry(entry);
}
}
@@ -1804,64 +1773,9 @@ public abstract class AbstractQueue<X ex
}
@Override
- public void checkCapacity(AMQPSession<?,?> session)
- {
- if(_queueFlowControlSizeBytes != 0L)
- {
- if(_queueStatistics.getQueueSize() > _queueFlowControlSizeBytes)
- {
- _overfull.set(true);
- //Overfull log message
- getEventLogger().message(_logSubject, QueueMessages.OVERFULL(_queueStatistics.getQueueSize(),
- _queueFlowControlSizeBytes));
-
- _blockedSessions.add(session);
-
- session.block(this);
-
- if(_queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
- {
-
- //Underfull log message
- getEventLogger().message(_logSubject,
- QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(), _queueFlowResumeSizeBytes));
-
- session.unblock(this);
- _blockedSessions.remove(session);
-
- }
- }
-
-
-
- }
- }
-
- @Override
public void checkCapacity()
{
- if(getEntries() != null)
- {
- if (_queueFlowControlSizeBytes != 0L)
- {
- if (_overfull.get() && _queueStatistics.getQueueSize() <= _queueFlowResumeSizeBytes)
- {
- if (_overfull.compareAndSet(true, false))
- {
- //Underfull log message
- getEventLogger().message(_logSubject,
- QueueMessages.UNDERFULL(_queueStatistics.getQueueSize(),
- _queueFlowResumeSizeBytes));
- }
-
- for (final AMQPSession<?,?> blockedSession : _blockedSessions)
- {
- blockedSession.unblock(this);
- _blockedSessions.remove(blockedSession);
- }
- }
- }
- }
+ _overflowPolicyHandler.checkOverflow();
}
void notifyConsumers(QueueEntry entry)
@@ -2289,16 +2203,6 @@ public abstract class AbstractQueue<X ex
return _alertThresholdMessageSize;
}
- public long getQueueFlowControlSizeBytes()
- {
- return _queueFlowControlSizeBytes;
- }
-
- public long getQueueFlowResumeSizeBytes()
- {
- return _queueFlowResumeSizeBytes;
- }
-
public Set<NotificationCheck> getNotificationChecks()
{
return _notificationChecks;
@@ -2636,32 +2540,15 @@ public abstract class AbstractQueue<X ex
throw new VirtualHostUnavailableException(this._virtualHost);
}
RoutingResult<M> result = new RoutingResult<>(message);
-
- if(_overflowPolicy == OverflowPolicy.RING && message.getSizeIncludingHeader() > getMaxSize())
+ if (!message.isResourceAcceptable(this))
{
- result.addRoutingFailure(506, "amqp:resource-limit-exceeded", "Message larger than configured maximum depth on " + this.getName()
- + ": size=" + message.getSizeIncludingHeader() + ", max-size=" + getMaxSize());
+ result.addNotAcceptingRoutableQueue(this, String.format("Not accepted by queue '%s'", getName()));
}
- else if(_overflowPolicy == OverflowPolicy.RING && _maxCount == 0)
+ else if (message.isReferenced(this))
{
- result.addRoutingFailure(506, "amqp:resource-limit-exceeded", "Queue '" + getName() + "' maximum count (0) prevents this message from being accepted");
+ result.addNotAcceptingRoutableQueue(this, String.format("Already enqueued on queue '%s'", getName()));
}
- else if (this instanceof PriorityQueue && message.isResourceAcceptable(this) && !message.isReferenced(this))
- {
- if (getMaxCount() == getQueueDepthMessages())
- {
- QueueEntry entry = getEntries().getOldestEntry();
- if(entry.getMessage().getMessageHeader().getPriority() <= message.getMessageHeader().getPriority())
- {
- result.addQueue(this);
- }
- }
- else
- {
- result.addQueue(this);
- }
- }
- else if(message.isResourceAcceptable(this) && !message.isReferenced(this))
+ else
{
result.addQueue(this);
}
@@ -2992,7 +2879,11 @@ public abstract class AbstractQueue<X ex
@Override
public boolean isQueueFlowStopped()
{
- return _overfull.get();
+ if (_overflowPolicyHandler instanceof ProducerFlowControlOverflowPolicyHandler)
+ {
+ return ((ProducerFlowControlOverflowPolicyHandler)_overflowPolicyHandler).isQueueFlowStopped();
+ }
+ return false;
}
@Override
@@ -3008,7 +2899,7 @@ public abstract class AbstractQueue<X ex
}
@Override
- public boolean changeAttribute(String name, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ protected boolean changeAttribute(String name, Object desired) throws IllegalStateException, AccessControlException, IllegalArgumentException
{
if(EXCLUSIVE.equals(name))
{
@@ -3037,14 +2928,39 @@ public abstract class AbstractQueue<X ex
}
+ @Override
+ protected void changeAttributes(Map<String,Object> attributes) throws IllegalStateException, AccessControlException, IllegalArgumentException
+ {
+ OverflowPolicy oldOverflowPolicy = getOverflowPolicy();
+ super.changeAttributes(attributes);
+
+ OverflowPolicy newOverflowPolicy = getOverflowPolicy();
+ if (oldOverflowPolicy != newOverflowPolicy)
+ {
+ _overflowPolicyHandler = createOverflowPolicyHandler(newOverflowPolicy);
+ _overflowPolicyHandler.checkOverflow();
+ }
+ }
+
+ private OverflowPolicyHandler createOverflowPolicyHandler(final OverflowPolicy overflowPolicy)
+ {
+ OverflowPolicyHandlerFactory factory =
+ new QpidServiceLoader().getInstancesByType(OverflowPolicyHandlerFactory.class)
+ .get(String.valueOf(overflowPolicy));
+ if (factory == null)
+ {
+ throw new IllegalStateException(String.format("Factory for overflow policy '%s' is not found",
+ overflowPolicy.name()));
+ }
+ return factory.create(this, getEventLogger());
+ }
+
private static final String[] NON_NEGATIVE_NUMBERS = {
ALERT_REPEAT_GAP,
ALERT_THRESHOLD_MESSAGE_AGE,
ALERT_THRESHOLD_MESSAGE_SIZE,
ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
- QUEUE_FLOW_CONTROL_SIZE_BYTES,
- QUEUE_FLOW_RESUME_SIZE_BYTES,
MAXIMUM_DELIVERY_ATTEMPTS
};
@@ -3053,12 +2969,6 @@ public abstract class AbstractQueue<X ex
{
super.validateChange(proxyForValidation, changedAttributes);
Queue<?> queue = (Queue) proxyForValidation;
- long queueFlowControlSize = queue.getQueueFlowControlSizeBytes();
- long queueFlowControlResumeSize = queue.getQueueFlowResumeSizeBytes();
- if (queueFlowControlResumeSize > queueFlowControlSize)
- {
- throw new IllegalConfigurationException("Flow resume size can't be greater than flow control size");
- }
for (String attrName : NON_NEGATIVE_NUMBERS)
{
@@ -3255,6 +3165,12 @@ public abstract class AbstractQueue<X ex
return messageFinder.getMessageInfo();
}
+ @Override
+ public QueueEntry getLesserOldestEntry()
+ {
+ return getEntries().getLesserOldestEntry();
+ }
+
private class MessageFinder implements QueueEntryVisitor
{
private final long _messageNumber;
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/LastValueQueueList.java Thu Feb 23 17:11:04 2017
@@ -146,6 +146,12 @@ public class LastValueQueueList extends
return addedEntry;
}
+ @Override
+ public QueueEntry getLesserOldestEntry()
+ {
+ return getOldestEntry();
+ }
+
/**
* Returns:
*
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandler.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+public class NoneOverflowPolicyHandler implements OverflowPolicyHandler
+{
+ @Override
+ public void checkOverflow()
+ {
+ // noop
+ }
+
+}
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/NoneOverflowPolicyHandlerFactory.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public class NoneOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return OverflowPolicy.NONE.name();
+ }
+
+ @Override
+ public OverflowPolicyHandler create(final Queue<?> queue,
+ final EventLogger eventLogger)
+ {
+ return new NoneOverflowPolicyHandler();
+ }
+}
Copied: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java (from r1784177, qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java)
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java?p2=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java&p1=qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java&r1=1784177&r2=1784178&rev=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/CapacityChecker.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandler.java Thu Feb 23 17:11:04 2017
@@ -18,11 +18,10 @@
* under the License.
*
*/
-package org.apache.qpid.server.protocol;
-import org.apache.qpid.server.session.AMQPSession;
+package org.apache.qpid.server.queue;
-public interface CapacityChecker
+public interface OverflowPolicyHandler
{
- void checkCapacity(AMQPSession<?,?> channel);
+ void checkOverflow();
}
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/OverflowPolicyHandlerFactory.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.Pluggable;
+
+public interface OverflowPolicyHandlerFactory extends Pluggable
+{
+ OverflowPolicyHandler create(Queue<?> queue, final EventLogger eventLogger);
+}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/PriorityQueueList.java Thu Feb 23 17:11:04 2017
@@ -207,13 +207,27 @@ abstract public class PriorityQueueList
for(PriorityQueueEntrySubList subList : _priorityLists)
{
QueueEntry subListOldest = subList.getOldestEntry();
- if (subListOldest != null)
+ if(oldest == null || (subListOldest != null && subListOldest.getMessage().getMessageNumber() < oldest.getMessage().getMessageNumber()))
{
- return subListOldest;
+ oldest = subListOldest;
}
}
return oldest;
}
+
+ @Override
+ public QueueEntry getLesserOldestEntry()
+ {
+ for(PriorityQueueEntrySubList subList : _priorityLists)
+ {
+ QueueEntry subListLast = subList.getLesserOldestEntry();
+ if(subListLast != null)
+ {
+ return subListLast;
+ }
+ }
+ return null;
+ }
}
static class PriorityQueueEntrySubList extends PriorityQueueList
@@ -245,6 +259,12 @@ abstract public class PriorityQueueList
{
return _listPriority;
}
+
+ @Override
+ public QueueEntry getLesserOldestEntry()
+ {
+ return getOldestEntry();
+ }
}
static class PriorityQueueEntry extends OrderedQueueEntry
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandler.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import java.security.AccessController;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.security.auth.Subject;
+
+import org.apache.qpid.server.connection.SessionPrincipal;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.model.AbstractConfigurationChangeListener;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.session.AMQPSession;
+
+public class ProducerFlowControlOverflowPolicyHandler implements OverflowPolicyHandler
+{
+ private final Handler _handler;
+
+ ProducerFlowControlOverflowPolicyHandler(Queue<?> queue, EventLogger eventLogger)
+ {
+ _handler = new Handler(queue, eventLogger);
+ queue.addChangeListener(_handler);
+ }
+
+ boolean isQueueFlowStopped()
+ {
+ return _handler.isQueueFlowStopped();
+ }
+
+ @Override
+ public void checkOverflow()
+ {
+ _handler.checkOverflow();
+ }
+
+ private static class Handler extends AbstractConfigurationChangeListener implements OverflowPolicyHandler
+ {
+ private final Queue<?> _queue;
+ private final EventLogger _eventLogger;
+ private final AtomicBoolean _overfull = new AtomicBoolean(false);
+ private final Set<AMQPSession<?, ?>> _blockedSessions =
+ Collections.newSetFromMap(new ConcurrentHashMap<AMQPSession<?, ?>, Boolean>());
+ private volatile double _queueFlowResumeLimit;
+ private boolean _checkCapacity;
+
+ private Handler(final Queue<?> queue, final EventLogger eventLogger)
+ {
+ _queue = queue;
+ _eventLogger = eventLogger;
+ Double value = _queue.getContextValue(Double.class, Queue.QUEUE_FLOW_RESUME_LIMIT);
+ if (value != null)
+ {
+ _queueFlowResumeLimit = value;
+ }
+ }
+
+ @Override
+ public void checkOverflow()
+ {
+ long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
+ long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
+ if (maximumQueueDepthBytes >= 0L || maximumQueueDepthMessages >= 0L)
+ {
+ checkOverfull(maximumQueueDepthBytes, maximumQueueDepthMessages);
+ }
+
+ checkUnderfull(maximumQueueDepthBytes, maximumQueueDepthMessages);
+ }
+
+ @Override
+ public void attributeSet(final ConfiguredObject<?> object,
+ final String attributeName,
+ final Object oldAttributeValue,
+ final Object newAttributeValue)
+ {
+ super.attributeSet(object, attributeName, oldAttributeValue, newAttributeValue);
+ if (Queue.CONTEXT.equals(attributeName))
+ {
+ Double value = _queue.getContextValue(Double.class, Queue.QUEUE_FLOW_RESUME_LIMIT);
+ double queueFlowResumePercentage = value == null ? 0 : value;
+ if (queueFlowResumePercentage != _queueFlowResumeLimit)
+ {
+ _queueFlowResumeLimit = queueFlowResumePercentage;
+ _checkCapacity = true;
+ }
+ }
+ if (Queue.MAXIMUM_QUEUE_DEPTH_BYTES.equals(attributeName)
+ || Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES.equals(attributeName))
+ {
+ _checkCapacity = true;
+ }
+ }
+
+ @Override
+ public void bulkChangeEnd(final ConfiguredObject<?> object)
+ {
+ super.bulkChangeEnd(object);
+ if (_queue.getOverflowPolicy() == OverflowPolicy.PRODUCER_FLOW_CONTROL)
+ {
+ if (_checkCapacity)
+ {
+ _checkCapacity = false;
+ checkUnderfull(_queue.getMaximumQueueDepthBytes(), _queue.getMaximumQueueDepthMessages());
+ }
+ }
+ else
+ {
+ _queue.removeChangeListener(this);
+ checkUnderfull(-1, -1);
+
+ if (_overfull.compareAndSet(true, false))
+ {
+ _eventLogger.message(_queue.getLogSubject(),
+ QueueMessages.UNDERFULL(_queue.getQueueDepthBytes(),
+ getFlowResumeLimit((long) -1),
+ (long) _queue.getQueueDepthMessages(),
+ getFlowResumeLimit((long) -1)));
+ }
+
+ for (final AMQPSession<?, ?> blockedSession : _blockedSessions)
+ {
+ blockedSession.unblock(_queue);
+ _blockedSessions.remove(blockedSession);
+ }
+ }
+ }
+
+ boolean isQueueFlowStopped()
+ {
+ return _overfull.get();
+ }
+
+ private void checkUnderfull(long maximumQueueDepthBytes, long maximumQueueDepthMessages)
+ {
+ if (_overfull.get())
+ {
+ long queueDepthBytes = _queue.getQueueDepthBytes();
+ long queueDepthMessages = _queue.getQueueDepthMessages();
+
+ if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
+ && isUnderfull(queueDepthMessages, maximumQueueDepthMessages))
+ {
+ if (_overfull.compareAndSet(true, false))
+ {
+ _eventLogger.message(_queue.getLogSubject(),
+ QueueMessages.UNDERFULL(queueDepthBytes,
+ getFlowResumeLimit(maximumQueueDepthBytes),
+ queueDepthMessages,
+ getFlowResumeLimit(maximumQueueDepthMessages)));
+ }
+
+ for (final AMQPSession<?, ?> blockedSession : _blockedSessions)
+ {
+ blockedSession.unblock(_queue);
+ _blockedSessions.remove(blockedSession);
+ }
+ }
+ }
+ }
+
+ private void checkOverfull(final long maximumQueueDepthBytes, final long maximumQueueDepthMessages)
+ {
+ final long queueDepthBytes = _queue.getQueueDepthBytes();
+ final long queueDepthMessages = _queue.getQueueDepthMessages();
+
+ if ((maximumQueueDepthBytes >= 0L && queueDepthBytes > maximumQueueDepthBytes) ||
+ (maximumQueueDepthMessages >= 0L && queueDepthMessages > maximumQueueDepthMessages))
+ {
+ Subject subject = Subject.getSubject(AccessController.getContext());
+ Set<SessionPrincipal> sessionPrincipals = subject.getPrincipals(SessionPrincipal.class);
+ if (!sessionPrincipals.isEmpty())
+ {
+ SessionPrincipal sessionPrincipal = sessionPrincipals.iterator().next();
+ if (sessionPrincipal != null)
+ {
+
+ if (_overfull.compareAndSet(false, true))
+ {
+ _eventLogger.message(_queue.getLogSubject(),
+ QueueMessages.OVERFULL(queueDepthBytes,
+ maximumQueueDepthBytes,
+ queueDepthMessages,
+ maximumQueueDepthMessages));
+ }
+
+ final AMQPSession<?, ?> session = sessionPrincipal.getSession();
+ _blockedSessions.add(session);
+ session.block(_queue);
+
+ if (isUnderfull(queueDepthBytes, maximumQueueDepthBytes)
+ && isUnderfull(queueDepthMessages, maximumQueueDepthMessages))
+ {
+
+ session.unblock(_queue);
+ _blockedSessions.remove(session);
+ }
+ }
+ }
+ }
+ }
+
+ private boolean isUnderfull(final long queueDepth,
+ final long maximumQueueDepth)
+ {
+ return maximumQueueDepth < 0 || queueDepth <= getFlowResumeLimit(maximumQueueDepth);
+ }
+
+ private long getFlowResumeLimit(final long maximumQueueDepth)
+ {
+ if (maximumQueueDepth >= 0)
+ {
+ return (long) (_queueFlowResumeLimit / 100.0 * maximumQueueDepth);
+ }
+ return -1;
+ }
+ }
+
+}
+
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/ProducerFlowControlOverflowPolicyHandlerFactory.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public class ProducerFlowControlOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
+{
+ @Override
+ public String getType()
+ {
+ return OverflowPolicy.PRODUCER_FLOW_CONTROL.name();
+ }
+
+ @Override
+ public OverflowPolicyHandler create(final Queue<?> queue,
+ final EventLogger eventLogger)
+ {
+ return new ProducerFlowControlOverflowPolicyHandler(queue, eventLogger);
+ }
+}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueArgumentsConverter.java Thu Feb 23 17:11:04 2017
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.OverflowPolicy;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
public class QueueArgumentsConverter
@@ -92,8 +93,7 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(X_QPID_MAXIMUM_DELIVERY_COUNT, Queue.MAXIMUM_DELIVERY_ATTEMPTS);
- ATTRIBUTE_MAPPINGS.put(X_QPID_CAPACITY, Queue.QUEUE_FLOW_CONTROL_SIZE_BYTES);
- ATTRIBUTE_MAPPINGS.put(X_QPID_FLOW_RESUME_CAPACITY, Queue.QUEUE_FLOW_RESUME_SIZE_BYTES);
+ ATTRIBUTE_MAPPINGS.put(X_QPID_CAPACITY, Queue.MAXIMUM_QUEUE_DEPTH_BYTES);
ATTRIBUTE_MAPPINGS.put(QPID_QUEUE_SORT_KEY, SortedQueue.SORT_KEY);
ATTRIBUTE_MAPPINGS.put(QPID_LAST_VALUE_QUEUE_KEY, LastValueQueue.LVQ_KEY);
@@ -116,8 +116,8 @@ public class QueueArgumentsConverter
ATTRIBUTE_MAPPINGS.put(QPID_LIFETIME_POLICY, Queue.LIFETIME_POLICY);
ATTRIBUTE_MAPPINGS.put(QPID_POLICY_TYPE, Queue.OVERFLOW_POLICY);
- ATTRIBUTE_MAPPINGS.put(QPID_MAX_COUNT, Queue.MAX_COUNT);
- ATTRIBUTE_MAPPINGS.put(QPID_MAX_SIZE, Queue.MAX_SIZE);
+ ATTRIBUTE_MAPPINGS.put(QPID_MAX_COUNT, Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES);
+ ATTRIBUTE_MAPPINGS.put(QPID_MAX_SIZE, Queue.MAXIMUM_QUEUE_DEPTH_BYTES);
}
@@ -141,6 +141,7 @@ public class QueueArgumentsConverter
{
modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.valueOf(String.valueOf(wireArguments.get(QPID_POLICY_TYPE)).toUpperCase()));
}
+
if(wireArguments.containsKey(QPID_SHARED_MSG_GROUP))
{
modelArguments.put(Queue.MESSAGE_GROUP_SHARED_GROUPS,
@@ -156,6 +157,26 @@ public class QueueArgumentsConverter
modelArguments.put(Queue.NO_LOCAL, Boolean.parseBoolean(wireArguments.get(QPID_NO_LOCAL).toString()));
}
+ if (wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY) != null && wireArguments.get(X_QPID_CAPACITY) != null)
+ {
+ double resumeCapacity = Integer.parseInt(wireArguments.get(X_QPID_FLOW_RESUME_CAPACITY).toString());
+ double maximumCapacity = Integer.parseInt(wireArguments.get(X_QPID_CAPACITY).toString());
+ if (resumeCapacity > maximumCapacity)
+ {
+ throw new ConnectionScopedRuntimeException(
+ "Flow resume size can't be greater than flow control size");
+ }
+ Map<String, String> context = (Map<String, String>) modelArguments.get(Queue.CONTEXT);
+ if (context == null)
+ {
+ context = new HashMap<>();
+ modelArguments.put(Queue.CONTEXT, context);
+ }
+ double ratio = resumeCapacity / maximumCapacity;
+ context.put(Queue.QUEUE_FLOW_RESUME_LIMIT, String.format("%.2f", ratio * 100.0));
+ modelArguments.put(Queue.OVERFLOW_POLICY, OverflowPolicy.PRODUCER_FLOW_CONTROL);
+ }
+
}
return modelArguments;
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java Thu Feb 23 17:11:04 2017
@@ -46,4 +46,6 @@ interface QueueEntryList
void updateStatsOnStateChange(QueueEntry entry, QueueEntry.EntryState fromState, QueueEntry.EntryState toState);
+ QueueEntry getLesserOldestEntry();
+
}
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandler.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.QueueMessages;
+import org.apache.qpid.server.model.Queue;
+
+public class RingOverflowPolicyHandler implements OverflowPolicyHandler
+{
+ private final Queue<?> _queue;
+ private final EventLogger _eventLogger;
+
+ RingOverflowPolicyHandler(Queue<?> queue, final EventLogger eventLogger)
+ {
+ _queue = queue;
+ _eventLogger = eventLogger;
+ }
+
+ @Override
+ public void checkOverflow()
+ {
+ final long maximumQueueDepthMessages = _queue.getMaximumQueueDepthMessages();
+ final long maximumQueueDepthBytes = _queue.getMaximumQueueDepthBytes();
+
+ boolean bytesOverflow, messagesOverflow, overflow = false;
+ int counter = 0;
+ int queueDepthMessages;
+ long queueDepthBytes;
+ do
+ {
+ queueDepthMessages = _queue.getQueueDepthMessages();
+ queueDepthBytes = _queue.getQueueDepthBytesIncludingHeader();
+
+ messagesOverflow = maximumQueueDepthMessages >= 0 && queueDepthMessages > maximumQueueDepthMessages;
+ bytesOverflow = maximumQueueDepthBytes >= 0 && queueDepthBytes > maximumQueueDepthBytes;
+
+ if (bytesOverflow || messagesOverflow)
+ {
+ if (!overflow)
+ {
+ overflow = true;
+ }
+
+ QueueEntry entry = _queue.getLesserOldestEntry();
+
+ if (entry != null)
+ {
+ counter++;
+ _queue.deleteEntry(entry);
+ }
+ else
+ {
+ queueDepthMessages = _queue.getQueueDepthMessages();
+ queueDepthBytes = _queue.getQueueDepthBytesIncludingHeader();
+ break;
+ }
+ }
+ }
+ while (bytesOverflow || messagesOverflow);
+
+ if (overflow)
+ {
+ _eventLogger.message(_queue.getLogSubject(),
+ QueueMessages.DROPPED(
+ counter,
+ queueDepthBytes,
+ queueDepthMessages,
+ maximumQueueDepthBytes,
+ maximumQueueDepthMessages));
+ }
+ }
+
+}
Added: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java?rev=1784178&view=auto
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java (added)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/RingOverflowPolicyHandlerFactory.java Thu Feb 23 17:11:04 2017
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.model.OverflowPolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.plugin.PluggableService;
+
+@PluggableService
+public class RingOverflowPolicyHandlerFactory implements OverflowPolicyHandlerFactory
+{
+
+ @Override
+ public String getType()
+ {
+ return OverflowPolicy.RING.name();
+ }
+
+ @Override
+ public OverflowPolicyHandler create(final Queue<?> queue,
+ final EventLogger eventLogger)
+ {
+ return new RingOverflowPolicyHandler(queue, eventLogger);
+ }
+}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/SortedQueueEntryList.java Thu Feb 23 17:11:04 2017
@@ -464,6 +464,28 @@ public class SortedQueueEntryList extend
return 0;
}
+ @Override
+ public QueueEntry getLesserOldestEntry()
+ {
+ SortedQueueEntry lastNode = null;
+ QueueEntryIterator iterator = iterator();
+ while (iterator.advance())
+ {
+ QueueEntry node = iterator.getNode();
+ if (node != null && !node.isDeleted())
+ {
+ SortedQueueEntry sortedQueueEntry = (SortedQueueEntry)node;
+ if (lastNode == null
+ || (lastNode.getKey() != null && !lastNode.getKey().equals(sortedQueueEntry.getKey()))
+ || (lastNode.getKey() == null && sortedQueueEntry.getKey() != null) )
+ {
+ lastNode = sortedQueueEntry;
+ }
+ }
+ }
+ return lastNode;
+ }
+
/**
* Swaps the position of the node in the tree with it's successor
* (that is the node with the next highest key)
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/StandardQueueEntryList.java Thu Feb 23 17:11:04 2017
@@ -47,4 +47,10 @@ public class StandardQueueEntryList exte
return new StandardQueueEntry(this, message, enqueueRecord);
}
+
+ @Override
+ public QueueEntry getLesserOldestEntry()
+ {
+ return getOldestEntry();
+ }
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/store/VirtualHostStoreUpgraderAndRecoverer.java Thu Feb 23 17:11:04 2017
@@ -607,16 +607,89 @@ public class VirtualHostStoreUpgraderAnd
}
else if("Queue".equals(record.getType()))
{
- _queues.put(record.getId(), (String) record.getAttributes().get("name"));
- if(record.getAttributes().containsKey("bindings"))
+ Map<String, Object> attributes = new HashMap<>(record.getAttributes());
+ Object queueFlowControlSizeBytes = attributes.remove("queueFlowControlSizeBytes");
+ Object queueFlowResumeSizeBytes = attributes.remove("queueFlowResumeSizeBytes");
+ if (queueFlowControlSizeBytes != null)
{
- _queueBindings.put(String.valueOf(record.getAttributes().get("name")),
- (List<Map<String, Object>>) record.getAttributes().get("bindings"));
- Map<String, Object> updatedAttributes = new HashMap<>(record.getAttributes());
- updatedAttributes.remove("bindings");
- getUpdateMap().put(record.getId(), new ConfiguredObjectRecordImpl(record.getId(), record.getType(), updatedAttributes, record.getParents()));
+ long queueFlowControlSizeBytesValue = convertAttributeValueToLong("queueFlowControlSizeBytes",
+ queueFlowControlSizeBytes);
+ if (queueFlowControlSizeBytesValue > 0)
+ {
+ if (queueFlowResumeSizeBytes != null)
+ {
+ long queueFlowResumeSizeBytesValue =
+ convertAttributeValueToLong("queueFlowResumeSizeBytes", queueFlowResumeSizeBytes);
+ double ratio = ((double) queueFlowResumeSizeBytesValue)
+ / ((double) queueFlowControlSizeBytesValue);
+ String flowResumeLimit = String.format("%.2f", ratio * 100.0);
+
+ Object context = attributes.get("context");
+ Map<String, String> contextMap;
+ if (context instanceof Map)
+ {
+ contextMap = (Map) context;
+ }
+ else
+ {
+ contextMap = new HashMap<>();
+ attributes.put("context", contextMap);
+ }
+ contextMap.put("queue.queueFlowResumeLimit", flowResumeLimit);
+ }
+ attributes.put("overflowPolicy", "ProducerFlowControl");
+ attributes.put("maximumQueueDepthBytes", queueFlowControlSizeBytes);
+ }
+ }
+
+ if(attributes.containsKey("bindings"))
+ {
+ _queueBindings.put(String.valueOf(attributes.get("name")),
+ (List<Map<String, Object>>) attributes.get("bindings"));
+ attributes.remove("bindings");
+ }
+
+ _queues.put(record.getId(), (String) attributes.get("name"));
+
+ if (!attributes.equals(new HashMap<>(record.getAttributes())))
+ {
+ getUpdateMap().put(record.getId(),
+ new ConfiguredObjectRecordImpl(record.getId(),
+ record.getType(),
+ attributes,
+ record.getParents()));
+ }
+ }
+ }
+
+ private long convertAttributeValueToLong(final String attributeName,
+ final Object attributeValue)
+ {
+ long value;
+ if (attributeValue instanceof Number)
+ {
+ value = ((Number) attributeValue).longValue();
+ }
+ else if (attributeValue instanceof String)
+ {
+ try
+ {
+ value = Long.parseLong((String) attributeValue);
+ }
+ catch (Exception e)
+ {
+ throw new IllegalConfigurationException(String.format(
+ "Cannot evaluate '%s': %s",
+ attributeName, attributeValue));
}
}
+ else
+ {
+ throw new IllegalConfigurationException(String.format("Cannot evaluate '%s': %s",
+ attributeName,
+ String.valueOf(attributeValue)));
+ }
+ return value;
}
@Override
Modified: qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java?rev=1784178&r1=1784177&r2=1784178&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java (original)
+++ qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java Thu Feb 23 17:11:04 2017
@@ -845,8 +845,8 @@ abstract class AbstractQueueTestBase ext
public void testNoneOverflowPolicy()
{
Map<String,Object> attributes = new HashMap<>(_arguments);
- attributes.put(Queue.MAX_COUNT, 2);
- attributes.put(Queue.MAX_SIZE, 100);
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 2);
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 100);
Queue<?> queue = getQueue();
queue.setAttributes(attributes);
@@ -865,19 +865,13 @@ abstract class AbstractQueueTestBase ext
assertEquals("Wrong size of messages in queue",300, queue.getQueueDepthBytesIncludingHeader());
assertEquals("Wrong oldest message", 10l,
((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
- queue.clearQueue();
-
- attributes = new HashMap<>(_arguments);
- attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
- attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
- queue.setAttributes(attributes);
}
public void testRingOverflowPolicyMaxCount()
{
Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
- attributes.put(Queue.MAX_COUNT, 4);
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 4);
Queue<?> queue = getQueue();
queue.setAttributes(attributes);
@@ -902,20 +896,14 @@ abstract class AbstractQueueTestBase ext
assertEquals("Wrong size of messages in queue",80, queue.getQueueDepthBytesIncludingHeader());
assertEquals("Wrong oldest message", 50l,
((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
- queue.clearQueue();
-
- attributes = new HashMap<>(_arguments);
- attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
- attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
- queue.setAttributes(attributes);
}
public void testRingOverflowPolicyMaxSize()
{
Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
- attributes.put(Queue.MAX_COUNT, 4);
- attributes.put(Queue.MAX_SIZE, 100);
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 4);
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 100);
Queue<?> queue = getQueue();
queue.setAttributes(attributes);
@@ -944,20 +932,13 @@ abstract class AbstractQueueTestBase ext
assertEquals("Wrong size of messages in queue",90, queue.getQueueDepthBytesIncludingHeader());
assertEquals("Wrong oldest message", 200l,
((AbstractQueue) queue).getEntries().getOldestEntry().getMessage().getArrivalTime());
- queue.clearQueue();
-
- attributes = new HashMap<>(_arguments);
- attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
- attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
- attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
- queue.setAttributes(attributes);
}
public void testRingOverflowPolicyMessagesRejected()
{
Map<String,Object> attributes = new HashMap<>(_arguments);
attributes.put(Queue.OVERFLOW_POLICY, OverflowPolicy.RING);
- attributes.put(Queue.MAX_COUNT, 0);
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 0);
Queue<?> queue = getQueue();
queue.setAttributes(attributes);
@@ -967,26 +948,20 @@ abstract class AbstractQueueTestBase ext
message = createMessage(new Long(27), 20, 10);
result = queue.route(message, message.getInitialRoutingAddress(), null);
- assertTrue("Result should include routing failure", result.isRoutingFailure());
+ assertTrue("Result should include not accepting route", result.hasNotAcceptingRoutableQueue());
int headerSize = 20;
int payloadSize = 10;
int id = 28;
attributes = new HashMap<>(_arguments);
- attributes.put(Queue.MAX_COUNT, 10);
- attributes.put(Queue.MAX_SIZE, 10);
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_MESSAGES, 10);
+ attributes.put(Queue.MAXIMUM_QUEUE_DEPTH_BYTES, 10);
queue.setAttributes(attributes);
message = createMessage(new Long(id), headerSize, payloadSize);
result = queue.route(message, message.getInitialRoutingAddress(), null);
- assertTrue("Result should include routing failure", result.isRoutingFailure());
-
- attributes = new HashMap<>(_arguments);
- attributes.put(Queue.OVERFLOW_POLICY, Queue.DEFAULT_POLICY_TYPE);
- attributes.put(Queue.MAX_COUNT, Queue.DEFAULT_MAX_COUNT);
- attributes.put(Queue.MAX_SIZE, Queue.DEFAULT_MAX_SIZE);
- queue.setAttributes(attributes);
+ assertTrue("Result should include not accepting route", result.hasNotAcceptingRoutableQueue());
}
private long getExpirationOnQueue(final Queue<?> queue, long arrivalTime, long expiration)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org