You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/08/26 03:29:55 UTC

[activemq-artemis] branch master updated: ARTEMIS-2457 implement ring queue

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 51c2022  ARTEMIS-2457 implement ring queue
     new 16adc8d  This closes #2802
51c2022 is described below

commit 51c2022f388a5c70adbc5de6b799f9072960c2f1
Author: Justin Bertram <jb...@apache.org>
AuthorDate: Mon Aug 5 07:21:23 2019 -0500

    ARTEMIS-2457 implement ring queue
---
 .../activemq/artemis/api/core/QueueAttributes.java |  13 +
 .../apache/activemq/artemis/logs/AuditLogger.java  |  10 +-
 .../org/apache/activemq/artemis/utils/Wait.java    |  13 +-
 .../api/config/ActiveMQDefaultConfiguration.java   |   6 +
 .../artemis/api/core/client/ClientSession.java     |   2 +
 .../api/core/management/ActiveMQServerControl.java |  89 ++++++
 .../artemis/api/core/management/QueueControl.java  |   6 +
 .../artemis/core/client/impl/QueueQueryImpl.java   |  13 +-
 .../protocol/core/impl/ActiveMQSessionContext.java |   2 +-
 .../impl/wireformat/CreateQueueMessage_V2.java     |  29 +-
 .../SessionQueueQueryResponseMessage_V3.java       |  19 +-
 .../artemis/core/server/QueueQueryResult.java      |  11 +-
 .../core/config/CoreQueueConfiguration.java        |  14 +
 .../deployers/impl/FileConfigurationParser.java    |  10 +-
 .../management/impl/ActiveMQServerControlImpl.java |  72 ++++-
 .../core/management/impl/QueueControlImpl.java     |  15 +
 .../core/paging/cursor/PagedReferenceImpl.java     |  10 +
 .../artemis/core/persistence/QueueBindingInfo.java |   2 +
 .../journal/AbstractJournalStorageManager.java     |   2 +-
 .../codec/PersistentQueueBindingEncoding.java      |  20 +-
 .../artemis/core/postoffice/PostOffice.java        |  16 +
 .../core/postoffice/impl/PostOfficeImpl.java       |  23 ++
 .../protocol/core/ServerSessionPacketHandler.java  |   2 +-
 .../artemis/core/server/ActiveMQServer.java        |  31 ++
 .../artemis/core/server/MessageReference.java      |   4 +
 .../apache/activemq/artemis/core/server/Queue.java |   6 +
 .../activemq/artemis/core/server/QueueConfig.java  |  20 +-
 .../artemis/core/server/ServerSession.java         |  23 ++
 .../core/server/impl/ActiveMQServerImpl.java       | 113 ++++++-
 .../server/impl/GroupFirstMessageReference.java    |  10 +
 .../artemis/core/server/impl/LastValueQueue.java   |  10 +
 .../core/server/impl/MessageReferenceImpl.java     |  12 +
 .../core/server/impl/PostOfficeJournalLoader.java  |   3 +-
 .../artemis/core/server/impl/QueueFactoryImpl.java |   2 +-
 .../artemis/core/server/impl/QueueImpl.java        | 189 +++++++++---
 ...essageMetrics.java => QueueMessageMetrics.java} |  37 ++-
 .../server/impl/ScheduledDeliveryHandlerImpl.java  |   4 +-
 .../core/server/impl/ServerSessionImpl.java        |  46 ++-
 .../core/settings/impl/AddressSettings.java        |  33 +-
 .../resources/schema/artemis-configuration.xsd     |  10 +
 .../core/config/impl/FileConfigurationTest.java    |   4 +
 .../impl/journal/QueueBindingEncodingTest.java     | 106 +++++++
 .../server/impl/ScheduledDeliveryHandlerTest.java  |  15 +
 .../resources/ConfigurationTest-full-config.xml    |   3 +-
 ...rationTest-xinclude-config-address-settings.xml |   1 +
 ...ConfigurationTest-xinclude-config-addresses.xml |   2 +-
 .../src/test/resources/artemis-configuration.xsd   |  10 +
 docs/user-manual/en/SUMMARY.md                     |   1 +
 docs/user-manual/en/address-model.md               |   5 +
 docs/user-manual/en/configuration-index.md         |   2 +
 docs/user-manual/en/ring-queues.md                 | 187 +++++++++++
 .../tests/integration/client/UpdateQueueTest.java  |  15 +-
 .../management/ActiveMQServerControlTest.java      |  60 ++++
 .../ActiveMQServerControlUsingCoreTest.java        |  10 +
 .../management/QueueControlUsingCoreTest.java      |   9 +
 .../tests/integration/server/RingQueueTest.java    | 343 +++++++++++++++++++++
 .../tests/unit/core/postoffice/impl/FakeQueue.java |  16 +
 .../core/server/impl/fakes/FakePostOffice.java     |  19 ++
 58 files changed, 1655 insertions(+), 105 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
index 17da20f..24f9f34 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
@@ -39,6 +39,7 @@ public class QueueAttributes implements Serializable {
    public static final String AUTO_DELETE = "auto-delete";
    public static final String AUTO_DELETE_DELAY = "auto-delete-delay";
    public static final String AUTO_DELETE_MESSAGE_COUNT = "auto-delete-message-count";
+   public static final String RING_SIZE = "ring-size";
 
    private RoutingType routingType;
    private SimpleString filterString;
@@ -58,6 +59,7 @@ public class QueueAttributes implements Serializable {
    private Boolean autoDelete;
    private Long autoDeleteDelay;
    private Long autoDeleteMessageCount;
+   private Long ringSize;
 
 
    public void set(String key, String value) {
@@ -98,6 +100,8 @@ public class QueueAttributes implements Serializable {
             setAutoDeleteDelay(Long.valueOf(value));
          } else if (key.equals(AUTO_DELETE_MESSAGE_COUNT)) {
             setAutoDeleteMessageCount(Long.valueOf(value));
+         } else if (key.equals(RING_SIZE)) {
+            setRingSize(Long.valueOf(value));
          }
       }
    }
@@ -263,4 +267,13 @@ public class QueueAttributes implements Serializable {
       this.autoDeleteMessageCount = autoDeleteMessageCount;
       return this;
    }
+
+   public Long getRingSize() {
+      return ringSize;
+   }
+
+   public QueueAttributes setRingSize(Long ringSize) {
+      this.ringSize = ringSize;
+      return this;
+   }
 }
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
index 4399060..cbea7b9 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/logs/AuditLogger.java
@@ -2265,8 +2265,16 @@ public interface AuditLogger extends BasicLogger {
    }
 
    @LogMessage(level = Logger.Level.INFO)
-   @Message(id = 601501, value = "User {0} is getting messages acknowledged attemps on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+   @Message(id = 601501, value = "User {0} is getting messages acknowledged attempts on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
    void getAcknowledgeAttempts(String user, Object source, Object... args);
 
+   static void getRingSize(Object source, Object... args) {
+      LOGGER.getRingSize(getCaller(), source, arrayToString(args));
+   }
+
+   @LogMessage(level = Logger.Level.INFO)
+   @Message(id = 601502, value = "User {0} is getting ring size on target resource: {1} {2}", format = Message.Format.MESSAGE_FORMAT)
+   void getRingSize(String user, Object source, Object... args);
+
 
 }
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
index c6405d2..870fbf2 100644
--- a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/Wait.java
@@ -28,6 +28,7 @@ public class Wait {
 
    public static final long MAX_WAIT_MILLIS = 30 * 1000;
    public static final int SLEEP_MILLIS = 1000;
+   public static final String DEFAULT_FAILURE_MESSAGE = "Condition wasn't met";
 
    public interface Condition {
 
@@ -81,7 +82,7 @@ public class Wait {
    }
 
    public static void assertTrue(Condition condition) {
-      assertTrue("Condition wasn't met", condition);
+      assertTrue(DEFAULT_FAILURE_MESSAGE, condition);
    }
 
    public static void assertFalse(Condition condition) throws Exception {
@@ -92,6 +93,14 @@ public class Wait {
       assertTrue(failureMessage, () -> !condition.isSatisfied());
    }
 
+   public static void assertFalse(String failureMessage, Condition condition, final long duration) {
+      assertTrue(failureMessage, () -> !condition.isSatisfied(), duration, SLEEP_MILLIS);
+   }
+
+   public static void assertFalse(Condition condition, final long duration, final long sleep) {
+      assertTrue(DEFAULT_FAILURE_MESSAGE, () -> !condition.isSatisfied(), duration, sleep);
+   }
+
 
    public static void assertTrue(String failureMessage, Condition condition) {
       assertTrue(failureMessage, condition, MAX_WAIT_MILLIS);
@@ -102,7 +111,7 @@ public class Wait {
    }
 
    public static void assertTrue(Condition condition, final long duration, final long sleep) throws Exception {
-      assertTrue("condition not met", condition, duration, sleep);
+      assertTrue(DEFAULT_FAILURE_MESSAGE, condition, duration, sleep);
    }
 
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 30f4246..d41e8d3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -521,6 +521,8 @@ public final class ActiveMQDefaultConfiguration {
 
    public static final long DEFAULT_QUEUE_AUTO_DELETE_MESSAGE_COUNT = 0;
 
+   public static final long DEFAULT_RING_SIZE = -1;
+
    public static final int DEFAULT_CONSUMERS_BEFORE_DISPATCH = 0;
 
    public static final long DEFAULT_DELAY_BEFORE_DISPATCH = -1;
@@ -1431,6 +1433,10 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_QUEUE_AUTO_DELETE_MESSAGE_COUNT;
    }
 
+   public static long getDefaultRingSize() {
+      return DEFAULT_RING_SIZE;
+   }
+
    public static int getDefaultConsumersBeforeDispatch() {
       return DEFAULT_CONSUMERS_BEFORE_DISPATCH;
    }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index f0f988c..59b8869 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -170,6 +170,8 @@ public interface ClientSession extends XAResource, AutoCloseable {
       Long getAutoDeleteDelay();
 
       Long getAutoDeleteMessageCount();
+
+      Long getRingSize();
    }
 
    // Lifecycle operations ------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 059f786..4516cca 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -712,6 +712,52 @@ public interface ActiveMQServerControl {
     * @param durable            is the queue durable?
     * @param maxConsumers       the maximum number of consumers allowed on this queue at any one time
     * @param purgeOnNoConsumers delete this queue when the last consumer disconnects
+    * @param exclusive          if the queue should route exclusively to one consumer
+    * @param lastValue          use last-value semantics
+    * @param consumersBeforeDispatch number of consumers needed before dispatch can start
+    * @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met
+    * @param autoCreateAddress  create an address with default values should a matching address not be found
+    * @param ringSize           the size this queue should maintain according to ring semantics
+    * @return a textual summary of the queue
+    * @throws Exception
+    */
+   @Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION)
+   String createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
+                      @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
+                      @Parameter(name = "name", desc = "Name of the queue") String name,
+                      @Parameter(name = "filter", desc = "Filter of the queue") String filterStr,
+                      @Parameter(name = "durable", desc = "Is the queue durable?") boolean durable,
+                      @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
+                      @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers,
+                      @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") boolean exclusive,
+                      @Parameter(name = "groupRebalance", desc = "If the queue should rebalance groups when a consumer is added") boolean groupRebalance,
+                      @Parameter(name = "groupBuckets", desc = "Number of buckets that should be used for message groups, -1 (default) is unlimited, and groups by raw key instead") int groupBuckets,
+                      @Parameter(name = "groupFirstKey", desc = "Key used to mark a message is first in a group for a consumer") String groupFirstKey,
+                      @Parameter(name = "lastValue", desc = "Use last-value semantics") boolean lastValue,
+                      @Parameter(name = "lastValueKey", desc = "Use the specified property key for the last value") String lastValueKey,
+                      @Parameter(name = "nonDestructive", desc = "If the queue is non-destructive") boolean nonDestructive,
+                      @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") int consumersBeforeDispatch,
+                      @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") long delayBeforeDispatch,
+                      @Parameter(name = "autoDelete", desc = "If the queue should be deleted once no consumers") boolean autoDelete,
+                      @Parameter(name = "autoDeleteDelay", desc = "How long to wait (in milliseconds) before deleting auto-created queues after the queue has 0 consumers") long autoDeleteDelay,
+                      @Parameter(name = "autoDeleteMessageCount", desc = "The message count the queue must be at or below before it can be evaluated to be auto deleted, 0 waits until empty queue (default) and -1 disables this check") long autoDeleteMessageCount,
+                      @Parameter(name = "autoCreateAddress", desc = "Create an address with default values should a matching address not be found") boolean autoCreateAddress,
+                      @Parameter(name = "ringSize", desc = "The size this queue should maintain according to ring semantics") long ringSize) throws Exception;
+
+   /**
+    * Create a queue.
+    * <br>
+    * If {@code address} is {@code null} it will be defaulted to {@code name}.
+    * <br>
+    * This method throws a {@link org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException} exception if the queue already exists.
+    *
+    * @param address            address to bind the queue to
+    * @param routingType        the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
+    * @param name               name of the queue
+    * @param filterStr          filter of the queue
+    * @param durable            is the queue durable?
+    * @param maxConsumers       the maximum number of consumers allowed on this queue at any one time
+    * @param purgeOnNoConsumers delete this queue when the last consumer disconnects
     * @param autoCreateAddress  create an address with default values should a matching address not be found
     * @return a textual summary of the queue
     * @throws Exception
@@ -788,9 +834,12 @@ public interface ActiveMQServerControl {
     *
     * @param name               name of the queue
     * @param routingType        the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
+    * @param filter             the filter to use on the queue
     * @param maxConsumers       the maximum number of consumers allowed on this queue at any one time
     * @param purgeOnNoConsumers delete this queue when the last consumer disconnects
     * @param exclusive          if the queue should route exclusively to one consumer
+    * @param groupRebalance     if the queue should rebalance groups when a consumer is added
+    * @param groupBuckets       number of buckets that should be used for message groups, -1 (default) is unlimited, and groups by raw key instead
     * @param nonDestructive     If the queue is non-destructive
     * @param consumersBeforeDispatch number of consumers needed before dispatch can start
     * @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met
@@ -817,9 +866,13 @@ public interface ActiveMQServerControl {
     *
     * @param name               name of the queue
     * @param routingType        the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
+    * @param filter             the filter to use on the queue
     * @param maxConsumers       the maximum number of consumers allowed on this queue at any one time
     * @param purgeOnNoConsumers delete this queue when the last consumer disconnects
     * @param exclusive          if the queue should route exclusively to one consumer
+    * @param groupRebalance     if the queue should rebalance groups when a consumer is added
+    * @param groupBuckets       number of buckets that should be used for message groups, -1 (default) is unlimited, and groups by raw key instead
+    * @param groupFirstKey      key used to mark a message is first in a group for a consumer
     * @param nonDestructive     If the queue is non-destructive
     * @param consumersBeforeDispatch number of consumers needed before dispatch can start
     * @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met
@@ -843,6 +896,42 @@ public interface ActiveMQServerControl {
                       @Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception;
 
    /**
+    * Update a queue
+    *
+    * @param name               name of the queue
+    * @param routingType        the routing type used for this address, {@code MULTICAST} or {@code ANYCAST}
+    * @param filter             the filter to use on the queue
+    * @param maxConsumers       the maximum number of consumers allowed on this queue at any one time
+    * @param purgeOnNoConsumers delete this queue when the last consumer disconnects
+    * @param exclusive          if the queue should route exclusively to one consumer
+    * @param groupRebalance     if the queue should rebalance groups when a consumer is added
+    * @param groupBuckets       number of buckets that should be used for message groups, -1 (default) is unlimited, and groups by raw key instead
+    * @param groupFirstKey      key used to mark a message is first in a group for a consumer
+    * @param nonDestructive     If the queue is non-destructive
+    * @param consumersBeforeDispatch number of consumers needed before dispatch can start
+    * @param delayBeforeDispatch delay to wait before dispatching if number of consumers before dispatch is not met
+    * @param user               the user associated with this queue
+    * @param ringSize           the size this queue should maintain according to ring semantics
+    * @return
+    * @throws Exception
+    */
+   @Operation(desc = "Update a queue", impact = MBeanOperationInfo.ACTION)
+   String updateQueue(@Parameter(name = "name", desc = "Name of the queue") String name,
+                      @Parameter(name = "routingType", desc = "The routing type used for this address, MULTICAST or ANYCAST") String routingType,
+                      @Parameter(name = "filter", desc = "The filter to use on the queue") String filter,
+                      @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
+                      @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
+                      @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
+                      @Parameter(name = "groupRebalance", desc = "If the queue should rebalance groups when a consumer is added") Boolean groupRebalance,
+                      @Parameter(name = "groupBuckets", desc = "Number of buckets that should be used for message groups, -1 (default) is unlimited, and groups by raw key instead") Integer groupBuckets,
+                      @Parameter(name = "groupFirstKey", desc = "Key used to mark a message is first in a group for a consumer") String groupFirstKey,
+                      @Parameter(name = "nonDestructive", desc = "If the queue is non-destructive") Boolean nonDestructive,
+                      @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch,
+                      @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch,
+                      @Parameter(name = "user", desc = "The user associated with this queue") String user,
+                      @Parameter(name = "ringSize", desc = "the size this queue should maintain according to ring semantics") Long ringSize) throws Exception;
+
+   /**
     * Deploy a durable queue.
     * <br>
     * If {@code address} is {@code null} it will be defaulted to {@code name}.
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
index bc4f380..192815d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/QueueControl.java
@@ -664,4 +664,10 @@ public interface QueueControl {
    @Operation(desc = "List all the existent group to consumers mappings on the Queue")
    String listGroupsAsJSON() throws Exception;
 
+   /**
+    * Will return the ring size.
+    */
+   @Attribute(desc = "Get the ring size")
+   long getRingSize();
+
 }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index 7213313..93df02a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -72,6 +72,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
 
    private final Long autoDeleteMessageCount;
 
+   private final Long ringSize;
+
 
    private final Integer defaultConsumerWindowSize;
 
@@ -158,7 +160,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
                          final Long autoDeleteDelay,
                          final Long autoDeleteMessageCount,
                          final Integer defaultConsumerWindowSize) {
-      this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize);
+      this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, groupRebalance, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, defaultConsumerWindowSize, null);
    }
 
    public QueueQueryImpl(final boolean durable,
@@ -186,7 +188,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
                          final Boolean autoDelete,
                          final Long autoDeleteDelay,
                          final Long autoDeleteMessageCount,
-                         final Integer defaultConsumerWindowSize) {
+                         final Integer defaultConsumerWindowSize,
+                         final Long ringSize) {
       this.durable = durable;
       this.temporary = temporary;
       this.consumerCount = consumerCount;
@@ -213,6 +216,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
       this.autoDeleteDelay = autoDeleteDelay;
       this.autoDeleteMessageCount = autoDeleteMessageCount;
       this.defaultConsumerWindowSize = defaultConsumerWindowSize;
+      this.ringSize = ringSize;
    }
 
    @Override
@@ -344,5 +348,10 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
    public Long getAutoDeleteMessageCount() {
       return autoDeleteMessageCount;
    }
+
+   @Override
+   public Long getRingSize() {
+      return ringSize;
+   }
 }
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index 1709a3a..26c405c 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -877,7 +877,7 @@ public class ActiveMQSessionContext extends SessionContext {
       // We try to recreate any non-durable or auto-created queues, since they might not be there on failover/reconnect.
       // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover/reconnection
       if (!queueInfo.isDurable() || queueInfo.isAutoCreated()) {
-         CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), [...]
+         CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.getGroupFirstKey(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), [...]
 
          sendPacketWithoutLock(sessionChannel, createQueueRequest);
       }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
index 0c84267..b071afa 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
@@ -56,6 +56,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
 
    private Long autoDeleteMessageCount;
 
+   private Long ringSize;
+
    public CreateQueueMessage_V2(final SimpleString address,
                                 final SimpleString queueName,
                                 final boolean temporary,
@@ -84,7 +86,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
          queueAttributes.getDelayBeforeDispatch(),
          queueAttributes.getAutoDelete(),
          queueAttributes.getAutoDeleteDelay(),
-         queueAttributes.getAutoDeleteMessageCount()
+         queueAttributes.getAutoDeleteMessageCount(),
+         queueAttributes.getRingSize()
       );
    }
 
@@ -109,7 +112,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
                                 final Long delayBeforeDispatch,
                                 final Boolean autoDelete,
                                 final Long autoDeleteDelay,
-                                final Long autoDeleteMessageCount) {
+                                final Long autoDeleteMessageCount,
+                                final Long ringSize) {
       this();
 
       this.address = address;
@@ -134,6 +138,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       this.autoDelete = autoDelete;
       this.autoDeleteDelay = autoDeleteDelay;
       this.autoDeleteMessageCount = autoDeleteMessageCount;
+      this.ringSize = ringSize;
    }
 
    public CreateQueueMessage_V2() {
@@ -161,6 +166,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       buff.append(", autoDelete=" + autoDelete);
       buff.append(", autoDeleteDelay=" + autoDeleteDelay);
       buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
+      buff.append(", ringSize=" + ringSize);
 
       buff.append("]");
       return buff.toString();
@@ -294,6 +300,14 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       this.autoDeleteMessageCount = autoDeleteMessageCount;
    }
 
+   public Long getRingSize() {
+      return ringSize;
+   }
+
+   public void setRingSize(Long ringSize) {
+      this.ringSize = ringSize;
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
@@ -313,7 +327,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       BufferHelper.writeNullableLong(buffer, autoDeleteDelay);
       BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
       buffer.writeNullableSimpleString(groupFirstKey);
-
+      BufferHelper.writeNullableLong(buffer, ringSize);
    }
 
    @Override
@@ -341,6 +355,9 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       if (buffer.readableBytes() > 0) {
          groupFirstKey = buffer.readNullableSimpleString();
       }
+      if (buffer.readableBytes() > 0) {
+         ringSize = BufferHelper.readNullableLong(buffer);
+      }
    }
 
    @Override
@@ -363,6 +380,7 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       result = prime * result + (autoDelete == null ? 0 : autoDelete.hashCode());
       result = prime * result + (autoDeleteDelay == null ? 0 : autoDeleteDelay.hashCode());
       result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
+      result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
       return result;
    }
 
@@ -441,6 +459,11 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
             return false;
       } else if (!autoDeleteMessageCount.equals(other.autoDeleteMessageCount))
          return false;
+      if (ringSize == null) {
+         if (other.ringSize != null)
+            return false;
+      } else if (!ringSize.equals(other.ringSize))
+         return false;
       if (routingType == null) {
          if (other.routingType != null)
             return false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
index bf7ab05..f736172 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
@@ -60,6 +60,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
 
    protected Integer defaultConsumerWindowSize;
 
+   private Long ringSize;
+
    public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
       this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.getGroupFirstKey(), result.isLastValue(), result.getLastValueKey(), result.isNonDestr [...]
    }
@@ -273,6 +275,10 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       return autoDeleteMessageCount;
    }
 
+   public Long getRingSize() {
+      return ringSize;
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
@@ -293,6 +299,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       BufferHelper.writeNullableLong(buffer, autoDeleteDelay);
       BufferHelper.writeNullableLong(buffer, autoDeleteMessageCount);
       buffer.writeNullableSimpleString(groupFirstKey);
+      BufferHelper.writeNullableLong(buffer, ringSize);
 
    }
 
@@ -324,6 +331,9 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       if (buffer.readableBytes() > 0) {
          groupFirstKey = buffer.readNullableSimpleString();
       }
+      if (buffer.readableBytes() > 0) {
+         ringSize = BufferHelper.readNullableLong(buffer);
+      }
    }
 
    @Override
@@ -347,6 +357,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       result = prime * result + (autoDeleteDelay == null ? 0 : autoDeleteDelay.hashCode());
       result = prime * result + (autoDeleteMessageCount == null ? 0 : autoDeleteMessageCount.hashCode());
       result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
+      result = prime * result + (ringSize == null ? 0 : ringSize.hashCode());
       return result;
    }
 
@@ -377,12 +388,13 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       buff.append(", autoDeleteDelay=" + autoDeleteDelay);
       buff.append(", autoDeleteMessageCount=" + autoDeleteMessageCount);
       buff.append(", defaultConsumerWindowSize=" + defaultConsumerWindowSize);
+      buff.append(", ringSize=" + ringSize);
       return buff.toString();
    }
 
    @Override
    public ClientSession.QueueQuery toQueueQuery() {
-      return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMe [...]
+      return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), getGroupFirstKey(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), isAutoDelete(), getAutoDeleteDelay(), getAutoDeleteMe [...]
    }
 
    @Override
@@ -458,6 +470,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
             return false;
       } else if (!autoDeleteMessageCount.equals(other.autoDeleteMessageCount))
          return false;
+      if (ringSize == null) {
+         if (other.ringSize != null)
+            return false;
+      } else if (!ringSize.equals(other.ringSize))
+         return false;
       if (defaultConsumerWindowSize == null) {
          if (other.defaultConsumerWindowSize != null)
             return false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index 5493f8e..f27966d 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -73,6 +73,8 @@ public class QueueQueryResult {
 
    private Integer defaultConsumerWindowSize;
 
+   private Long ringSize;
+
    public QueueQueryResult(final SimpleString name,
                            final SimpleString address,
                            final boolean durable,
@@ -98,7 +100,8 @@ public class QueueQueryResult {
                            final Boolean autoDelete,
                            final Long autoDeleteDelay,
                            final Long autoDeleteMessageCount,
-                           final Integer defaultConsumerWindowSize) {
+                           final Integer defaultConsumerWindowSize,
+                           final Long ringSize) {
       this.durable = durable;
 
       this.temporary = temporary;
@@ -150,6 +153,8 @@ public class QueueQueryResult {
       this.autoDeleteMessageCount = autoDeleteMessageCount;
 
       this.defaultConsumerWindowSize = defaultConsumerWindowSize;
+
+      this.ringSize = ringSize;
    }
 
    public boolean isExists() {
@@ -259,4 +264,8 @@ public class QueueQueryResult {
    public Long getAutoDeleteMessageCount() {
       return autoDeleteMessageCount;
    }
+
+   public Long getRingSize() {
+      return ringSize;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
index 07ce8a6..b7876f7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
@@ -55,6 +55,8 @@ public class CoreQueueConfiguration implements Serializable {
 
    private Long delayBeforeDispatch;
 
+   private Long ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
+
    private Boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
 
    private RoutingType routingType = ActiveMQDefaultConfiguration.getDefaultRoutingType();
@@ -118,6 +120,10 @@ public class CoreQueueConfiguration implements Serializable {
       return delayBeforeDispatch;
    }
 
+   public Long getRingSize() {
+      return ringSize;
+   }
+
    /**
     * @param address the address to set
     */
@@ -175,6 +181,14 @@ public class CoreQueueConfiguration implements Serializable {
    }
 
    /**
+    * @param ringSize for this queue, default is -1
+    */
+   public CoreQueueConfiguration setRingSize(Long ringSize) {
+      this.ringSize = ringSize;
+      return this;
+   }
+
+   /**
     * @param purgeOnNoConsumers delete this queue when consumer count reaches 0, default is false
     */
    public CoreQueueConfiguration setPurgeOnNoConsumers(Boolean purgeOnNoConsumers) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 8d016ef..3b9062d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -264,6 +264,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String DEFAULT_CONSUMER_WINDOW_SIZE = "default-consumer-window-size";
 
+   private static final String DEFAULT_RING_SIZE = "default-ring-size";
+
 
    // Attributes ----------------------------------------------------
 
@@ -1158,6 +1160,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             addressSettings.setDefaultAddressRoutingType(routingType);
          } else if (DEFAULT_CONSUMER_WINDOW_SIZE.equalsIgnoreCase(name)) {
             addressSettings.setDefaultConsumerWindowSize(XMLUtil.parseInt(child));
+         } else if (DEFAULT_RING_SIZE.equalsIgnoreCase(name)) {
+            addressSettings.setDefaultRingSize(XMLUtil.parseLong(child));
          }
       }
       return setting;
@@ -1203,6 +1207,7 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       Boolean nonDestructive = null;
       Integer consumersBeforeDispatch = null;
       Long delayBeforeDispatch = null;
+      Long ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
 
       NamedNodeMap attributes = node.getAttributes();
       for (int i = 0; i < attributes.getLength(); i++) {
@@ -1230,6 +1235,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             consumersBeforeDispatch = Integer.parseInt(item.getNodeValue());
          } else if (item.getNodeName().equals("delay-before-dispatch")) {
             delayBeforeDispatch = Long.parseLong(item.getNodeValue());
+         } else if (item.getNodeName().equals("ring-size")) {
+            ringSize = Long.parseLong(item.getNodeValue());
          }
       }
 
@@ -1264,7 +1271,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
               .setLastValueKey(lastValueKey)
               .setNonDestructive(nonDestructive)
               .setConsumersBeforeDispatch(consumersBeforeDispatch)
-              .setDelayBeforeDispatch(delayBeforeDispatch);
+              .setDelayBeforeDispatch(delayBeforeDispatch)
+              .setRingSize(ringSize);
    }
 
    protected CoreAddressConfiguration parseAddressConfiguration(final Node node) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index f4ad3b5..4c8c78a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -1068,11 +1068,59 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              long autoDeleteDelay,
                              long autoDeleteMessageCount,
                              boolean autoCreateAddress) throws Exception {
+      AddressSettings addressSettings = server.getAddressSettingsRepository().getMatch(address == null ? name : address);
+      return createQueue(
+         address,
+         routingType,
+         name,
+         filterStr,
+         durable,
+         maxConsumers,
+         purgeOnNoConsumers,
+         exclusive,
+         groupRebalance,
+         groupBuckets,
+         groupFirstKey,
+         lastValue,
+         lastValueKey,
+         nonDestructive,
+         consumersBeforeDispatch,
+         delayBeforeDispatch,
+         autoDelete,
+         autoDeleteDelay,
+         autoDeleteMessageCount,
+         autoCreateAddress,
+         addressSettings.getDefaultRingSize()
+      );
+   }
+
+   @Override
+   public String createQueue(String address,
+                             String routingType,
+                             String name,
+                             String filterStr,
+                             boolean durable,
+                             int maxConsumers,
+                             boolean purgeOnNoConsumers,
+                             boolean exclusive,
+                             boolean groupRebalance,
+                             int groupBuckets,
+                             String groupFirstKey,
+                             boolean lastValue,
+                             String lastValueKey,
+                             boolean nonDestructive,
+                             int consumersBeforeDispatch,
+                             long delayBeforeDispatch,
+                             boolean autoDelete,
+                             long autoDeleteDelay,
+                             long autoDeleteMessageCount,
+                             boolean autoCreateAddress,
+                             long ringSize) throws Exception {
       if (AuditLogger.isEnabled()) {
          AuditLogger.createQueue(this.server, address, routingType, name, filterStr, durable,
                   maxConsumers, purgeOnNoConsumers, exclusive, groupBuckets, lastValue,
                   lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete,
-                  autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress);
+                  autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
       }
       checkStarted();
 
@@ -1084,7 +1132,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             filter = new SimpleString(filterStr);
          }
 
-         final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), SimpleString.toSimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, SimpleString.toSimpleString(groupFirstKey), lastValue, SimpleString.toSimpleString(lastValueKey), nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateA [...]
+         final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), SimpleString.toSimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, SimpleString.toSimpleString(groupFirstKey), lastValue, SimpleString.toSimpleString(lastValueKey), nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateA [...]
          return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
       } catch (ActiveMQException e) {
          throw new IllegalStateException(e.getMessage());
@@ -1152,9 +1200,27 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              Integer consumersBeforeDispatch,
                              Long delayBeforeDispatch,
                              String user) throws Exception {
+      return updateQueue(name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, null, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, null);
+   }
+
+   @Override
+   public String updateQueue(String name,
+                             String routingType,
+                             String filter,
+                             Integer maxConsumers,
+                             Boolean purgeOnNoConsumers,
+                             Boolean exclusive,
+                             Boolean groupRebalance,
+                             Integer groupBuckets,
+                             String groupFirstKey,
+                             Boolean nonDestructive,
+                             Integer consumersBeforeDispatch,
+                             Long delayBeforeDispatch,
+                             String user,
+                             Long ringSize) throws Exception {
       if (AuditLogger.isEnabled()) {
          AuditLogger.updateQueue(this.server, name, routingType, filter, maxConsumers, purgeOnNoConsumers,
-                  exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user);
+                  exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize);
       }
       checkStarted();
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
index 4f18921..3f79138 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/QueueControlImpl.java
@@ -1552,6 +1552,21 @@ public class QueueControlImpl extends AbstractControl implements QueueControl {
    }
 
    @Override
+   public long getRingSize() {
+      if (AuditLogger.isEnabled()) {
+         AuditLogger.getRingSize(queue);
+      }
+      checkStarted();
+
+      clearIO();
+      try {
+         return queue.getRingSize();
+      } finally {
+         blockOnIO();
+      }
+   }
+
+   @Override
    public String listConsumersAsJSON() throws Exception {
       if (AuditLogger.isEnabled()) {
          AuditLogger.listConsumersAsJSON(queue);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
index 0f265f8..7582eca 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/cursor/PagedReferenceImpl.java
@@ -253,6 +253,16 @@ public class PagedReferenceImpl extends LinkedListImpl.Node<PagedReferenceImpl>
    }
 
    @Override
+   public void setInDelivery(boolean inDelivery) {
+
+   }
+
+   @Override
+   public boolean isInDelivery() {
+      return false;
+   }
+
+   @Override
    public void setAlreadyAcked() {
       alreadyAcked = true;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 45d8309..9e088e6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -97,4 +97,6 @@ public interface QueueBindingInfo {
    long getAutoDeleteDelay();
 
    long getAutoDeleteMessageCount();
+
+   long getRingSize();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 238f39b..79bab76 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1298,7 +1298,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
       SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDele [...]
+      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.isAutoDele [...]
 
       readLock();
       try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 8278fa1..adf91c2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -74,6 +74,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
 
    public long autoDeleteMessageCount;
 
+   public long ringSize;
+
    public PersistentQueueBindingEncoding() {
    }
 
@@ -145,7 +147,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
                                          final long autoDeleteDelay,
                                          final long autoDeleteMessageCount,
                                          final byte routingType,
-                                         final boolean configurationManaged) {
+                                         final boolean configurationManaged,
+                                         final long ringSize) {
       this.name = name;
       this.address = address;
       this.filterString = filterString;
@@ -167,6 +170,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       this.autoDeleteMessageCount = autoDeleteMessageCount;
       this.routingType = routingType;
       this.configurationManaged = configurationManaged;
+      this.ringSize = ringSize;
    }
 
    @Override
@@ -352,6 +356,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
    }
 
    @Override
+   public long getRingSize() {
+      return ringSize;
+   }
+
+   @Override
    public void decode(final ActiveMQBuffer buffer) {
       name = buffer.readSimpleString();
       address = buffer.readSimpleString();
@@ -447,6 +456,11 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       } else {
          groupFirstKey = ActiveMQDefaultConfiguration.getDefaultGroupFirstKey();
       }
+      if (buffer.readableBytes() > 0) {
+         ringSize = buffer.readLong();
+      } else {
+         ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
+      }
    }
 
    @Override
@@ -472,6 +486,7 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       buffer.writeLong(autoDeleteDelay);
       buffer.writeLong(autoDeleteMessageCount);
       buffer.writeNullableSimpleString(groupFirstKey);
+      buffer.writeLong(ringSize);
    }
 
    @Override
@@ -494,7 +509,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
          DataConstants.SIZE_BOOLEAN +
          DataConstants.SIZE_LONG +
          DataConstants.SIZE_LONG +
-         SimpleString.sizeofNullableString(groupFirstKey);
+         SimpleString.sizeofNullableString(groupFirstKey) +
+         DataConstants.SIZE_LONG;
    }
 
    private SimpleString createMetadata() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index 9d95955..977e678 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -80,6 +80,22 @@ public interface PostOffice extends ActiveMQComponent {
                             SimpleString user,
                             Boolean configurationManaged) throws Exception;
 
+   QueueBinding updateQueue(SimpleString name,
+                            RoutingType routingType,
+                            Filter filter,
+                            Integer maxConsumers,
+                            Boolean purgeOnNoConsumers,
+                            Boolean exclusive,
+                            Boolean groupRebalance,
+                            Integer groupBuckets,
+                            SimpleString groupFirstKey,
+                            Boolean nonDestructive,
+                            Integer consumersBeforeDispatch,
+                            Long delayBeforeDispatch,
+                            SimpleString user,
+                            Boolean configurationManaged,
+                            Long ringSize) throws Exception;
+
    List<Queue> listQueuesForAddress(SimpleString address) throws Exception;
 
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index 7156514..9465872 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -482,6 +482,25 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                                    Long delayBeforeDispatch,
                                    SimpleString user,
                                    Boolean configurationManaged) throws Exception {
+      return updateQueue(name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, configurationManaged, null);
+   }
+
+   @Override
+   public QueueBinding updateQueue(SimpleString name,
+                                   RoutingType routingType,
+                                   Filter filter,
+                                   Integer maxConsumers,
+                                   Boolean purgeOnNoConsumers,
+                                   Boolean exclusive,
+                                   Boolean groupRebalance,
+                                   Integer groupBuckets,
+                                   SimpleString groupFirstKey,
+                                   Boolean nonDestructive,
+                                   Integer consumersBeforeDispatch,
+                                   Long delayBeforeDispatch,
+                                   SimpleString user,
+                                   Boolean configurationManaged,
+                                   Long ringSize) throws Exception {
       synchronized (this) {
          final QueueBinding queueBinding = (QueueBinding) addressManager.getBinding(name);
          if (queueBinding == null) {
@@ -570,6 +589,10 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                changed = true;
                queue.setUser(user);
             }
+            if (ringSize != null && !ringSize.equals(queue.getRingSize())) {
+               changed = true;
+               queue.setRingSize(ringSize);
+            }
 
             if (changed) {
                final long txID = storageManager.generateID();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index e148f91..5a731fb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -358,7 +358,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   requiresResponse = request.isRequiresResponse();
                   session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
                                       request.isExclusive(), request.isGroupRebalance(), request.getGroupBuckets(), request.getGroupFirstKey(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch(),
-                                      request.isAutoDelete(), request.getAutoDeleteDelay(), request.getAutoDeleteMessageCount(), request.isAutoCreated());
+                                      request.isAutoDelete(), request.getAutoDeleteDelay(), request.getAutoDeleteMessageCount(), request.isAutoCreated(), request.getRingSize());
                   if (requiresResponse) {
                      response = createNullResponseMessage(packet);
                   }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index cd5ef02..f437820 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -428,6 +428,11 @@ public interface ActiveMQServer extends ServiceComponent {
                      boolean lastValue, SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch,
                      boolean autoDelete, long autoDeleteDelay, long autoDeleteMessageCount, boolean autoCreateAddress) throws Exception;
 
+   Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+                     boolean durable, boolean temporary, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean groupRebalance, int groupBuckets, SimpleString groupFirstKey,
+                     boolean lastValue, SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch,
+                     boolean autoDelete, long autoDeleteDelay, long autoDeleteMessageCount, boolean autoCreateAddress, long ringSize) throws Exception;
+
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
@@ -451,6 +456,11 @@ public interface ActiveMQServer extends ServiceComponent {
                      Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, SimpleString groupFirstKey, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive,
                      Integer consumersBeforeDispatch, Long delayBeforeDispatch, Boolean autoDelete, Long autoDeleteDelay, Long autoDeleteMessageCount, boolean autoCreateAddress) throws Exception;
 
+   Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter,
+                     SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
+                     Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, SimpleString groupFirstKey, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive,
+                     Integer consumersBeforeDispatch, Long delayBeforeDispatch, Boolean autoDelete, Long autoDeleteDelay, Long autoDeleteMessageCount, boolean autoCreateAddress, Long ringSize) throws Exception;
+
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
                      boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception;
@@ -471,6 +481,12 @@ public interface ActiveMQServer extends ServiceComponent {
                      int groupBuckets, SimpleString groupFirstKey, boolean lastValue, SimpleString lastValueKey, boolean nonDestructive,
                      int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoDelete, long autoDeleteDelay, long autoDeleteMessageCount, boolean autoCreateAddress) throws Exception;
 
+   Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
+                     SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
+                     boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean groupRebalance,
+                     int groupBuckets, SimpleString groupFirstKey, boolean lastValue, SimpleString lastValueKey, boolean nonDestructive,
+                     int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoDelete, long autoDeleteDelay, long autoDeleteMessageCount, boolean autoCreateAddress, long ringSize) throws Exception;
+
    @Deprecated
    Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception;
 
@@ -628,6 +644,31 @@ public interface ActiveMQServer extends ServiceComponent {
                      Long delayBeforeDispatch,
                      String user) throws Exception;
 
+   /**
+    * Updates the queue called {@code name}, including its ring size.
+    * NOTE(review): the boxed parameter types suggest that {@code null} means "leave this setting unchanged" -
+    * confirm against the implementation.
+    *
+    * @param groupFirstKey the group-first key, named as in the {@code createQueue} overloads of this interface
+    * @param ringSize      the ring size to apply to the queue
+    * @return the queue; presumably the updated one - confirm against the implementation
+    * @throws Exception if the update fails
+    */
+   Queue updateQueue(String name,
+                     RoutingType routingType,
+                     String filterString,
+                     Integer maxConsumers,
+                     Boolean purgeOnNoConsumers,
+                     Boolean exclusive,
+                     Boolean groupRebalance,
+                     Integer groupBuckets,
+                     String groupFirstKey,
+                     Boolean nonDestructive,
+                     Integer consumersBeforeDispatch,
+                     Long delayBeforeDispatch,
+                     String user,
+                     Long ringSize) throws Exception;
+
    /*
             * add a ProtocolManagerFactory to be used. Note if @see Configuration#isResolveProtocols is tur then this factory will
             * replace any factories with the same protocol
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
index 2dd8fc3..c55910b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/MessageReference.java
@@ -119,6 +119,19 @@ public interface MessageReference {
 
    void handled();
 
+   /**
+    * Sets the in-delivery flag of this reference.
+    * NOTE(review): who sets and clears the flag, and when, is decided outside this hunk - see the implementations.
+    *
+    * @param inDelivery the new value of the in-delivery flag
+    */
+   void setInDelivery(boolean inDelivery);
+
+   /**
+    * @return the in-delivery flag of this reference
+    */
+   boolean isInDelivery();
+
    void setAlreadyAcked();
 
    boolean isAlreadyAcked();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 8b91aa9..8f344ff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -135,6 +135,10 @@ public interface Queue extends Bindable,CriticalComponent {
 
    long getConsumerRemovedTimestamp();
 
+   void setRingSize(long ringSize);
+
+   long getRingSize();
+
     /**
     * This will set a reference counter for every consumer present on the queue.
     * The ReferenceCounter will know what to do when the counter became zeroed.
@@ -254,6 +258,8 @@ public interface Queue extends Bindable,CriticalComponent {
 
    long getMessagesKilled();
 
+   long getMessagesReplaced();
+
    MessageReference removeReferenceWithID(long id) throws Exception;
 
    MessageReference getReference(long id) throws ActiveMQException;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index 9135e18..e5b9bb6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -52,6 +52,7 @@ public final class QueueConfig {
    private final boolean autoDelete;
    private final long autoDeleteDelay;
    private final long autoDeleteMessageCount;
+   private final long ringSize;
 
    public static final class Builder {
 
@@ -79,6 +80,7 @@ public final class QueueConfig {
       private boolean autoDelete;
       private long autoDeleteDelay;
       private long autoDeleteMessageCount;
+      private long ringSize;
       private boolean configurationManaged;
 
       private Builder(final long id, final SimpleString name) {
@@ -110,6 +112,7 @@ public final class QueueConfig {
          this.autoDelete = ActiveMQDefaultConfiguration.getDefaultQueueAutoDelete(autoCreated);
          this.autoDeleteDelay = ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteDelay();
          this.autoDeleteMessageCount = ActiveMQDefaultConfiguration.getDefaultQueueAutoDeleteMessageCount();
+         this.ringSize = ActiveMQDefaultConfiguration.getDefaultRingSize();
          this.configurationManaged = false;
          validateState();
       }
@@ -217,6 +220,11 @@ public final class QueueConfig {
          return this;
       }
 
+      public Builder ringSize(final long ringSize) {
+         this.ringSize = ringSize;
+         return this;
+      }
+
 
       public Builder groupRebalance(final boolean groupRebalance) {
          this.groupRebalance = groupRebalance;
@@ -266,7 +274,7 @@ public final class QueueConfig {
          } else {
             pageSubscription = null;
          }
-         return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, groupRebalance, groupBuckets, groupFirstKey, autoDelete, autoDeleteDelay, autoDeleteMessageCount, configurationManaged);
+         return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, groupRebalance, groupBuckets, groupFirstKey, autoDelete, autoDeleteDelay, autoDeleteMessageCount, ringSize, configurationManaged);
       }
 
    }
@@ -323,6 +331,7 @@ public final class QueueConfig {
                        final boolean autoDelete,
                        final long autoDeleteDelay,
                        final long autoDeleteMessageCount,
+                       final long ringSize,
                        final boolean configurationManaged) {
       this.id = id;
       this.address = address;
@@ -348,6 +357,7 @@ public final class QueueConfig {
       this.autoDelete = autoDelete;
       this.autoDeleteDelay = autoDeleteDelay;
       this.autoDeleteMessageCount = autoDeleteMessageCount;
+      this.ringSize = ringSize;
       this.configurationManaged = configurationManaged;
    }
 
@@ -451,6 +461,10 @@ public final class QueueConfig {
       return autoDeleteMessageCount;
    }
 
+   public long getRingSize() {
+      return ringSize;
+   }
+
    @Override
    public boolean equals(Object o) {
       if (this == o)
@@ -508,6 +522,8 @@ public final class QueueConfig {
          return false;
       if (autoDeleteMessageCount != that.autoDeleteMessageCount)
          return false;
+      if (ringSize != that.ringSize)
+         return false;
       if (configurationManaged != that.configurationManaged)
          return false;
       return user != null ? user.equals(that.user) : that.user == null;
@@ -540,6 +556,7 @@ public final class QueueConfig {
       result = 31 * result + (autoDelete ? 1 : 0);
       result = 31 * result + Long.hashCode(autoDeleteDelay);
       result = 31 * result + Long.hashCode(autoDeleteMessageCount);
+      result = 31 * result + Long.hashCode(ringSize);
       result = 31 * result + (configurationManaged ? 1 : 0);
       return result;
    }
@@ -571,6 +588,7 @@ public final class QueueConfig {
          + ", autoDelete=" + autoDelete
          + ", autoDeleteDelay=" + autoDeleteDelay
          + ", autoDeleteMessageCount=" + autoDeleteMessageCount
+         + ", ringSize=" + ringSize
          + ", configurationManaged=" + configurationManaged + '}';
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index e7848f0..210fe89 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -221,6 +221,29 @@ public interface ServerSession extends SecurityAuth {
                      SimpleString filterString,
                      boolean temporary,
                      boolean durable,
+                     int maxConsumers,
+                     boolean purgeOnNoConsumers,
+                     Boolean exclusive,
+                     Boolean groupRebalance,
+                     Integer groupBuckets,
+                     SimpleString groupFirstKey,
+                     Boolean lastValue,
+                     SimpleString lastValueKey,
+                     Boolean nonDestructive,
+                     Integer consumersBeforeDispatch,
+                     Long delayBeforeDispatch,
+                     Boolean autoDelete,
+                     Long autoDeleteDelay,
+                     Long autoDeleteMessageCount,
+                     boolean autoCreated,
+                     Long ringSize) throws Exception;
+
+   Queue createQueue(SimpleString address,
+                     SimpleString name,
+                     RoutingType routingType,
+                     SimpleString filterString,
+                     boolean temporary,
+                     boolean durable,
                      boolean autoCreated) throws Exception;
 
    Queue createQueue(AddressInfo addressInfo,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index ba2bfdc..b5c7054 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -982,12 +982,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
          SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-         response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumers [...]
+         response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.getGroupFirstKey(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumers [...]
       } else if (realName.equals(managementAddress)) {
          // make an exception for the management address (see HORNETQ-29)
-         response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize);
+         response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null,null, null, null, null, null, null, null, defaultConsumerWindowSize, null);
       } else {
-         response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessage [...]
+         response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultGroupFirstKey, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, isAutoDelete(false, addressSettings), autoDeleteQueuesDelay, autoDeleteQueuesMessage [...]
       }
 
       return response;
@@ -1835,6 +1835,32 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public Queue createQueue(final SimpleString address,
+                            final RoutingType routingType,
+                            final SimpleString queueName,
+                            final SimpleString filter,
+                            final boolean durable,
+                            final boolean temporary,
+                            final int maxConsumers,
+                            final boolean purgeOnNoConsumers,
+                            final boolean exclusive,
+                            final boolean groupRebalance,
+                            final int groupBuckets,
+                            final SimpleString groupFirstKey,
+                            final boolean lastValue,
+                            final SimpleString lastValueKey,
+                            final boolean nonDestructive,
+                            final int consumersBeforeDispatch,
+                            final long delayBeforeDispatch,
+                            final boolean autoDelete,
+                            final long autoDeleteDelay,
+                            final long autoDeleteMessageCount,
+                            final boolean autoCreateAddress,
+                            final long ringSize) throws Exception {
+      return createQueue(address, routingType, queueName, filter, null, durable, temporary, false, false, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
+   }
+
+   @Override
    @Deprecated
    public Queue createQueue(SimpleString address,
                             RoutingType routingType,
@@ -1853,24 +1879,30 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
       AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
-      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(),  as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), isAutoDelete(autoCreated, as), as.getAutoDeleteQueuesDelay(), a [...]
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(),  as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), isAutoDelete(autoCreated, as), as.getAutoDeleteQueuesDelay(), a [...]
    }
 
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, boolean autoCreateAddress) throws Exception {
       AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
-      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive,  as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), isAutoDelete(autoCreated, as), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), a [...]
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive,  as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), isAutoDelete(autoCreated, as), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(), a [...]
    }
 
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, Boolean autoDelete, Long autoDeleteDelay, Long autoDeleteMessageCount, boo [...]
       AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
-      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, as.getDefaultGroupFirstKey(), lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false);
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, as.getDefaultGroupFirstKey(), lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false, as.getDefaultRingSize());
    }
 
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, SimpleString groupFirstKey, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, Boolean autoDelete, Long autoDeleteDelay, Long [...]
-      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false);
+      AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false, as.getDefaultRingSize());
+   }
+
+   @Override
+   public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, SimpleString groupFirstKey, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, Boolean autoDelete, Long autoDeleteDelay, Long [...]
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false, ringSize);
    }
 
    static boolean isAutoDelete(boolean autoCreated, AddressSettings addressSettings) {
@@ -3063,13 +3095,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
             boolean isNonDestructive = config.isNonDestructive() == null ? as.isDefaultNonDestructive() : config.isNonDestructive();
             int consumersBeforeDispatch = config.getConsumersBeforeDispatch() == null ? as.getDefaultConsumersBeforeDispatch() : config.getConsumersBeforeDispatch();
             long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch();
+            long ringSize = config.getRingSize() == null ? as.getDefaultRingSize() : config.getRingSize();
 
             if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
-               updateQueue(config.getName(), config.getRoutingType(), config.getFilterString(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, groupRebalance, groupBuckets, groupFirstKey != null ? groupFirstKey.toString() : null, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser(), true);
+               updateQueue(config.getName(), config.getRoutingType(), config.getFilterString(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, groupRebalance, groupBuckets, groupFirstKey != null ? groupFirstKey.toString() : null, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser(), true, ringSize);
             } else {
                // if the address::queue doesn't exist then create it
                try {
-                  createQueue(new AddressInfo(SimpleString.toSimpleString(config.getAddress())).addRoutingType(config.getRoutingType()), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, groupRebalance, groupBuckets, groupFirstKey, isLastValue, lastValueKey, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, [...]
+                  createQueue(new AddressInfo(SimpleString.toSimpleString(config.getAddress())).addRoutingType(config.getRoutingType()), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, groupRebalance, groupBuckets, groupFirstKey, isLastValue, lastValueKey, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, [...]
                } catch (ActiveMQQueueExistsException e) {
                   // the queue may exist on a *different* address
                   ActiveMQServerLogger.LOGGER.warn(e.getMessage());
@@ -3272,7 +3305,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final long autoDeleteDelay,
                             final long autoDeleteMessageCount,
                             final boolean autoCreateAddress,
-                            final boolean configurationManaged) throws Exception {
+                            final boolean configurationManaged,
+                            final long ringSize) throws Exception {
       SimpleString realQueueName = CompositeAddress.extractQueueName(queueName);
 
       if (realQueueName == null || realQueueName.length() == 0) {
@@ -3343,6 +3377,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
               .autoDeleteDelay(autoDeleteDelay)
               .autoDeleteMessageCount(autoDeleteMessageCount)
               .configurationManaged(configurationManaged)
+              .ringSize(ringSize)
               .build();
 
       if (hasBrokerQueuePlugins()) {
@@ -3426,7 +3461,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final long autoDeleteMessageCount,
                             final boolean autoCreateAddress) throws Exception {
       AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString());
-      return createQueue(new AddressInfo(address).addRoutingType(routingType), queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, as.getDefaultGroupFirstKey(), lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false);
+      return createQueue(new AddressInfo(address).addRoutingType(routingType), queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, as.getDefaultGroupFirstKey(), lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false, as.getDefaultRingSize());
    }
 
    @Override
@@ -3455,7 +3490,38 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final long autoDeleteDelay,
                             final long autoDeleteMessageCount,
                             final boolean autoCreateAddress) throws Exception {
-      return createQueue(new AddressInfo(address).addRoutingType(routingType), queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false);
+      AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString());
+      return createQueue(new AddressInfo(address).addRoutingType(routingType), queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false, as.getDefaultRingSize());
+   }
+
+   @Override
+   public Queue createQueue(final SimpleString address,
+                            final RoutingType routingType,
+                            final SimpleString queueName,
+                            final SimpleString filterString,
+                            final SimpleString user,
+                            final boolean durable,
+                            final boolean temporary,
+                            final boolean ignoreIfExists,
+                            final boolean transientQueue,
+                            final boolean autoCreated,
+                            final int maxConsumers,
+                            final boolean purgeOnNoConsumers,
+                            final boolean exclusive,
+                            final boolean groupRebalance,
+                            final int groupBuckets,
+                            final SimpleString groupFirstKey,
+                            final boolean lastValue,
+                            final SimpleString lastValueKey,
+                            final boolean nonDestructive,
+                            final int consumersBeforeDispatch,
+                            final long delayBeforeDispatch,
+                            final boolean autoDelete,
+                            final long autoDeleteDelay,
+                            final long autoDeleteMessageCount,
+                            final boolean autoCreateAddress,
+                            final long ringSize) throws Exception {
+      return createQueue(new AddressInfo(address).addRoutingType(routingType), queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, false, ringSize);
    }
 
    @Deprecated
@@ -3521,6 +3587,24 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       return updateQueue(name, routingType, filterString, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, null);
    }
 
+   @Override
+   public Queue updateQueue(String name,
+                            RoutingType routingType,
+                            String filterString,
+                            Integer maxConsumers,
+                            Boolean purgeOnNoConsumers,
+                            Boolean exclusive,
+                            Boolean groupRebalance,
+                            Integer groupBuckets,
+                            String groupFirstKey,
+                            Boolean nonDestructive,
+                            Integer consumersBeforeDispatch,
+                            Long delayBeforeDispatch,
+                            String user,
+                            Long ringSize) throws Exception {
+      return updateQueue(name, routingType, filterString, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, null, ringSize);
+   }
+
    private Queue updateQueue(String name,
                             RoutingType routingType,
                             String filterString,
@@ -3534,9 +3618,10 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             Integer consumersBeforeDispatch,
                             Long delayBeforeDispatch,
                             String user,
-                            Boolean configurationManaged) throws Exception {
+                            Boolean configurationManaged,
+                            Long ringSize) throws Exception {
       final Filter filter = FilterImpl.createFilter(filterString);
-      final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, SimpleString.toSimpleString(groupFirstKey), nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user), configurationManaged);
+      final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, SimpleString.toSimpleString(groupFirstKey), nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user), configurationManaged, ringSize);
       if (queueBinding != null) {
          final Queue queue = queueBinding.getQueue();
          return queue;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
index d02ef70..d4db19b 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/GroupFirstMessageReference.java
@@ -192,6 +192,16 @@ public class GroupFirstMessageReference implements MessageReference {
    }
 
    @Override
+   public void setInDelivery(boolean inDelivery) {
+      messageReference.setInDelivery(inDelivery);
+   }
+
+   @Override
+   public boolean isInDelivery() {
+      return messageReference.isInDelivery();
+   }
+
+   @Override
    public void setAlreadyAcked() {
       messageReference.setAlreadyAcked();
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 40222b8..d802259 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -278,6 +278,16 @@ public class LastValueQueue extends QueueImpl {
       }
 
       @Override
+      public void setInDelivery(boolean inDelivery) {
+         ref.setInDelivery(inDelivery);
+      }
+
+      @Override
+      public boolean isInDelivery() {
+         return ref.isInDelivery();
+      }
+
+      @Override
       public Object getProtocolData() {
          return ref.getProtocolData();
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
index 459b703..e9238fe 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageReferenceImpl.java
@@ -76,6 +76,8 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
 
    private boolean alreadyAcked;
 
+   private boolean deliveredDirectly;
+
    private Object protocolData;
 
    private Consumer<? super MessageReference> onDelivery;
@@ -223,6 +225,16 @@ public class MessageReferenceImpl extends LinkedListImpl.Node<MessageReferenceIm
    }
 
    @Override
+   public void setInDelivery(boolean inDelivery) {
+      this.deliveredDirectly = inDelivery;
+   }
+
+   @Override
+   public boolean isInDelivery() {
+      return deliveredDirectly;
+   }
+
+   @Override
    public void setAlreadyAcked() {
       alreadyAcked = true;
    }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index c00add4..3d73015 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -164,7 +164,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
             .autoDeleteDelay(queueBindingInfo.getAutoDeleteDelay())
             .autoDeleteMessageCount(queueBindingInfo.getAutoDeleteMessageCount())
             .routingType(RoutingType.getType(queueBindingInfo.getRoutingType()))
-            .configurationManaged((queueBindingInfo.isConfigurationManaged()));
+            .configurationManaged((queueBindingInfo.isConfigurationManaged()))
+            .ringSize(queueBindingInfo.getRingSize());
          final Queue queue = queueFactory.createQueueWith(queueConfigBuilder.build());
          queue.setConsumersRefCount(new QueueManagerImpl(((PostOfficeImpl)postOffice).getServer(), queueBindingInfo.getQueueName()));
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index af37216..f4ad91d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -77,7 +77,7 @@ public class QueueFactoryImpl implements QueueFactory {
       if (lastValueKey(config) != null) {
          queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isGroupRebalance(), config.getGroupBuckets(), config.getGroupFirstKey(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), lastValueKey(config), config.isNonDestruct [...]
       } else {
-         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isGroupRebalance(), config.getGroupBuckets(), config.getGroupFirstKey(), config.isNonDestructive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isAutoDelete( [...]
+         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isGroupRebalance(), config.getGroupBuckets(), config.getGroupFirstKey(), config.isNonDestructive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isAutoDelete( [...]
       }
       server.getCriticalAnalyzer().add(queue);
       return queue;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 15f76e1..7e8d82e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -187,9 +187,9 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    // The estimate of memory being consumed by this queue. Used to calculate instances of messages to depage
    private final AtomicInteger queueMemorySize = new AtomicInteger(0);
 
-   private final QueuePendingMessageMetrics pendingMetrics = new QueuePendingMessageMetrics(this);
+   private final QueueMessageMetrics pendingMetrics = new QueueMessageMetrics(this, "pending");
 
-   private final QueuePendingMessageMetrics deliveringMetrics = new QueuePendingMessageMetrics(this);
+   private final QueueMessageMetrics deliveringMetrics = new QueueMessageMetrics(this, "delivering");
 
    protected final ScheduledDeliveryHandler scheduledDeliveryHandler;
 
@@ -309,6 +309,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private volatile boolean nonDestructive;
 
+   private volatile long ringSize;
+
    /**
     * This is to avoid multi-thread races on calculating direct delivery,
     * to guarantee ordering will be always be correct
@@ -461,35 +463,69 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    public QueueImpl(final long id,
-                     final SimpleString address,
-                     final SimpleString name,
-                     final Filter filter,
-                     final PageSubscription pageSubscription,
-                     final SimpleString user,
-                     final boolean durable,
-                     final boolean temporary,
-                     final boolean autoCreated,
-                     final RoutingType routingType,
-                     final Integer maxConsumers,
-                     final Boolean exclusive,
-                     final Boolean groupRebalance,
-                     final Integer groupBuckets,
-                     final SimpleString groupFirstKey,
-                     final Boolean nonDestructive,
-                     final Integer consumersBeforeDispatch,
-                     final Long delayBeforeDispatch,
-                     final Boolean purgeOnNoConsumers,
-                     final Boolean autoDelete,
-                     final Long autoDeleteDelay,
-                     final Long autoDeleteMessageCount,
-                     final boolean configurationManaged,
-                     final ScheduledExecutorService scheduledExecutor,
-                     final PostOffice postOffice,
-                     final StorageManager storageManager,
-                     final HierarchicalRepository<AddressSettings> addressSettingsRepository,
-                     final ArtemisExecutor executor,
-                     final ActiveMQServer server,
-                     final QueueFactory factory) {
+                    final SimpleString address,
+                    final SimpleString name,
+                    final Filter filter,
+                    final PageSubscription pageSubscription,
+                    final SimpleString user,
+                    final boolean durable,
+                    final boolean temporary,
+                    final boolean autoCreated,
+                    final RoutingType routingType,
+                    final Integer maxConsumers,
+                    final Boolean exclusive,
+                    final Boolean groupRebalance,
+                    final Integer groupBuckets,
+                    final SimpleString groupFirstKey,
+                    final Boolean nonDestructive,
+                    final Integer consumersBeforeDispatch,
+                    final Long delayBeforeDispatch,
+                    final Boolean purgeOnNoConsumers,
+                    final Boolean autoDelete,
+                    final Long autoDeleteDelay,
+                    final Long autoDeleteMessageCount,
+                    final boolean configurationManaged,
+                    final ScheduledExecutorService scheduledExecutor,
+                    final PostOffice postOffice,
+                    final StorageManager storageManager,
+                    final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                    final ArtemisExecutor executor,
+                    final ActiveMQServer server,
+                    final QueueFactory factory) {
+      this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, autoDelete, autoDeleteDelay, autoDeleteMessageCount, configurationManaged, null, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
+   }
+
+   public QueueImpl(final long id,
+                    final SimpleString address,
+                    final SimpleString name,
+                    final Filter filter,
+                    final PageSubscription pageSubscription,
+                    final SimpleString user,
+                    final boolean durable,
+                    final boolean temporary,
+                    final boolean autoCreated,
+                    final RoutingType routingType,
+                    final Integer maxConsumers,
+                    final Boolean exclusive,
+                    final Boolean groupRebalance,
+                    final Integer groupBuckets,
+                    final SimpleString groupFirstKey,
+                    final Boolean nonDestructive,
+                    final Integer consumersBeforeDispatch,
+                    final Long delayBeforeDispatch,
+                    final Boolean purgeOnNoConsumers,
+                    final Boolean autoDelete,
+                    final Long autoDeleteDelay,
+                    final Long autoDeleteMessageCount,
+                    final boolean configurationManaged,
+                    final Long ringSize,
+                    final ScheduledExecutorService scheduledExecutor,
+                    final PostOffice postOffice,
+                    final StorageManager storageManager,
+                    final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+                    final ArtemisExecutor executor,
+                    final ActiveMQServer server,
+                    final QueueFactory factory) {
       super(server == null ? EmptyCriticalAnalyzer.getInstance() : server.getCriticalAnalyzer(), CRITICAL_PATHS);
 
       this.id = id;
@@ -576,13 +612,15 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       if (this.addressInfo != null && this.addressInfo.isPaused()) {
          this.pause(false);
       }
+
+      this.ringSize = ringSize == null ? ActiveMQDefaultConfiguration.getDefaultRingSize() : ringSize;
    }
 
    // Bindable implementation -------------------------------------------------------------------------------------
 
    @Override
    public boolean allowsReferenceCallback() {
-      // non descructive queues will reuse the same reference between multiple consumers
+      // non destructive queues will reuse the same reference between multiple consumers
       // so you cannot really use the callback from the MessageReference
       return !nonDestructive;
    }
@@ -881,13 +919,19 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       enterCritical(CRITICAL_PATH_ADD_HEAD);
       synchronized (this) {
          try {
-            if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
-               return;
+            if (ringSize != -1) {
+               enforceRing(ref, scheduling);
             }
 
-            internalAddHead(ref);
+            if (!ref.isAlreadyAcked()) {
+               if (!scheduling && scheduledDeliveryHandler.checkAndSchedule(ref, false)) {
+                  return;
+               }
 
-            directDeliver = false;
+               internalAddHead(ref);
+
+               directDeliver = false;
+            }
          } finally {
             leaveCritical(CRITICAL_PATH_ADD_HEAD);
          }
@@ -961,7 +1005,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       directDeliver = false;
 
       if (!ref.isPaged()) {
-         messagesAdded.incrementAndGet();
+         incrementMesssagesAdded();
       }
    }
 
@@ -974,6 +1018,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    public void addTail(final MessageReference ref, final boolean direct) {
       enterCritical(CRITICAL_PATH_ADD_TAIL);
       try {
+         enforceRing();
+
          if (scheduleIfPossible(ref)) {
             return;
          }
@@ -1029,7 +1075,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       if (scheduledDeliveryHandler.checkAndSchedule(ref, true)) {
          synchronized (this) {
             if (!ref.isPaged()) {
-               messagesAdded.incrementAndGet();
+               incrementMesssagesAdded();
             }
          }
 
@@ -1330,6 +1376,20 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public long getRingSize() {
+      return ringSize;
+   }
+
+   @Override
+   public synchronized void setRingSize(long ringSize) {
+      this.ringSize = ringSize;
+   }
+
+   public long getMessageCountForRing() {
+      return (long) pendingMetrics.getMessageCount();
+   }
+
+   @Override
    public Set<Consumer> getConsumers() {
       Set<Consumer> consumersSet = new HashSet<>(this.consumers.size());
       for (ConsumerHolder<? extends Consumer> consumerHolder : consumers) {
@@ -1812,6 +1872,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public long getMessagesReplaced() {
+      return messagesReplaced.get();
+   }
+
+   @Override
    public int deleteAllReferences() throws Exception {
       return deleteAllReferences(DEFAULT_FLUSH_LIMIT);
    }
@@ -2516,6 +2581,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       int priority = getPriority(ref);
 
       messageReferences.addHead(ref, priority);
+
+      ref.setInDelivery(false);
    }
 
    /**
@@ -2553,7 +2620,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          internalAddTail(ref);
 
          if (!ref.isPaged()) {
-            messagesAdded.incrementAndGet();
+            incrementMesssagesAdded();
          }
 
          if (added++ > MAX_DELIVERIES_IN_LOOP) {
@@ -2685,6 +2752,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
 
                   removeMessageReference(holder, ref);
+                  ref.setInDelivery(true);
                   handledconsumer = consumer;
                   handled++;
                   consumers.reset();
@@ -3304,9 +3372,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                   reference = ref;
                }
 
-               messagesAdded.incrementAndGet();
+               incrementMesssagesAdded();
 
                deliveriesInTransit.countUp();
+               reference.setInDelivery(true);
                proceedDeliver(consumer, reference);
                consumers.reset();
                return true;
@@ -3446,6 +3515,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          return;
       }
 
+      // this is done to tell the difference between actual acks and just a closed consumer in the non-destructive use-case
+      if (nonDestructive && reason == AckReason.NORMAL) {
+         ref.setInDelivery(false);
+      }
+
       if (reason == AckReason.EXPIRED) {
          messagesExpired.incrementAndGet();
       } else if (reason == AckReason.KILLED) {
@@ -3470,7 +3544,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
          message = null;
       }
 
-      if (message == null)
+      if (message == null || (nonDestructive && reason == AckReason.NORMAL))
          return;
 
       boolean durableRef = message.isDurable() && queue.isDurable();
@@ -3943,6 +4017,39 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
       }
    }
 
+   private void enforceRing() {
+      if (ringSize != -1) { // better escaping & inlining when ring isn't being used
+         enforceRing(null, false);
+      }
+   }
+
+   private void enforceRing(MessageReference refToAck, boolean scheduling) {
+      if (getMessageCountForRing() >= ringSize) {
+         refToAck = refToAck == null ? messageReferences.poll() : refToAck;
+
+         if (refToAck != null) {
+            if (logger.isDebugEnabled()) {
+               logger.debugf("Preserving ringSize %d by acking message ref %s", ringSize, refToAck);
+            }
+            referenceHandled(refToAck);
+
+            try {
+               refToAck.acknowledge(null, AckReason.REPLACED, null);
+               if (!refToAck.isInDelivery() && !scheduling) {
+                  refRemoved(refToAck);
+               }
+               refToAck.setAlreadyAcked();
+            } catch (Exception e) {
+               ActiveMQServerLogger.LOGGER.errorAckingOldReference(e);
+            }
+         } else {
+            if (logger.isDebugEnabled()) {
+               logger.debugf("Cannot preserve ringSize %d; message ref is null", ringSize);
+            }
+         }
+      }
+   }
+
    private void registerMeters() {
       String addressName = address.toString();
       String queueName = name.toString();
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java
similarity index 72%
rename from artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java
rename to artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java
index 668a918..3d03e31 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueuePendingMessageMetrics.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueMessageMetrics.java
@@ -24,20 +24,23 @@ import org.apache.activemq.artemis.utils.Preconditions;
 import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
 import org.apache.activemq.artemis.core.server.MessageReference;
 import org.apache.activemq.artemis.core.server.Queue;
+import org.jboss.logging.Logger;
 
-public class QueuePendingMessageMetrics {
+public class QueueMessageMetrics {
 
-   private static final AtomicIntegerFieldUpdater<QueuePendingMessageMetrics> COUNT_UPDATER =
-         AtomicIntegerFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "messageCount");
+   private static final Logger logger = Logger.getLogger(QueueMessageMetrics.class);
 
-   private static final AtomicIntegerFieldUpdater<QueuePendingMessageMetrics> DURABLE_COUNT_UPDATER =
-         AtomicIntegerFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "durableMessageCount");
+   private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> COUNT_UPDATER =
+         AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, "messageCount");
 
-   private static final AtomicLongFieldUpdater<QueuePendingMessageMetrics> SIZE_UPDATER =
-         AtomicLongFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "persistentSize");
+   private static final AtomicIntegerFieldUpdater<QueueMessageMetrics> DURABLE_COUNT_UPDATER =
+         AtomicIntegerFieldUpdater.newUpdater(QueueMessageMetrics.class, "durableMessageCount");
 
-   private static final AtomicLongFieldUpdater<QueuePendingMessageMetrics> DURABLE_SIZE_UPDATER =
-         AtomicLongFieldUpdater.newUpdater(QueuePendingMessageMetrics.class, "durablePersistentSize");
+   private static final AtomicLongFieldUpdater<QueueMessageMetrics> SIZE_UPDATER =
+         AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, "persistentSize");
+
+   private static final AtomicLongFieldUpdater<QueueMessageMetrics> DURABLE_SIZE_UPDATER =
+         AtomicLongFieldUpdater.newUpdater(QueueMessageMetrics.class, "durablePersistentSize");
 
    private volatile int messageCount;
 
@@ -49,14 +52,20 @@ public class QueuePendingMessageMetrics {
 
    private final Queue queue;
 
-   public QueuePendingMessageMetrics(final Queue queue) {
+   private final String name;
+
+   public QueueMessageMetrics(final Queue queue, final String name) {
       Preconditions.checkNotNull(queue);
       this.queue = queue;
+      this.name = name;
    }
 
    public void incrementMetrics(final MessageReference reference) {
       long size = getPersistentSize(reference);
       COUNT_UPDATER.incrementAndGet(this);
+      if (logger.isDebugEnabled()) {
+         logger.debugf("%s increment messageCount to %d: %s", this, messageCount, reference);
+      }
       SIZE_UPDATER.addAndGet(this, size);
       if (queue.isDurable() && reference.isDurable()) {
          DURABLE_COUNT_UPDATER.incrementAndGet(this);
@@ -67,6 +76,9 @@ public class QueuePendingMessageMetrics {
    public void decrementMetrics(final MessageReference reference) {
       long size = -getPersistentSize(reference);
       COUNT_UPDATER.decrementAndGet(this);
+      if (logger.isDebugEnabled()) {
+         logger.debugf("%s decrement messageCount to %d: %s", this, messageCount, reference);
+      }
       SIZE_UPDATER.addAndGet(this, size);
       if (queue.isDurable() && reference.isDurable()) {
          DURABLE_COUNT_UPDATER.decrementAndGet(this);
@@ -144,4 +156,9 @@ public class QueuePendingMessageMetrics {
       return size;
    }
 
+   @Override
+   public String toString() {
+      return "QueueMessageMetrics[queue=" + queue.getName() + ", name=" + name + "]";
+   }
+
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
index 78ec785..550ec9d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerImpl.java
@@ -50,12 +50,12 @@ public class ScheduledDeliveryHandlerImpl implements ScheduledDeliveryHandler {
    // just adding some information to keep it in order accordingly to the initial operations
    private final TreeSet<RefScheduled> scheduledReferences = new TreeSet<>(new MessageReferenceComparator());
 
-   private final QueuePendingMessageMetrics metrics;
+   private final QueueMessageMetrics metrics;
 
    public ScheduledDeliveryHandlerImpl(final ScheduledExecutorService scheduledExecutor,
          final Queue queue) {
       this.scheduledExecutor = scheduledExecutor;
-      this.metrics = new QueuePendingMessageMetrics(queue);
+      this.metrics = new QueueMessageMetrics(queue, "scheduled");
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 296382b..7c26028 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -620,7 +620,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable) throws Exception {
       AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
-      return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), ActiveMQServerImpl.isAutoDelete(false, as), as.getAutoDeleteQueuesD [...]
+      return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), ActiveMQServerImpl.isAutoDelete(false, as), as.getAutoDeleteQueuesD [...]
    }
 
    public Queue createQueue(final AddressInfo addressInfo,
@@ -642,7 +642,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final boolean autoDelete,
                             final long autoDeleteDelay,
                             final long autoDeleteMessageCount,
-                            final boolean autoCreated) throws Exception {
+                            final boolean autoCreated,
+                            final long ringSize) throws Exception {
       if (AuditLogger.isEnabled()) {
          AuditLogger.createQueue(this, getUsername(), addressInfo, name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers,
                   exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch,
@@ -667,7 +668,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       server.checkQueueCreationLimit(getUsername());
 
-      Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, as.isAutoCreateAddresses());
+      Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, as.isAutoCreateAddresses(), ringSize);
 
       if (temporary) {
          // Temporary queue in core simply means the queue will be deleted if
@@ -707,7 +708,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final boolean purgeOnNoConsumers,
                             final boolean autoCreated) throws Exception {
       AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
-      return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), ActiveMQServerImpl.isAutoDelete(autoCreated, as), as.getAutoDeleteQueu [...]
+      return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), ActiveMQServerImpl.isAutoDelete(autoCreated, as), as.getAutoDeleteQueu [...]
    }
 
    @Override
@@ -771,7 +772,33 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final Long autoDeleteDelay,
                             final Long autoDeleteMessageCount,
                             final boolean autoCreated) throws Exception {
-      if (exclusive == null || groupRebalance == null || groupBuckets == null || groupFirstKey == null || lastValue == null || lastValueKey == null || nonDestructive == null || consumersBeforeDispatch == null || delayBeforeDispatch == null || autoDelete == null || autoDeleteDelay == null || autoDeleteMessageCount == null) {
+      return createQueue(address, name, routingType, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, null, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreated, null);
+   }
+
+   @Override
+   public Queue createQueue(final SimpleString address,
+                            final SimpleString name,
+                            final RoutingType routingType,
+                            final SimpleString filterString,
+                            final boolean temporary,
+                            final boolean durable,
+                            final int maxConsumers,
+                            final boolean purgeOnNoConsumers,
+                            final Boolean exclusive,
+                            final Boolean groupRebalance,
+                            final Integer groupBuckets,
+                            final SimpleString groupFirstKey,
+                            final Boolean lastValue,
+                            final SimpleString lastValueKey,
+                            final Boolean nonDestructive,
+                            final Integer consumersBeforeDispatch,
+                            final Long delayBeforeDispatch,
+                            final Boolean autoDelete,
+                            final Long autoDeleteDelay,
+                            final Long autoDeleteMessageCount,
+                            final boolean autoCreated,
+                            final Long ringSize) throws Exception {
+      if (exclusive == null || groupRebalance == null || groupBuckets == null || groupFirstKey == null || lastValue == null || lastValueKey == null || nonDestructive == null || consumersBeforeDispatch == null || delayBeforeDispatch == null || autoDelete == null || autoDeleteDelay == null || autoDeleteMessageCount == null || ringSize == null) {
          AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
          return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers,
                  exclusive == null ? as.isDefaultExclusiveQueue() : exclusive,
@@ -786,10 +813,11 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                  autoDelete == null ? ActiveMQServerImpl.isAutoDelete(autoCreated, as) : autoDelete,
                  autoDeleteDelay == null ? as.getAutoDeleteQueuesDelay() : autoDeleteDelay,
                  autoDeleteMessageCount == null ? as.getAutoDeleteQueuesMessageCount() : autoDeleteMessageCount,
-                 autoCreated);
+                 autoCreated,
+                 ringSize == null ? as.getDefaultRingSize() : ringSize);
       } else {
          return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers,
-                 exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreated);
+                 exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreated, ringSize);
       }
    }
 
@@ -808,14 +836,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable, boolean autoCreated) throws Exception {
       AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
-      return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), ActiveMQServerImpl.isAutoDelete(autoCreated, as), as.getAutoDeleteQ [...]
+      return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), ActiveMQServerImpl.isAutoDelete(autoCreated, as), as.getAutoDeleteQ [...]
    }
 
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable, Boolean exclusive, Boolean lastValue, boolean autoCreated) throws Exception {
       AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
       return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(),
-                         exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), lastValue == null ? as.isDefaultLastValueQueue() : lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), ActiveMQServerImpl.isAutoDelete(autoCreated, as), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(),  [...]
+                         exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.getDefaultGroupFirstKey(), lastValue == null ? as.isDefaultLastValueQueue() : lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), ActiveMQServerImpl.isAutoDelete(autoCreated, as), as.getAutoDeleteQueuesDelay(), as.getAutoDeleteQueuesMessageCount(),  [...]
    }
 
    @Override
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index cb5d0b3..625d652 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -179,6 +179,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Long autoDeleteQueuesMessageCount = null;
 
+   private Long defaultRingSize = null;
+
    private DeletionPolicy configDeleteQueues = null;
 
    private Boolean autoCreateAddresses = null;
@@ -260,6 +262,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.defaultGroupRebalance = other.defaultGroupRebalance;
       this.defaultGroupBuckets = other.defaultGroupBuckets;
       this.defaultGroupFirstKey = other.defaultGroupFirstKey;
+      this.defaultRingSize = other.defaultRingSize;
    }
 
    public AddressSettings() {
@@ -733,6 +736,15 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       return this;
    }
 
+   public long getDefaultRingSize() {
+      return defaultRingSize != null ? defaultRingSize : ActiveMQDefaultConfiguration.DEFAULT_RING_SIZE;
+   }
+
+   public AddressSettings setDefaultRingSize(final long defaultRingSize) {
+      this.defaultRingSize = defaultRingSize;
+      return this;
+   }
+
    /**
     * merge 2 objects in to 1
     *
@@ -884,6 +896,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (defaultGroupBuckets == null) {
          defaultGroupBuckets = merged.defaultGroupBuckets;
       }
+      if (defaultRingSize == null) {
+         defaultRingSize = merged.defaultRingSize;
+      }
    }
 
    @Override
@@ -1040,6 +1055,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (buffer.readableBytes() > 0) {
          autoDeleteCreatedQueues = BufferHelper.readNullableBoolean(buffer);
       }
+
+      if (buffer.readableBytes() > 0) {
+         defaultRingSize = BufferHelper.readNullableLong(buffer);
+      }
    }
 
    @Override
@@ -1089,7 +1108,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          BufferHelper.sizeOfNullableBoolean(defaultGroupRebalance) +
          BufferHelper.sizeOfNullableInteger(defaultGroupBuckets) +
          BufferHelper.sizeOfNullableLong(autoDeleteQueuesMessageCount) +
-         BufferHelper.sizeOfNullableBoolean(autoDeleteCreatedQueues);
+         BufferHelper.sizeOfNullableBoolean(autoDeleteCreatedQueues) +
+         BufferHelper.sizeOfNullableLong(defaultRingSize);
    }
 
    @Override
@@ -1188,6 +1208,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
       BufferHelper.writeNullableBoolean(buffer, autoDeleteCreatedQueues);
 
+      BufferHelper.writeNullableLong(buffer, defaultRingSize);
+
    }
 
    /* (non-Javadoc)
@@ -1245,6 +1267,7 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
       result = prime * result + ((defaultGroupRebalance == null) ? 0 : defaultGroupRebalance.hashCode());
       result = prime * result + ((defaultGroupBuckets == null) ? 0 : defaultGroupBuckets.hashCode());
+      result = prime * result + ((defaultRingSize == null) ? 0 : defaultRingSize.hashCode());
       return result;
    }
 
@@ -1510,6 +1533,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
             return false;
       } else if (!defaultGroupBuckets.equals(other.defaultGroupBuckets))
          return false;
+
+      if (defaultRingSize == null) {
+         if (other.defaultRingSize != null)
+            return false;
+      } else if (!defaultRingSize.equals(other.defaultRingSize))
+         return false;
       return true;
    }
 
@@ -1611,6 +1640,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          defaultGroupRebalance +
          ", defaultGroupBuckets=" +
          defaultGroupBuckets +
+         ", defaultRingSize=" +
+         defaultRingSize +
          "]";
    }
 }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 2ade964..a9e4be6 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -3380,6 +3380,15 @@
                   </xsd:documentation>
                </xsd:annotation>
             </xsd:element>
+
+            <xsd:element name="default-ring-size" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default ring-size value for any matching queue which doesn't have `ring-size` explicitly
+                     defined
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
             
          </xsd:all>
 
@@ -3514,6 +3523,7 @@
       <xsd:attribute name="non-destructive" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/>
       <xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/>
+      <xsd:attribute name="ring-size" type="xsd:long" use="optional"/>
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
index 18469c5..a16d8c0 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/impl/FileConfigurationTest.java
@@ -360,6 +360,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals(5, conf.getAddressesSettings().get("a1").getDefaultMaxConsumers());
       assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a1").getDefaultQueueRoutingType());
       assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a1").getDefaultAddressRoutingType());
+      assertEquals(3, conf.getAddressesSettings().get("a1").getDefaultRingSize());
 
       assertEquals("a2.1", conf.getAddressesSettings().get("a2").getDeadLetterAddress().toString());
       assertEquals("a2.2", conf.getAddressesSettings().get("a2").getExpiryAddress().toString());
@@ -382,6 +383,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       assertEquals(RoutingType.MULTICAST, conf.getAddressesSettings().get("a2").getDefaultQueueRoutingType());
       assertEquals(RoutingType.ANYCAST, conf.getAddressesSettings().get("a2").getDefaultAddressRoutingType());
       assertEquals(10000, conf.getAddressesSettings().get("a2").getDefaultConsumerWindowSize());
+      assertEquals(-1, conf.getAddressesSettings().get("a2").getDefaultRingSize());
 
       assertTrue(conf.getResourceLimitSettings().containsKey("myUser"));
       assertEquals(104, conf.getResourceLimitSettings().get("myUser").getMaxConnections());
@@ -462,6 +464,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       CoreQueueConfiguration queueConfiguration = addressConfiguration.getQueueConfigurations().get(0);
 
       assertEquals("q1", queueConfiguration.getName());
+      assertEquals(3L, queueConfiguration.getRingSize().longValue());
       assertFalse(queueConfiguration.isDurable());
       assertEquals("color='blue'", queueConfiguration.getFilterString());
       assertEquals(ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), queueConfiguration.getPurgeOnNoConsumers());
@@ -473,6 +476,7 @@ public class FileConfigurationTest extends ConfigurationImplTest {
       queueConfiguration = addressConfiguration.getQueueConfigurations().get(1);
 
       assertEquals("q2", queueConfiguration.getName());
+      assertEquals(-1, queueConfiguration.getRingSize().longValue());
       assertTrue(queueConfiguration.isDurable());
       assertEquals("color='green'", queueConfiguration.getFilterString());
       assertEquals(Queue.MAX_CONSUMERS_UNLIMITED, queueConfiguration.getMaxConsumers().intValue());
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java
new file mode 100644
index 0000000..f6ed46b
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/persistence/impl/journal/QueueBindingEncodingTest.java
@@ -0,0 +1,106 @@
+/*
+ * Copyright The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence.impl.journal;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class QueueBindingEncodingTest extends Assert {
+
+   @Test
+   public void testEncodeDecode() {
+      final SimpleString name = RandomUtil.randomSimpleString();
+      final SimpleString address = RandomUtil.randomSimpleString();
+      final SimpleString filterString = RandomUtil.randomSimpleString();
+      final SimpleString user = RandomUtil.randomSimpleString();
+      final boolean autoCreated = RandomUtil.randomBoolean();
+      final int maxConsumers = RandomUtil.randomInt();
+      final boolean purgeOnNoConsumers = RandomUtil.randomBoolean();
+      final boolean exclusive = RandomUtil.randomBoolean();
+      final boolean groupRebalance = RandomUtil.randomBoolean();
+      final int groupBuckets = RandomUtil.randomInt();
+      final SimpleString groupFirstKey = RandomUtil.randomSimpleString();
+      final boolean lastValue = RandomUtil.randomBoolean();
+      final SimpleString lastValueKey = RandomUtil.randomSimpleString();
+      final boolean nonDestructive = RandomUtil.randomBoolean();
+      final int consumersBeforeDispatch = RandomUtil.randomInt();
+      final long delayBeforeDispatch = RandomUtil.randomLong();
+      final boolean autoDelete = RandomUtil.randomBoolean();
+      final long autoDeleteDelay = RandomUtil.randomLong();
+      final long autoDeleteMessageCount = RandomUtil.randomLong();
+      final byte routingType = RandomUtil.randomByte();
+      final boolean configurationManaged = RandomUtil.randomBoolean();
+      final long ringSize = RandomUtil.randomLong();
+
+      PersistentQueueBindingEncoding encoding = new PersistentQueueBindingEncoding(name,
+                                                                                   address,
+                                                                                   filterString,
+                                                                                   user,
+                                                                                   autoCreated,
+                                                                                   maxConsumers,
+                                                                                   purgeOnNoConsumers,
+                                                                                   exclusive,
+                                                                                   groupRebalance,
+                                                                                   groupBuckets,
+                                                                                   groupFirstKey,
+                                                                                   lastValue,
+                                                                                   lastValueKey,
+                                                                                   nonDestructive,
+                                                                                   consumersBeforeDispatch,
+                                                                                   delayBeforeDispatch,
+                                                                                   autoDelete,
+                                                                                   autoDeleteDelay,
+                                                                                   autoDeleteMessageCount,
+                                                                                   routingType,
+                                                                                   configurationManaged,
+                                                                                   ringSize);
+      int size = encoding.getEncodeSize();
+      ActiveMQBuffer encodedBuffer = ActiveMQBuffers.fixedBuffer(size);
+      encoding.encode(encodedBuffer);
+
+      PersistentQueueBindingEncoding decoding = new PersistentQueueBindingEncoding();
+      decoding.decode(encodedBuffer);
+
+      assertEquals(name, decoding.getQueueName());
+      assertEquals(address, decoding.getAddress());
+      assertEquals(filterString, decoding.getFilterString());
+      assertEquals(user, decoding.getUser());
+      assertEquals(autoCreated, decoding.isAutoCreated());
+      assertEquals(maxConsumers, decoding.getMaxConsumers());
+      assertEquals(purgeOnNoConsumers, decoding.isPurgeOnNoConsumers());
+      assertEquals(exclusive, decoding.isExclusive());
+      assertEquals(groupRebalance, decoding.isGroupRebalance());
+      assertEquals(groupBuckets, decoding.getGroupBuckets());
+      assertEquals(groupFirstKey, decoding.getGroupFirstKey());
+      assertEquals(lastValue, decoding.isLastValue());
+      assertEquals(lastValueKey, decoding.getLastValueKey());
+      assertEquals(nonDestructive, decoding.isNonDestructive());
+      assertEquals(consumersBeforeDispatch, decoding.getConsumersBeforeDispatch());
+      assertEquals(delayBeforeDispatch, decoding.getDelayBeforeDispatch());
+      assertEquals(autoDelete, decoding.isAutoDelete());
+      assertEquals(autoDeleteDelay, decoding.getAutoDeleteDelay());
+      assertEquals(autoDeleteMessageCount, decoding.getAutoDeleteMessageCount());
+      assertEquals(routingType, decoding.getRoutingType());
+      assertEquals(configurationManaged, decoding.isConfigurationManaged());
+      assertEquals(ringSize, decoding.getRingSize());
+   }
+}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index b94bd3a..8b4869c 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -1026,6 +1026,16 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public void setRingSize(long ringSize) {
+
+      }
+
+      @Override
+      public long getRingSize() {
+         return 0;
+      }
+
+      @Override
       public void setConsumersRefCount(ReferenceCounter referenceCounter) {
 
       }
@@ -1243,6 +1253,11 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public long getMessagesReplaced() {
+         return 0;
+      }
+
+      @Override
       public MessageReference removeReferenceWithID(long id) throws Exception {
          return null;
       }
diff --git a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
index 8b96e91..9b213cc 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-full-config.xml
@@ -402,6 +402,7 @@
             <default-max-consumers>5</default-max-consumers>
             <default-queue-routing-type>ANYCAST</default-queue-routing-type>
             <default-address-routing-type>MULTICAST</default-address-routing-type>
+            <default-ring-size>3</default-ring-size>
          </address-setting>
          <address-setting match="a2">
             <dead-letter-address>a2.1</dead-letter-address>
@@ -446,7 +447,7 @@
       <addresses>
          <address name="addr1">
             <anycast>
-               <queue name="q1">
+               <queue name="q1" ring-size="3">
                   <durable>${falseProp}</durable>
                   <filter string="color='blue'"/>
                </queue>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
index b060302..49b625a 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-address-settings.xml
@@ -38,6 +38,7 @@
       <default-max-consumers>5</default-max-consumers>
       <default-queue-routing-type>ANYCAST</default-queue-routing-type>
       <default-address-routing-type>MULTICAST</default-address-routing-type>
+      <default-ring-size>3</default-ring-size>
    </address-setting>
    <address-setting match="a2">
       <dead-letter-address>a2.1</dead-letter-address>
diff --git a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-addresses.xml b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-addresses.xml
index 530695e..f442be6 100644
--- a/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-addresses.xml
+++ b/artemis-server/src/test/resources/ConfigurationTest-xinclude-config-addresses.xml
@@ -17,7 +17,7 @@
 <addresses xmlns="urn:activemq:core">
    <address name="addr1">
       <anycast>
-         <queue name="q1">
+         <queue name="q1" ring-size="3">
             <durable>${falseProp}</durable>
             <filter string="color='blue'"/>
          </queue>
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index 0a0f0fe..64f32cd 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -3347,6 +3347,15 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="default-ring-size" type="xsd:long" default="-1" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     the default ring-size value for any matching queue which doesn't have `ring-size` explicitly
+                     defined
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
          </xsd:all>
 
          <xsd:attribute name="match" type="xsd:string" use="required">
@@ -3480,6 +3489,7 @@
       <xsd:attribute name="non-destructive" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="consumers-before-dispatch" type="xsd:int" use="optional"/>
       <xsd:attribute name="delay-before-dispatch" type="xsd:long" use="optional"/>
+      <xsd:attribute name="ring-size" type="xsd:long" use="optional"/>
       <xsd:attributeGroup ref="xml:specialAttrs"/>
    </xsd:complexType>
 
diff --git a/docs/user-manual/en/SUMMARY.md b/docs/user-manual/en/SUMMARY.md
index 2207b32..6cc0efd 100644
--- a/docs/user-manual/en/SUMMARY.md
+++ b/docs/user-manual/en/SUMMARY.md
@@ -39,6 +39,7 @@
 * [Paging](paging.md)
 * [Scheduled Messages](scheduled-messages.md)
 * [Last-Value Queues](last-value-queues.md)
+* [Ring Queues](ring-queues.md)
 * [Exclusive Queues](exclusive-queues.md)
 * [Message Grouping](message-grouping.md)
 * [Consumer Priority](consumer-priority.md)
diff --git a/docs/user-manual/en/address-model.md b/docs/user-manual/en/address-model.md
index 88b0d11..7512522 100644
--- a/docs/user-manual/en/address-model.md
+++ b/docs/user-manual/en/address-model.md
@@ -610,6 +610,7 @@ that would be found in the `broker.xml` file.
       <default-max-consumers>-1</default-max-consumers>
       <default-queue-routing-type></default-queue-routing-type>
       <default-address-routing-type></default-address-routing-type>
+      <default-ring-size>-1</default-ring-size>
    </address-setting>
 </address-settings>
 ```
@@ -844,3 +845,7 @@ client and/or protocol semantics. Default is `MULTICAST`. Read more about
 for a `CORE` protocol consumer, if not defined the default will be set to 
 1 MiB (1024 * 1024 bytes). The consumer will use this value as the window size
 if the value is not set on the client. Read more about [flow control](#flow-control).
+
+`default-ring-size` defines the default `ring-size` value for any matching queue
+which doesn't have `ring-size` explicitly defined. If not defined the default will
+be set to -1. Read more about [ring queues](ring-queues.md).
diff --git a/docs/user-manual/en/configuration-index.md b/docs/user-manual/en/configuration-index.md
index 3e116ec..4289e12 100644
--- a/docs/user-manual/en/configuration-index.md
+++ b/docs/user-manual/en/configuration-index.md
@@ -243,6 +243,7 @@ Name | Description | Default
 [default-max-consumers](address-model.md#shared-durable-subscription-queue-using-max-consumers) | `max-consumers` value if none is set on the queue | -1
 [default-queue-routing-type](address-model.md#routing-type) | Routing type for auto-created queues if the type can't be otherwise determined | `MULTICAST`
 [default-address-routing-type](address-model.md#routing-type) | Routing type for auto-created addresses if the type can't be otherwise determined | `MULTICAST`
+[default-ring-size](ring-queues.md) | The ring-size applied to queues without an explicit `ring-size` configured | `-1`
 
 
 ## bridge type
@@ -359,6 +360,7 @@ user | the name of the user to associate with the creation of the queue | n/a
 [purge-on-no-consumers](address-model.md#non-durable-subscription-queue) | whether or not to delete all messages and prevent routing when no consumers are connected | `false`
 [exclusive](exclusive-queues.md) | only deliver messages to one of the connected consumers | `false`
 [last-value](last-value-queues.md) | use last-value semantics | `false`
+[ring-size](ring-queues.md) | the size this queue should maintain according to ring semantics | based on `default-ring-size` `address-setting`
 consumers-before-dispatch | number of consumers required before dispatching messages | 0
 delay-before-dispatch | milliseconds to wait for `consumers-before-dispatch` to be met before dispatching messages anyway | -1 (wait forever)
 
diff --git a/docs/user-manual/en/ring-queues.md b/docs/user-manual/en/ring-queues.md
new file mode 100644
index 0000000..4b1ce19
--- /dev/null
+++ b/docs/user-manual/en/ring-queues.md
@@ -0,0 +1,187 @@
+# Ring Queue
+
+Queues operate with first-in, first-out (FIFO) semantics which means that
+messages, in general, are added to the "tail" of the queue and removed from the
+"head." A "ring" queue is a special type of queue with a *fixed* size. The
+fixed size is maintained by removing the message at the head of the queue when
+the number of messages on the queue reaches the configured size.
+
+For example, consider a queue configured with a ring size of 3 and a producer
+which sends the messages `A`, `B`, `C`, & `D` in that order. Once `C` is sent
+the number of messages in the queue will be 3 which is the same as the
+configured ring size. We can visualize the queue growth like this...
+
+After `A` is sent:
+```
+             |---|
+head/tail -> | A |
+             |---|
+```
+
+After `B` is sent:
+
+```
+        |---|
+head -> | A |
+        |---|
+tail -> | B |
+        |---|
+```
+
+After `C` is sent:
+
+```
+        |---|
+head -> | A |
+        |---|
+        | B |
+        |---|
+tail -> | C |
+        |---|
+```
+
+When `D` is sent it will be added to the tail of the queue and the message at
+the head of the queue (i.e. `A`) will be removed so the queue will look like
+this:
+
+```
+        |---|
+head -> | B |
+        |---|
+        | C |
+        |---|
+tail -> | D |
+        |---|
+```
+
+This example covers the most basic use case with messages being added to the
+tail of the queue. However, there are a few other important use cases
+involving:
+
+ - Messages in delivery & rollbacks
+ - Scheduled messages
+ - Paging
+
+However, before we get to those use cases let's look at the basic configuration
+of a ring queue.
+
+## Configuration
+
+There are 2 parameters related to ring queue configuration.
+
+The `ring-size` parameter can be set directly on the `queue` element. The
+default value comes from the `default-ring-size` `address-setting` (see below).
+
+```xml
+<addresses>
+   <address name="myRing">
+      <anycast>
+         <queue name="myRing" ring-size="3" />
+      </anycast>
+   </address>
+</addresses>
+```
+
+The `default-ring-size` is an `address-setting` which applies to queues on
+matching addresses which don't have an explicit `ring-size` set. This is
+especially useful for auto-created queues. The default value is `-1` (i.e.
+no limit).
+
+```xml
+<address-settings>
+   <address-setting match="ring.#">
+      <default-ring-size>3</default-ring-size>
+   </address-setting>
+</address-settings>
+```
+
+The `ring-size` may be updated at runtime. If the new `ring-size` is set
+*lower* than the previous `ring-size` the broker will not immediately delete
+enough messages from the head of the queue to enforce the new size. New
+messages sent to the queue will force the deletion of old messages (i.e. the
+queue won't grow any larger), but the queue will not reach its new size until
+it does so *naturally* through the normal consumption of messages by
+clients.
+
+## Messages in Delivery & Rollbacks
+
+When messages are "in delivery" they are in an in-between state where they are
+not technically on the queue but they are also not yet acknowledged. The
+broker is at the consumer’s mercy to either acknowledge such messages or not.
+In the context of a ring queue, messages which are in-delivery cannot be
+removed from the queue.
+
+This presents a few dilemmas.
+
+Due to the nature of messages in delivery a client can actually send more
+messages to a ring queue than it would otherwise permit. This can make it
+appear that the ring-size is not being enforced properly. Consider this
+simple scenario:
+
+ - Queue `foo` with `ring-size="3"`
+ - 1 Consumer on queue `foo`
+ - Message `A` sent to `foo` & dispatched to consumer
+ - `messageCount`=1, `deliveringCount`=1
+ - Message `B` sent to `foo` & dispatched to consumer
+ - `messageCount`=2, `deliveringCount`=2
+ - Message `C` sent to `foo` & dispatched to consumer
+ - `messageCount`=3, `deliveringCount`=3
+ - Message `D` sent to `foo` & dispatched to consumer
+ - `messageCount`=4, `deliveringCount`=4
+
+The `messageCount` for `foo` is now 4, one *greater* than the `ring-size`
+of 3! However, the broker has no choice but to allow this because it cannot
+remove messages from the queue which are in delivery.
+
+Now consider that the consumer is closed without actually acknowledging any
+of these 4 messages. These 4 in-delivery, unacknowledged messages will be
+cancelled back to the broker and added to the *head* of the queue in the
+reverse order from which they were consumed. This, of course, will put the
+queue over its configured `ring-size`. Therefore, since a ring queue
+prefers messages at the tail of the queue over messages at the head it will
+keep `B`, `C`, & `D` and delete `A` (since `A` was the last message added
+to the head of the queue).
+
+Transaction or core session rollbacks are treated the same way.
+
+If you wish to avoid these kinds of situations and you're using the core
+client directly or the core JMS client you can minimize messages in delivery
+by reducing the size of `consumerWindowSize` (1024 * 1024 bytes by default).
+
+## Scheduled Messages
+
+When a scheduled message is sent to a queue it isn't immediately added to the
+tail of the queue like normal messages. It is held in an intermediate buffer
+and scheduled for delivery onto the *head* of the queue according to the
+details of the message. However, scheduled messages are nevertheless reflected
+in the message count of the queue. As with messages which are in delivery this
+can make it appear that the ring queue's size is not being enforced. Consider
+this simple scenario:
+
+ - Queue `foo` with `ring-size="3"`
+ - At 12:00 message `A` sent to `foo` scheduled for 12:05
+ - `messageCount`=1, `scheduledCount`=1
+ - At 12:01 message `B` sent to `foo`
+ - `messageCount`=2, `scheduledCount`=1
+ - At 12:02 message `C` sent to `foo`
+ - `messageCount`=3, `scheduledCount`=1
+ - At 12:03 message `D` sent to `foo`
+ - `messageCount`=4, `scheduledCount`=1
+
+The `messageCount` for `foo` is now 4, one *greater* than the `ring-size` of 3!
+However, the scheduled message is not technically on the queue yet (i.e. it is
+on the broker and scheduled to be put on the queue). When the scheduled
+delivery time of 12:05 comes the message will be put on the head of the queue,
+but since the ring queue's size has already been reached the scheduled message
+`A` will be removed.
+
+## Paging
+
+Similar to scheduled messages and messages in delivery, paged messages don't
+count against a ring queue's size because messages are actually paged at the
+*address* level, not the queue level. A paged message is not technically on a
+queue although it is reflected in a queue's `messageCount`.
+
+It is recommended that paging is not used for addresses with ring queues. In
+other words, ensure that the entire address will be able to fit into memory or
+use the `DROP`, `BLOCK` or `FAIL` `address-full-policy`.
\ No newline at end of file
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java
index 60a84a4..0901ece 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/UpdateQueueTest.java
@@ -132,7 +132,7 @@ public class UpdateQueueTest extends ActiveMQTestBase {
          prod.send(session.createTextMessage("message " + i));
       }
 
-      server.updateQueue(ADDRESS.toString(), RoutingType.ANYCAST, 1, false, false, "newUser");
+      server.updateQueue(ADDRESS.toString(), RoutingType.ANYCAST, null, 1, false, true, true, 5, "gfk", true, 1, 10L, "newUser", 180L);
 
       conn.close();
       factory.close();
@@ -145,8 +145,17 @@ public class UpdateQueueTest extends ActiveMQTestBase {
       queue = server.locateQueue(ADDRESS);
 
       Assert.assertNotNull("queue not found", queue);
-
+      Assert.assertEquals(1, queue.getMaxConsumers());
+      Assert.assertEquals(false, queue.isPurgeOnNoConsumers());
+      Assert.assertEquals(true, queue.isExclusive());
+      Assert.assertEquals(true, queue.isGroupRebalance());
+      Assert.assertEquals(5, queue.getGroupBuckets());
+      Assert.assertEquals("gfk", queue.getGroupFirstKey().toString());
+      Assert.assertEquals(true, queue.isNonDestructive());
+      Assert.assertEquals(1, queue.getConsumersBeforeDispatch());
+      Assert.assertEquals(10L, queue.getDelayBeforeDispatch());
       Assert.assertEquals("newUser", queue.getUser().toString());
+      Assert.assertEquals(180L, queue.getRingSize());
 
       factory = new ActiveMQConnectionFactory();
 
@@ -162,8 +171,6 @@ public class UpdateQueueTest extends ActiveMQTestBase {
 
       Assert.assertNull(consumer.receiveNoWait());
 
-      Assert.assertEquals(1, queue.getMaxConsumers());
-
       conn.close();
 
       Assert.assertEquals(originalID, server.locateQueue(ADDRESS).getID());
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
index 22c1834..6ba521f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlTest.java
@@ -336,6 +336,66 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
    }
 
    @Test
+   public void testCreateAndDestroyQueue_5() throws Exception {
+      SimpleString address = RandomUtil.randomSimpleString();
+      SimpleString name = RandomUtil.randomSimpleString();
+      boolean durable = RandomUtil.randomBoolean();
+      int maxConsumers = RandomUtil.randomInt();
+      boolean purgeOnNoConsumers = RandomUtil.randomBoolean();
+      boolean exclusive = RandomUtil.randomBoolean();
+      boolean groupRebalance = RandomUtil.randomBoolean();
+      int groupBuckets = 1;
+      String groupFirstKey = RandomUtil.randomSimpleString().toString();
+      boolean lastValue = false;
+      String lastValueKey = null;
+      boolean nonDestructive = RandomUtil.randomBoolean();
+      int consumersBeforeDispatch = RandomUtil.randomInt();
+      long delayBeforeDispatch = RandomUtil.randomLong();
+      boolean autoDelete = RandomUtil.randomBoolean();
+      long autoDeleteDelay = RandomUtil.randomLong();
+      long autoDeleteMessageCount = RandomUtil.randomLong();
+      boolean autoCreateAddress = true;
+      long ringSize = RandomUtil.randomLong();
+
+      ActiveMQServerControl serverControl = createManagementControl();
+
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      serverControl.createQueue(address.toString(), RoutingType.ANYCAST.toString(), name.toString(), null, durable, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
+
+      checkResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      QueueControl queueControl = ManagementControlHelper.createQueueControl(address, name, RoutingType.ANYCAST, mbeanServer);
+      Assert.assertEquals(address.toString(), queueControl.getAddress());
+      Assert.assertEquals(name.toString(), queueControl.getName());
+      Assert.assertNull(queueControl.getFilter());
+      Assert.assertEquals(durable, queueControl.isDurable());
+      Assert.assertEquals(maxConsumers, queueControl.getMaxConsumers());
+      Assert.assertEquals(purgeOnNoConsumers, queueControl.isPurgeOnNoConsumers());
+      Assert.assertEquals(false, queueControl.isTemporary());
+      Assert.assertEquals(exclusive, queueControl.isExclusive());
+//      Assert.assertEquals(groupRebalance, queueControl.getGroupRebalance());
+//      Assert.assertEquals(groupBuckets, queueControl.getGroupBuckets());
+//      Assert.assertEquals(groupFirstKey, queueControl.getGroupFirstKey());
+      Assert.assertEquals(lastValue, queueControl.isLastValue());
+//      Assert.assertEquals(lastValueKey, queueControl.getLastValueKey());
+//      Assert.assertEquals(nonDestructive, queueControl.isNonDestructive());
+//      Assert.assertEquals(consumersBeforeDispatch, queueControl.getConsumersBeforeDispatch());
+//      Assert.assertEquals(delayBeforeDispatch, queueControl.getDelayBeforeDispatch());
+//      Assert.assertEquals(autoDelete, queueControl.isAutoDelete());
+//      Assert.assertEquals(autoDeleteDelay, queueControl.getAutoDeleteDelay());
+//      Assert.assertEquals(autoDeleteMessageCount, queueControl.autoDeleteMessageCount());
+      Assert.assertEquals(ringSize, queueControl.getRingSize());
+
+      checkResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
+      AddressControl addressControl = ManagementControlHelper.createAddressControl(address, mbeanServer);
+      Assert.assertEquals(address.toString(), addressControl.getAddress());
+
+      serverControl.destroyQueue(name.toString(), true, true);
+
+      checkNoResource(ObjectNameBuilder.DEFAULT.getQueueObjectName(address, name, RoutingType.ANYCAST));
+      checkNoResource(ObjectNameBuilder.DEFAULT.getAddressObjectName(address));
+   }
+
+   @Test
    public void testCreateAndDestroyQueueClosingConsumers() throws Exception {
       SimpleString address = RandomUtil.randomSimpleString();
       SimpleString name = RandomUtil.randomSimpleString();
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index 61a6e02..7278114 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -177,6 +177,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
+         public String updateQueue(String name, String routingType, String filter, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, String groupFirstKey, Boolean nonDestructive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, String user, Long ringSize) throws Exception {
+            return (String) proxy.invokeOperation("updateQueue", name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, ringSize);
+         }
+
+         @Override
          public void deleteAddress(@Parameter(name = "name", desc = "The name of the address") String name) throws Exception {
             proxy.invokeOperation("deleteAddress", name);
          }
@@ -225,6 +230,11 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
+         public String createQueue(String address, String routingType, String name, String filter, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean groupRebalance, int groupBuckets, String groupFirstKey, boolean lastValue, String lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoDelete, long autoDeleteDelay, long autoDeleteMessageCount, boolean autoCreateAddress, long ringSize) throws Exception {
+            return (String) proxy.invokeOperation("createQueue", address, routingType, name, filter, durable, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, groupFirstKey, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoDelete, autoDeleteDelay, autoDeleteMessageCount, autoCreateAddress, ringSize);
+         }
+
+         @Override
          public void createQueue(final String address, final String name, final boolean durable) throws Exception {
             proxy.invokeOperation("createQueue", address, name, durable);
          }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
index 14f55c9..f72fb11 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/QueueControlUsingCoreTest.java
@@ -82,6 +82,15 @@ public class QueueControlUsingCoreTest extends QueueControlTest {
          }
 
          @Override
+         public long getRingSize() {
+            try {
+               // The result is cast to Long, so ask the proxy for a long-typed value; the original requested Integer.TYPE, which does not match the cast (NOTE(review): confirm against the proxy's result conversion).
+               return (Long) proxy.invokeOperation(Long.TYPE, "getRingSize");
+            } catch (Exception e) {
+               throw new RuntimeException(e.getMessage(), e);
+            }
+         }
+
+         @Override
          public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception {
             return (Boolean) proxy.invokeOperation("changeMessagePriority", messageID, newPriority);
          }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
new file mode 100644
index 0000000..723b76e
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/server/RingQueueTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.server;
+
+import javax.jms.Connection;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.QueueAttributes;
+import org.apache.activemq.artemis.api.core.RoutingType;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientConsumer;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientProducer;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.Queue;
+import org.apache.activemq.artemis.core.server.impl.QueueImpl;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.artemis.utils.RandomUtil;
+import org.junit.Before;
+import org.junit.Test;
+
+public class RingQueueTest extends ActiveMQTestBase {
+
+   private ActiveMQServer server;
+
+   private final SimpleString address = new SimpleString("RingQueueTestAddress");
+
+   private final SimpleString qName = new SimpleString("RingQueueTestQ1");
+
+   /**
+    * Creates a queue with a ring-size of 1, then repeatedly sends two messages and checks that the queue still
+    * reports a single message, that the replaced-message counter grows by one per pair, and that the message
+    * left to consume is the second one sent.
+    */
+   @Test
+   public void testSimple() throws Exception {
+      ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession clientSession = addClientSession(sf.createSession(false, true, true));
+      clientSession.createQueue(address, qName, false, new QueueAttributes().setDurable(true).setRingSize(1L).setMaxConsumers(-1).setPurgeOnNoConsumers(false));
+      clientSession.start();
+      final Queue queue = server.locateQueue(qName);
+      assertEquals(1, queue.getRingSize());
+
+      ClientProducer producer = clientSession.createProducer(address);
+
+      // i indexes the messages (two per iteration); j counts completed iterations, i.e. expected replacements so far
+      for (int i = 0, j = 0; i < 500; i += 2, j++) {
+         ClientMessage m0 = createTextMessage(clientSession, "hello" + i);
+         producer.send(m0);
+         Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
+         ClientMessage m1 = createTextMessage(clientSession, "hello" + (i + 1));
+         producer.send(m1);
+         int expectedMessagesReplaced = j + 1;
+         Wait.assertTrue(() -> queue.getMessagesReplaced() == expectedMessagesReplaced, 2000, 100);
+         Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
+         ClientConsumer consumer = clientSession.createConsumer(qName);
+         ClientMessage message = consumer.receiveImmediate();
+         // fail with an assertion rather than a NullPointerException if nothing was delivered
+         assertNotNull(message);
+         message.acknowledge();
+         consumer.close();
+         assertEquals("hello" + (i + 1), message.getBodyBuffer().readString());
+      }
+   }
+
+   @Test
+   public void testRollback() throws Exception {
+      ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession clientSession = addClientSession(sf.createSession(false, true, false));
+      clientSession.createQueue(address, qName, false, new QueueAttributes().setDurable(true).setRingSize(1L).setMaxConsumers(-1).setPurgeOnNoConsumers(false));
+      clientSession.start();
+      final Queue queue = server.locateQueue(qName);
+
+      assertEquals(1, queue.getRingSize());
+
+      ClientProducer producer = clientSession.createProducer(address);
+
+      ClientMessage m0 = createTextMessage(clientSession, "hello0");
+      producer.send(m0);
+      Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
+
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+
+      ClientMessage message = consumer.receiveImmediate();
+      assertNotNull(message);
+      Wait.assertTrue(() -> queue.getDeliveringCount() == 1, 2000, 100);
+
+      message.acknowledge();
+      assertEquals("hello0", message.getBodyBuffer().readString());
+
+      ClientMessage m1 = createTextMessage(clientSession, "hello1");
+      producer.send(m1);
+      Wait.assertTrue(() -> queue.getDeliveringCount() == 2, 2000, 100);
+      Wait.assertTrue(() -> queue.getMessagesReplaced() == 0, 2000, 100);
+      Wait.assertTrue(() -> queue.getMessageCount() == 2, 2000, 100);
+
+      clientSession.rollback();
+      consumer.close();
+      Wait.assertTrue(() -> queue.getDeliveringCount() == 0, 2000, 100);
+      Wait.assertTrue(() -> queue.getMessagesReplaced() == 1, 2000, 100);
+      Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
+
+      consumer = clientSession.createConsumer(qName);
+      message = consumer.receiveImmediate();
+      assertNotNull(message);
+      Wait.assertTrue(() -> queue.getDeliveringCount() == 1, 2000, 100);
+
+      message.acknowledge();
+
+      clientSession.commit();
+
+      Wait.assertTrue(() -> queue.getMessagesAcknowledged() == 1, 2000, 100);
+      assertEquals("hello1", message.getBodyBuffer().readString());
+   }
+
+   @Test
+   public void testConsumerCloseWithDirectDeliver() throws Exception {
+      ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession clientSession = addClientSession(sf.createSession(false, true, false));
+      clientSession.createQueue(address, qName, false, new QueueAttributes().setDurable(true).setRingSize(1L).setMaxConsumers(-1).setPurgeOnNoConsumers(false));
+      clientSession.start();
+      final Queue queue = server.locateQueue(qName);
+      assertEquals(1, queue.getRingSize());
+
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientProducer producer = clientSession.createProducer(address);
+
+      ClientMessage message = createTextMessage(clientSession, "hello0");
+      producer.send(message);
+      message = createTextMessage(clientSession, "hello1");
+      producer.send(message);
+      Wait.assertTrue(() -> queue.getMessageCount() == 2, 2000, 100);
+      Wait.assertTrue(() -> queue.getDeliveringCount() == 2, 2000, 100);
+      consumer.close();
+      Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
+      Wait.assertTrue(() -> queue.getDeliveringCount() == 0, 2000, 100);
+      Wait.assertTrue(() -> queue.getMessagesReplaced() == 1, 2000, 100);
+      consumer = clientSession.createConsumer(qName);
+      message = consumer.receiveImmediate();
+      assertNotNull(message);
+      Wait.assertTrue(() -> queue.getDeliveringCount() == 1, 2000, 100);
+      message.acknowledge();
+      clientSession.commit();
+      Wait.assertTrue(() -> queue.getMessagesAcknowledged() == 1, 2000, 100);
+      assertEquals("hello1", message.getBodyBuffer().readString());
+      consumer.close();
+      Wait.assertTrue(() -> queue.getMessageCount() == 0, 2000, 100);
+      Wait.assertTrue(() -> queue.getDeliveringCount() == 0, 2000, 100);
+      Wait.assertTrue(() -> queue.getMessagesReplaced() == 1, 2000, 100);
+   }
+
+   @Test
+   public void testScheduled() throws Exception {
+      // Ring of size 1 fed with two scheduled sends: checks the ring count stays 0 while both are scheduled, then settles at 1 with one replacement.
+      ServerLocator serverLocator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
+      ClientSession session = addClientSession(createSessionFactory(serverLocator).createSession(false, true, false));
+      session.createQueue(address, qName, false, new QueueAttributes().setDurable(true).setRingSize(1L).setMaxConsumers(-1).setPurgeOnNoConsumers(false));
+      session.start();
+      final Queue queue = server.locateQueue(qName);
+      assertEquals(1, queue.getRingSize());
+      final QueueImpl ringQueue = (QueueImpl) queue;
+      ClientProducer sender = session.createProducer(address);
+
+      // First send, due 3 seconds from now: counted as scheduled, not as a ring member.
+      ClientMessage scheduled = createTextMessage(session, "hello0");
+      scheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + 3000);
+      sender.send(scheduled);
+      Wait.assertTrue(() -> queue.getScheduledCount() == 1, 2000, 100);
+      Wait.assertTrue(() -> ringQueue.getMessageCountForRing() == 0, 2000, 100);
+
+      // Same message object re-sent 1.5 seconds later with a fresh 3-second delay.
+      Thread.sleep(1500);
+      scheduled.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, System.currentTimeMillis() + 3000);
+      sender.send(scheduled);
+      Wait.assertTrue(() -> queue.getScheduledCount() == 2, 2000, 100);
+      Wait.assertTrue(() -> ringQueue.getMessageCountForRing() == 0, 2000, 100);
+      // Eventually exactly one replacement is reported and the ring count becomes 1.
+      Wait.assertTrue(() -> queue.getMessagesReplaced() == 1, 5000, 100);
+      Wait.assertTrue(() -> ringQueue.getMessageCountForRing() == 1, 3000, 100);
+   }
+
+   @Test
+   public void testDefaultAddressSetting() throws Exception {
+      // Checks that a queue under the matched address picks up the configured default ring size (100) while an unrelated queue gets the broker default.
+      SimpleString random = RandomUtil.randomSimpleString();
+      server.getAddressSettingsRepository().addMatch(address.toString(), new AddressSettings().setDefaultRingSize(100));
+      ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://0");
+      try (Connection c = cf.createConnection(); Session s = c.createSession()) { // closed even when an assertion below fails
+         MessageProducer producer = s.createProducer(s.createQueue(address.toString()));
+         producer.send(s.createMessage());
+         Wait.assertTrue(() -> server.locateQueue(address) != null);
+         assertEquals(100, server.locateQueue(address).getRingSize());
+         producer.close();
+         producer = s.createProducer(s.createQueue(random.toString()));
+         producer.send(s.createMessage());
+         Wait.assertTrue(() -> server.locateQueue(random) != null);
+         assertEquals(ActiveMQDefaultConfiguration.getDefaultRingSize(), server.locateQueue(random).getRingSize());
+      }
+   }
+
+   @Test
+   public void testUpdate() throws Exception {
+      // Exercises Queue.setRingSize() on a live queue: the size is set to 10, then 5, then 10 again while message and replaced counts are checked.
+      ServerLocator locator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession clientSession = addClientSession(sf.createSession(false, true, true));
+      clientSession.createQueue(address, RoutingType.ANYCAST, qName);
+      clientSession.start();
+      final Queue queue = server.locateQueue(qName);
+      assertEquals(-1, queue.getRingSize());
+      // With the ring size reported as -1, all 100 sends are expected to be retained.
+      ClientProducer producer = clientSession.createProducer(address);
+      for (int i = 0; i < 100; i++) {
+         producer.send(clientSession.createMessage(true));
+      }
+      Wait.assertTrue(() -> queue.getMessageCount() == 100, 2000, 100);
+      // Ring size 10 is applied with 100 messages present; the test expects all of them to stay consumable (draining 95 leaves 5).
+      queue.setRingSize(10);
+      ClientConsumer consumer = clientSession.createConsumer(qName);
+      ClientMessage message;
+      for (int j = 0; j < 95; j++) {
+         message = consumer.receiveImmediate();
+         assertNotNull(message); // fail with an assertion rather than a NullPointerException if the queue runs dry
+         message.acknowledge();
+      }
+      consumer.close();
+      Wait.assertTrue(() -> queue.getMessageCount() == 5, 2000, 100);
+      // 10 sends on top of the remaining 5: the count must be capped at 10 with 5 replacements recorded.
+      for (int i = 0; i < 10; i++) {
+         producer.send(clientSession.createMessage(true));
+      }
+      Wait.assertTrue(() -> queue.getMessageCount() == 10, 2000, 100);
+      Wait.assertTrue(() -> queue.getMessagesReplaced() == 5, 2000, 100);
+      consumer = clientSession.createConsumer(qName);
+      message = consumer.receiveImmediate();
+      assertNotNull(message);
+      message.acknowledge();
+      consumer.close();
+      Wait.assertTrue(() -> queue.getMessageCount() == 9, 2000, 100);
+      // Ring size lowered to 5 with 9 present: after draining 4, the next send must be recorded as replacement number 6.
+      queue.setRingSize(5);
+      consumer = clientSession.createConsumer(qName);
+      for (int j = 0; j < 4; j++) {
+         message = consumer.receiveImmediate();
+         assertNotNull(message); // same guard as the single-receive check above
+         message.acknowledge();
+      }
+      consumer.close();
+      Wait.assertTrue(() -> queue.getMessageCount() == 5, 2000, 100);
+      producer.send(clientSession.createMessage(true));
+      Wait.assertTrue(() -> queue.getMessagesReplaced() == 6, 2000, 100);
+      // Ring size raised back to 10: five sends fill it without a replacement, the eleventh message triggers replacement number 7.
+      queue.setRingSize(10);
+      for (int i = 0; i < 5; i++) {
+         producer.send(clientSession.createMessage(true));
+      }
+      Wait.assertTrue(() -> queue.getMessageCount() == 10, 2000, 100);
+      producer.send(clientSession.createMessage(true));
+      Wait.assertTrue(() -> queue.getMessagesReplaced() == 7, 2000, 100);
+      Wait.assertTrue(() -> queue.getMessageCount() == 10, 2000, 100);
+   }
+
+   @Test
+   public void testNonDestructive() throws Exception {
+      // Non-destructive ring of size 1: every additional send must replace the held message, and acknowledging it must not empty the queue.
+      ServerLocator serverLocator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
+      ClientSession session = addClientSession(createSessionFactory(serverLocator).createSession(false, true, true));
+      session.createQueue(address, qName, false, new QueueAttributes().setDurable(true).setNonDestructive(true).setRingSize(1L).setMaxConsumers(-1).setPurgeOnNoConsumers(false));
+      session.start();
+      final Queue queue = server.locateQueue(qName);
+      assertEquals(1, queue.getRingSize());
+
+      ClientProducer sender = session.createProducer(address);
+      sender.send(createTextMessage(session, "hello" + 0));
+      // Rounds 1..5: the replaced counter must equal the round number while the message count stays at 1.
+      for (int round = 1; round <= 5; round++) {
+         final int expectedReplaced = round;
+         final String expectedBody = "hello" + round;
+         Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
+         sender.send(createTextMessage(session, expectedBody));
+         Wait.assertTrue(() -> queue.getMessagesReplaced() == expectedReplaced, 2000, 100);
+         Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
+         // A fresh consumer must see the newest body; the next round still expects one message after this acknowledge.
+         ClientConsumer receiver = session.createConsumer(qName);
+         ClientMessage received = receiver.receiveImmediate();
+         assertNotNull(received);
+         received.acknowledge(); // non-destructive!
+         receiver.close();
+         assertEquals(expectedBody, received.getBodyBuffer().readString());
+      }
+   }
+
+   @Test
+   public void testNonDestructiveWithConsumerClose() throws Exception {
+      // Non-destructive ring of size 1: a message in delivery to a consumer must still be counted on the queue after that consumer closes without receiving it.
+      ServerLocator serverLocator = createNettyNonHALocator().setBlockOnAcknowledge(true).setAckBatchSize(0);
+      ClientSession session = addClientSession(createSessionFactory(serverLocator).createSession(false, true, true));
+      session.createQueue(address, qName, false, new QueueAttributes().setDurable(true).setNonDestructive(true).setRingSize(1L).setMaxConsumers(-1).setPurgeOnNoConsumers(false));
+      session.start();
+      final Queue queue = server.locateQueue(qName);
+      assertEquals(1, queue.getRingSize());
+
+      ClientProducer sender = session.createProducer(address);
+      sender.send(createTextMessage(session, "hello" + 0));
+      Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
+
+      // The consumer is only created and closed: the test waits for a delivering count of 1, then closes it without receive() or acknowledge().
+      ClientConsumer receiver = session.createConsumer(qName);
+      Wait.assertTrue(() -> queue.getDeliveringCount() == 1, 2000, 100);
+      receiver.close();
+      Thread.sleep(3000); // NOTE(review): presumably leaves time for an erroneous removal to surface - confirm
+      Wait.assertTrue(() -> queue.getMessageCount() == 1, 2000, 100);
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+      // Build the server from the default Netty configuration and register it via addServer(); NOTE(review): the `true` flag presumably enables persistence - confirm.
+      server = addServer(ActiveMQServers.newActiveMQServer(createDefaultNettyConfig(), true));
+      // start the server
+      server.start();
+   }
+}
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index da25078..3f236c3 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -460,6 +460,16 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public void setRingSize(long ringSize) {
+      // no-op: this fake ignores the requested ring size
+   }
+
+   @Override
+   public long getRingSize() {
+      return 0; // fixed value: this fake keeps no ring-size state
+   }
+
+   @Override
    public ReferenceCounter getConsumersRefCount() {
       return null;  //To change body of implemented methods use File | Settings | File Templates.
    }
@@ -561,6 +571,12 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public long getMessagesReplaced() {
+      // fake implementation: always reports zero replaced messages
+      return 0;
+   }
+
+   @Override
    public void resetMessagesAdded() {
       // no-op
 
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 6c84ffc..406b40c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -63,6 +63,25 @@ public class FakePostOffice implements PostOffice {
    }
 
    @Override
+   public QueueBinding updateQueue(SimpleString name,
+                                   RoutingType routingType,
+                                   Filter filter,
+                                   Integer maxConsumers,
+                                   Boolean purgeOnNoConsumers,
+                                   Boolean exclusive,
+                                   Boolean groupRebalance,
+                                   Integer groupBuckets,
+                                   SimpleString groupFirstKey,
+                                   Boolean nonDestructive,
+                                   Integer consumersBeforeDispatch,
+                                   Long delayBeforeDispatch,
+                                   SimpleString user,
+                                   Boolean configurationManaged,
+                                   Long ringSize) throws Exception {
+      return null; // fake: every argument is ignored and no binding is returned
+   }
+
+   @Override
    public AddressInfo updateAddressInfo(SimpleString addressName,
                                         EnumSet<RoutingType> routingTypes) throws Exception {
       return null;