You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2019/02/22 00:58:42 UTC

[activemq-artemis] branch master updated: ARTEMIS-2118 Enhanced Message Groups Support

This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new c4e68b6  ARTEMIS-2118 Enhanced Message Groups Support
     new 75cebe9  This closes #2548
c4e68b6 is described below

commit c4e68b604678cbf3b726fe09746f8b04417d0121
Author: Michael André Pearce <mi...@me.com>
AuthorDate: Fri Feb 15 00:48:54 2019 +0000

    ARTEMIS-2118 Enhanced Message Groups Support
    
    Support using group buckets on a queue for better local group scaling
    Support disabling message groups on a queue
    Support rebalancing groups when a consumer is added.
---
 .../activemq/artemis/api/core/QueueAttributes.java |  25 ++
 .../activemq/artemis/api/core/SimpleString.java    |  11 +-
 .../artemis/utils/collections/NoOpMap.java         | 100 ++++++
 .../artemis/utils/collections/NoOpMapTest.java     | 110 +++++++
 .../api/config/ActiveMQDefaultConfiguration.java   |  12 +
 .../artemis/api/core/client/ClientSession.java     |   4 +
 .../api/core/management/ActiveMQServerControl.java |   4 +
 .../artemis/core/client/impl/QueueQueryImpl.java   |  20 +-
 .../protocol/core/impl/ActiveMQSessionContext.java |   4 +-
 .../impl/wireformat/CreateQueueMessage_V2.java     |  44 +++
 .../wireformat/CreateSharedQueueMessage_V2.java    |  40 +++
 .../SessionQueueQueryResponseMessage_V3.java       |  50 ++-
 .../artemis/core/server/QueueQueryResult.java      |  18 ++
 .../core/config/CoreQueueConfiguration.java        |  41 +++
 .../deployers/impl/FileConfigurationParser.java    |  32 +-
 .../management/impl/ActiveMQServerControlImpl.java |  12 +-
 .../artemis/core/persistence/QueueBindingInfo.java |   4 +
 .../journal/AbstractJournalStorageManager.java     |   2 +-
 .../codec/PersistentQueueBindingEncoding.java      |  39 ++-
 .../artemis/core/postoffice/PostOffice.java        |   2 +
 .../core/postoffice/impl/PostOfficeImpl.java       |  10 +
 .../protocol/core/ServerSessionPacketHandler.java  |   4 +-
 .../artemis/core/server/ActiveMQServer.java        |  12 +-
 .../apache/activemq/artemis/core/server/Queue.java |   8 +
 .../activemq/artemis/core/server/QueueConfig.java  |  40 ++-
 .../artemis/core/server/ServerSession.java         |   4 +
 .../core/server/impl/ActiveMQServerImpl.java       |  54 ++--
 .../core/server/impl/BucketMessageGroups.java      | 164 ++++++++++
 .../core/server/impl/DisabledMessageGroups.java    |  38 +++
 .../artemis/core/server/impl/LastValueQueue.java   |   4 +-
 .../artemis/core/server/impl/MapMessageGroups.java |  69 ++++
 .../artemis/core/server/impl/MessageGroups.java    |  40 +++
 .../core/server/impl/PostOfficeJournalLoader.java  |   2 +
 .../artemis/core/server/impl/QueueFactoryImpl.java |   6 +-
 .../artemis/core/server/impl/QueueImpl.java        | 103 +++---
 .../core/server/impl/ServerSessionImpl.java        |  28 +-
 .../core/server/impl/SimpleMessageGroups.java      |  33 ++
 .../core/settings/impl/AddressSettings.java        |  76 ++++-
 .../resources/schema/artemis-configuration.xsd     |  20 ++
 .../core/server/impl/BucketMessageGroupsTest.java  | 201 ++++++++++++
 .../server/impl/ScheduledDeliveryHandlerTest.java  |  20 ++
 .../src/test/resources/artemis-configuration.xsd   |  20 ++
 docs/user-manual/en/message-grouping.md            |  46 +++
 .../distribution/ClusteredGroupingTest.java        |   6 +-
 .../jms/client/ConsumerDelayDispatchTest.java      |   4 +-
 .../tests/integration/jms/client/GroupingTest.java | 352 +++++++++++++++++++++
 .../ActiveMQServerControlUsingCoreTest.java        |   8 +-
 .../persistence/QueueConfigRestartTest.java        |   4 +-
 .../tests/unit/core/postoffice/impl/FakeQueue.java |  20 ++
 .../core/server/impl/fakes/FakePostOffice.java     |   2 +
 50 files changed, 1862 insertions(+), 110 deletions(-)

diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
index 0f7eb3c..75b5775 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/QueueAttributes.java
@@ -26,6 +26,8 @@ public class QueueAttributes implements Serializable {
    public static final String DURABLE = "durable";
    public static final String MAX_CONSUMERS = "max-consumers";
    public static final String EXCLUSIVE = "exclusive";
+   public static final String GROUP_REBALANCE = "group-rebalance";
+   public static final String GROUP_BUCKETS = "group-buckets";
    public static final String LAST_VALUE = "last-value";
    public static final String LAST_VALUE_KEY = "last-value-key";
    public static final String NON_DESTRUCTIVE = "non-destructive";
@@ -39,6 +41,8 @@ public class QueueAttributes implements Serializable {
    private Boolean durable;
    private Integer maxConsumers;
    private Boolean exclusive;
+   private Boolean groupRebalance;
+   private Integer groupBuckets;
    private Boolean lastValue;
    private SimpleString lastValueKey;
    private Boolean nonDestructive;
@@ -73,6 +77,10 @@ public class QueueAttributes implements Serializable {
             setDelayBeforeDispatch(Long.valueOf(value));
          } else if (key.equals(CONSUMER_PRIORITY)) {
             setConsumerPriority(Integer.valueOf(value));
+         } else if (key.equals(GROUP_REBALANCE)) {
+            setGroupRebalance(Boolean.valueOf(value));
+         } else if (key.equals(GROUP_BUCKETS)) {
+            setGroupBuckets(Integer.valueOf(value));
          }
       }
    }
@@ -185,4 +193,21 @@ public class QueueAttributes implements Serializable {
       return this;
    }
 
+   public Boolean getGroupRebalance() {
+      return groupRebalance;
+   }
+
+   public QueueAttributes setGroupRebalance(Boolean groupRebalance) {
+      this.groupRebalance = groupRebalance;
+      return this;
+   }
+
+   public Integer getGroupBuckets() {
+      return groupBuckets;
+   }
+
+   public QueueAttributes setGroupBuckets(Integer groupBuckets) {
+      this.groupBuckets = groupBuckets;
+      return this;
+   }
 }
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
index eb3ce2e..7767fdd 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/SimpleString.java
@@ -454,7 +454,16 @@ public final class SimpleString implements CharSequence, Serializable, Comparabl
     * @return the concatenated SimpleString
     */
    public SimpleString concat(final String toAdd) {
-      return concat(new SimpleString(toAdd));
+      int len = toAdd.length();
+      byte[] bytes = new byte[data.length + len * 2];
+      System.arraycopy(data, 0, bytes, 0, data.length);
+      for (int i = 0; i < len; i++) {
+         char c = toAdd.charAt(i);
+         int offset = data.length + i * 2;
+         bytes[offset] = (byte) (c & 0xFF);
+         bytes[offset + 1] = (byte) (c >> 8 & 0xFF);
+      }
+      return new SimpleString(bytes);
    }
 
    /**
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NoOpMap.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NoOpMap.java
new file mode 100644
index 0000000..d5893ed
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/collections/NoOpMap.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class implements a Map but doesn't actually store anything. It is similar in idea to an EmptyMap,
+ * except that mutation methods are simply no-ops rather than throwing UnsupportedOperationException as EmptyMap does.
+ *
+ * This is used in QueueImpl when message groups are disabled.
+ *
+ * @param <K> the key type.
+ * @param <V> the value type.
+ */
+public class NoOpMap<K,V> extends AbstractMap<K,V> {
+
+   private static final Map NO_OP_MAP = new NoOpMap<>();
+
+   @SuppressWarnings("unchecked")
+   public static <K,V> Map<K,V> instance() {
+      return (Map<K,V>) NO_OP_MAP;
+   }
+
+   private NoOpMap() {
+   }
+
+   @Override
+   public V put(K key, V value) {
+      return null;
+   }
+
+   @Override
+   public int size() {
+      return 0;
+   }
+
+   @Override
+   public boolean isEmpty() {
+      return true;
+   }
+
+   @Override
+   public boolean containsKey(Object key) {
+      return false;
+   }
+
+   @Override
+   public boolean containsValue(Object value) {
+      return false;
+   }
+
+   @Override
+   public V get(Object key) {
+      return null;
+   }
+
+   @Override
+   public Set<K> keySet() {
+      return Collections.emptySet();
+   }
+
+   @Override
+   public Collection<V> values() {
+      return Collections.emptySet();
+   }
+
+   @Override
+   public Set<Entry<K,V>> entrySet() {
+      return Collections.emptySet();
+   }
+
+   @Override
+   public boolean equals(Object o) {
+      return (o instanceof Map) && ((Map)o).size() == 0;
+   }
+
+   @Override
+   public int hashCode() {
+      return 0;
+   }
+}
\ No newline at end of file
diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/NoOpMapTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/NoOpMapTest.java
new file mode 100644
index 0000000..8be1fa9
--- /dev/null
+++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/collections/NoOpMapTest.java
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.utils.collections;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import org.junit.Test;
+
+public class NoOpMapTest {
+
+   @Test
+   public void testPut() {
+      Map<String, String> map = NoOpMap.instance();
+      assertNull(map.put("hello", "world"));
+      assertNull(map.put("hello", "world2"));
+
+      assertEquals(0, map.size());
+   }
+
+   @Test
+   public void testGet() {
+      Map<String, String> map = NoOpMap.instance();
+      map.put("hello", "world");
+
+      assertNull(map.get("hello"));
+   }
+
+   @Test
+   public void testValues() {
+      Map<String, String> map = NoOpMap.instance();
+      map.put("hello", "world");
+
+      Collection<String> values = map.values();
+
+      assertEquals(0, values.size());
+   }
+
+   @Test
+   public void testKeys() {
+      Map<String, String> map = NoOpMap.instance();
+      map.put("hello", "world");
+
+      Set<String> keySet = map.keySet();
+
+      assertEquals(0, keySet.size());
+   }
+
+   @Test
+   public void testEntrySet() {
+      Map<String, String> map = NoOpMap.instance();
+      map.put("hello", "world");
+
+      Set<Map.Entry<String, String>> entrySet = map.entrySet();
+
+      assertEquals(0, entrySet.size());
+   }
+
+
+   @Test
+   public void testIsEmpty() {
+      Map<String, String> map = NoOpMap.instance();
+      map.put("hello", "world");
+
+      assertTrue(map.isEmpty());
+   }
+
+   @Test
+   public void testRemove() {
+      Map<String, String> map = NoOpMap.instance();
+      map.put("hello", "world");
+
+      assertNull(map.remove("hello"));
+   }
+
+   @Test
+   public void testReplace() {
+      Map<String, String> map = NoOpMap.instance();
+      map.put("hello", "world");
+
+      assertNull(map.replace("hello", "world2"));
+
+      map.put("hello", "world");
+
+      assertFalse(map.replace("hello", "world", "world2"));
+   }
+
+}
\ No newline at end of file
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
index 61f0f5d..d85b978 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/config/ActiveMQDefaultConfiguration.java
@@ -505,6 +505,10 @@ public final class ActiveMQDefaultConfiguration {
 
    public static final long DEFAULT_DELAY_BEFORE_DISPATCH = -1;
 
+   public static final int DEFAULT_GROUP_BUCKETS = -1;
+
+   public static final boolean DEFAULT_GROUP_REBALANCE = false;
+
    public static final RoutingType DEFAULT_ROUTING_TYPE = RoutingType.MULTICAST;
 
    public static final String DEFAULT_SYSTEM_PROPERTY_PREFIX = "brokerconfig.";
@@ -1370,6 +1374,14 @@ public final class ActiveMQDefaultConfiguration {
       return DEFAULT_DELAY_BEFORE_DISPATCH;
    }
 
+   public static int getDefaultGroupBuckets() {
+      return DEFAULT_GROUP_BUCKETS;
+   }
+
+   public static boolean getDefaultGroupRebalance() {
+      return DEFAULT_GROUP_REBALANCE;
+   }
+
    public static String getInternalNamingPrefix() {
       return DEFAULT_INTERNAL_NAMING_PREFIX;
    }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
index 008ab3f..9326e30 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/client/ClientSession.java
@@ -158,6 +158,10 @@ public interface ClientSession extends XAResource, AutoCloseable {
       Long getDelayBeforeDispatch();
 
       Integer getDefaultConsumerWindowSize();
+
+      Boolean isGroupRebalance();
+
+      Integer getGroupBuckets();
    }
 
    // Lifecycle operations ------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
index 04c7e16..a6fb792 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/management/ActiveMQServerControl.java
@@ -598,6 +598,8 @@ public interface ActiveMQServerControl {
                       @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") int maxConsumers,
                       @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") boolean purgeOnNoConsumers,
                       @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") boolean exclusive,
+                      @Parameter(name = "groupRebalance", desc = "If the queue should rebalance groups when a consumer is added") boolean groupRebalance,
+                      @Parameter(name = "groupBuckets", desc = "Number of buckets that should be used for message groups, -1 (default) is unlimited, and groups by raw key instead") int groupBuckets,
                       @Parameter(name = "lastValue", desc = "Use last-value semantics") boolean lastValue,
                       @Parameter(name = "lastValueKey", desc = "Use the specified property key for the last value") String lastValueKey,
                       @Parameter(name = "nonDestructive", desc = "If the queue is non-destructive") boolean nonDestructive,
@@ -712,6 +714,8 @@ public interface ActiveMQServerControl {
                       @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
                       @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
                       @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
+                      @Parameter(name = "groupRebalance", desc = "If the queue should rebalance groups when a consumer is added") Boolean groupRebalance,
+                      @Parameter(name = "groupBuckets", desc = "Number of buckets that should be used for message groups, -1 (default) is unlimited, and groups by raw key instead") Integer groupBuckets,
                       @Parameter(name = "nonDestructive", desc = "If the queue is non-destructive") Boolean nonDestructive,
                       @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch,
                       @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch,
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
index 550728b..bef8b37 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/QueueQueryImpl.java
@@ -50,6 +50,10 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
 
    private final Boolean exclusive;
 
+   private final Boolean groupRebalance;
+
+   private final Integer groupBuckets;
+
    private final Boolean lastValue;
 
    private final SimpleString lastValueKey;
@@ -118,7 +122,7 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
                          final Boolean exclusive,
                          final Boolean lastValue,
                          final Integer defaultConsumerWindowSize) {
-      this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, lastValue, null, null, null, null, defaultConsumerWindowSize);
+      this(durable, temporary, consumerCount, messageCount, filterString, address, name, exists, autoCreateQueues, maxConsumers, autoCreated, purgeOnNoConsumers, routingType, exclusive, null, null, lastValue, null, null, null, null, defaultConsumerWindowSize);
    }
 
    public QueueQueryImpl(final boolean durable,
@@ -135,6 +139,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
                          final boolean purgeOnNoConsumers,
                          final RoutingType routingType,
                          final Boolean exclusive,
+                         final Boolean groupRebalance,
+                         final Integer groupBuckets,
                          final Boolean lastValue,
                          final SimpleString lastValueKey,
                          final Boolean nonDestructive,
@@ -155,6 +161,8 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
       this.purgeOnNoConsumers = purgeOnNoConsumers;
       this.routingType = routingType;
       this.exclusive = exclusive;
+      this.groupRebalance = groupRebalance;
+      this.groupBuckets = groupBuckets;
       this.lastValue = lastValue;
       this.lastValueKey = lastValueKey;
       this.nonDestructive = nonDestructive;
@@ -262,5 +270,15 @@ public class QueueQueryImpl implements ClientSession.QueueQuery {
    public Integer getDefaultConsumerWindowSize() {
       return defaultConsumerWindowSize;
    }
+
+   @Override
+   public Boolean isGroupRebalance() {
+      return groupRebalance;
+   }
+
+   @Override
+   public Integer getGroupBuckets() {
+      return groupBuckets;
+   }
 }
 
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
index bfe8ec0..71e363e 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java
@@ -314,6 +314,8 @@ public class ActiveMQSessionContext extends SessionContext {
               queueAttributes.getMaxConsumers(),
               queueAttributes.getPurgeOnNoConsumers(),
               queueAttributes.getExclusive(),
+              queueAttributes.getGroupRebalance(),
+              queueAttributes.getGroupBuckets(),
               queueAttributes.getLastValue(),
               queueAttributes.getLastValueKey(),
               queueAttributes.getNonDestructive(),
@@ -871,7 +873,7 @@ public class ActiveMQSessionContext extends SessionContext {
       // We try to recreate any non-durable or auto-created queues, since they might not be there on failover/reconnect.
       // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover/reconnection
       if (!queueInfo.isDurable() || queueInfo.isAutoCreated()) {
-         CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), queueInfo.isNonDestructive(), queueInfo.getConsumersBeforeDispatch(), queueInfo.getDelay [...]
+         CreateQueueMessage_V2 createQueueRequest = new CreateQueueMessage_V2(queueInfo.getAddress(), queueInfo.getName(), queueInfo.getRoutingType(), queueInfo.getFilterString(), queueInfo.isDurable(), queueInfo.isTemporary(), queueInfo.getMaxConsumers(), queueInfo.isPurgeOnNoConsumers(), queueInfo.isAutoCreated(), false, queueInfo.isExclusive(), queueInfo.isGroupRebalance(), queueInfo.getGroupBuckets(), queueInfo.isLastValue(), queueInfo.getLastValueKey(), queueInfo.isNonDestructive(), [...]
 
          sendPacketWithoutLock(sessionChannel, createQueueRequest);
       }
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
index 78d869d..4a383b3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateQueueMessage_V2.java
@@ -34,6 +34,10 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
 
    private Boolean exclusive;
 
+   private Boolean groupRebalance;
+
+   private Integer groupBuckets;
+
    private Boolean lastValue;
 
    private SimpleString lastValueKey;
@@ -62,6 +66,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
          autoCreated,
          requiresResponse,
          queueAttributes.getExclusive(),
+         queueAttributes.getGroupRebalance(),
+         queueAttributes.getGroupBuckets(),
          queueAttributes.getLastValue(),
          queueAttributes.getLastValueKey(),
          queueAttributes.getNonDestructive(),
@@ -81,6 +87,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
                                 final boolean autoCreated,
                                 final boolean requiresResponse,
                                 final Boolean exclusive,
+                                final Boolean groupRebalance,
+                                final Integer groupBuckets,
                                 final Boolean lastValue,
                                 final SimpleString lastValueKey,
                                 final Boolean nonDestructive,
@@ -99,6 +107,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       this.maxConsumers = maxConsumers;
       this.purgeOnNoConsumers = purgeOnNoConsumers;
       this.exclusive = exclusive;
+      this.groupRebalance = groupRebalance;
+      this.groupBuckets = groupBuckets;
       this.lastValue = lastValue;
       this.lastValueKey = lastValueKey;
       this.nonDestructive = nonDestructive;
@@ -120,6 +130,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       buff.append(", maxConsumers=" + maxConsumers);
       buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers);
       buff.append(", exclusive=" + exclusive);
+      buff.append(", groupRebalance=" + groupRebalance);
+      buff.append(", groupBuckets=" + groupBuckets);
       buff.append(", lastValue=" + lastValue);
       buff.append(", lastValueKey=" + lastValue);
       buff.append(", nonDestructive=" + nonDestructive);
@@ -209,6 +221,22 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       this.delayBeforeDispatch = delayBeforeDispatch;
    }
 
+   public Boolean isGroupRebalance() {
+      return groupRebalance;
+   }
+
+   public void setGroupRebalance(Boolean groupRebalance) {
+      this.groupRebalance = groupRebalance;
+   }
+
+   public Integer getGroupBuckets() {
+      return groupBuckets;
+   }
+
+   public void setGroupBuckets(Integer groupBuckets) {
+      this.groupBuckets = groupBuckets;
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
@@ -222,6 +250,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       BufferHelper.writeNullableBoolean(buffer, nonDestructive);
       BufferHelper.writeNullableInteger(buffer, consumersBeforeDispatch);
       BufferHelper.writeNullableLong(buffer, delayBeforeDispatch);
+      BufferHelper.writeNullableBoolean(buffer, groupRebalance);
+      BufferHelper.writeNullableInteger(buffer, groupBuckets);
    }
 
    @Override
@@ -240,6 +270,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
          nonDestructive = BufferHelper.readNullableBoolean(buffer);
          consumersBeforeDispatch = BufferHelper.readNullableInteger(buffer);
          delayBeforeDispatch = BufferHelper.readNullableLong(buffer);
+         groupRebalance = BufferHelper.readNullableBoolean(buffer);
+         groupBuckets = BufferHelper.readNullableInteger(buffer);
       }
    }
 
@@ -252,6 +284,8 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
       result = prime * result + (maxConsumers);
       result = prime * result + (purgeOnNoConsumers ? 1231 : 1237);
       result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
+      result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237);
+      result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode());
       result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
       result = prime * result + (lastValueKey == null ? 0 : lastValueKey.hashCode());
       result = prime * result + (nonDestructive == null ? 0 : nonDestructive ? 1231 : 1237);
@@ -280,6 +314,16 @@ public class CreateQueueMessage_V2 extends CreateQueueMessage {
             return false;
       } else if (!exclusive.equals(other.exclusive))
          return false;
+      if (groupRebalance == null) {
+         if (other.groupRebalance != null)
+            return false;
+      } else if (!groupRebalance.equals(other.groupRebalance))
+         return false;
+      if (groupBuckets == null) {
+         if (other.groupBuckets != null)
+            return false;
+      } else if (!groupBuckets.equals(other.groupBuckets))
+         return false;
       if (lastValue == null) {
          if (other.lastValue != null)
             return false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
index 9a4ee41..dcd936a 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/CreateSharedQueueMessage_V2.java
@@ -27,6 +27,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
    private Integer maxConsumers;
    private Boolean purgeOnNoConsumers;
    private Boolean exclusive;
+   private Boolean groupRebalance;
+   private Integer groupBuckets;
    private Boolean lastValue;
    private SimpleString lastValueKey;
    private Boolean nonDestructive;
@@ -41,6 +43,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
                                       final Integer maxConsumers,
                                       final Boolean purgeOnNoConsumers,
                                       final Boolean exclusive,
+                                      final Boolean groupRebalance,
+                                      final Integer groupBuckets,
                                       final Boolean lastValue,
                                       final SimpleString lastValueKey,
                                       final Boolean nonDestructive,
@@ -57,6 +61,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       this.maxConsumers = maxConsumers;
       this.purgeOnNoConsumers = purgeOnNoConsumers;
       this.exclusive = exclusive;
+      this.groupRebalance = groupRebalance;
+      this.groupBuckets = groupBuckets;
       this.lastValue = lastValue;
       this.lastValueKey = lastValueKey;
       this.nonDestructive = nonDestructive;
@@ -141,6 +147,22 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       this.delayBeforeDispatch = delayBeforeDispatch;
    }
 
+   public Boolean isGroupRebalance() {
+      return groupRebalance;
+   }
+
+   public void setGroupRebalance(Boolean groupRebalance) {
+      this.groupRebalance = groupRebalance;
+   }
+
+   public Integer getGroupBuckets() {
+      return groupBuckets;
+   }
+
+   public void setGroupBuckets(Integer groupBuckets) {
+      this.groupBuckets = groupBuckets;
+   }
+
    @Override
    public String toString() {
       StringBuffer buff = new StringBuffer(getParentString());
@@ -152,6 +174,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       buff.append(", maxConsumers=" + maxConsumers);
       buff.append(", purgeOnNoConsumers=" + purgeOnNoConsumers);
       buff.append(", exclusive=" + exclusive);
+      buff.append(", groupRebalance=" + groupRebalance);
+      buff.append(", groupBuckets=" + groupBuckets);
       buff.append(", lastValue=" + lastValue);
       buff.append(", lastValueKey=" + lastValueKey);
       buff.append(", nonDestructive=" + nonDestructive);
@@ -178,6 +202,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       BufferHelper.writeNullableBoolean(buffer, nonDestructive);
       BufferHelper.writeNullableInteger(buffer, consumersBeforeDispatch);
       BufferHelper.writeNullableLong(buffer, delayBeforeDispatch);
+      BufferHelper.writeNullableBoolean(buffer, groupRebalance);
+      BufferHelper.writeNullableInteger(buffer, groupBuckets);
    }
 
    @Override
@@ -199,6 +225,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
          nonDestructive = BufferHelper.readNullableBoolean(buffer);
          consumersBeforeDispatch = BufferHelper.readNullableInteger(buffer);
          delayBeforeDispatch = BufferHelper.readNullableLong(buffer);
+         groupRebalance = BufferHelper.readNullableBoolean(buffer);
+         groupBuckets = BufferHelper.readNullableInteger(buffer);
       }
    }
 
@@ -215,6 +243,8 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
       result = prime * result + (maxConsumers == null ? 0 : maxConsumers.hashCode());
       result = prime * result + (purgeOnNoConsumers == null ? 0 : purgeOnNoConsumers ? 1231 : 1237);
       result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
+      result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237);
+      result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode());
       result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
       result = prime * result + (lastValueKey == null ? 0 : lastValueKey.hashCode());
       result = prime * result + (nonDestructive == null ? 0 : nonDestructive ? 1231 : 1237);
@@ -269,6 +299,16 @@ public class CreateSharedQueueMessage_V2 extends CreateSharedQueueMessage {
             return false;
       } else if (!exclusive.equals(other.exclusive))
          return false;
+      if (groupRebalance == null) {
+         if (other.groupRebalance != null)
+            return false;
+      } else if (!groupRebalance.equals(other.groupRebalance))
+         return false;
+      if (groupBuckets == null) {
+         if (other.groupBuckets != null)
+            return false;
+      } else if (!groupBuckets.equals(other.groupBuckets))
+         return false;
       if (lastValue == null) {
          if (other.lastValue != null)
             return false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
index 7e24186..09bdaca 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionQueueQueryResponseMessage_V3.java
@@ -36,6 +36,10 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
 
    protected Boolean exclusive;
 
+   protected Boolean groupRebalance;
+
+   protected Integer groupBuckets;
+
    protected Boolean lastValue;
 
    protected SimpleString lastValueKey;
@@ -49,11 +53,11 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
    protected Integer defaultConsumerWindowSize;
 
    public SessionQueueQueryResponseMessage_V3(final QueueQueryResult result) {
-      this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.getDefaultConsumerWindowSize());
+      this(result.getName(), result.getAddress(), result.isDurable(), result.isTemporary(), result.getFilterString(), result.getConsumerCount(), result.getMessageCount(), result.isExists(), result.isAutoCreateQueues(), result.isAutoCreated(), result.isPurgeOnNoConsumers(), result.getRoutingType(), result.getMaxConsumers(), result.isExclusive(), result.isGroupRebalance(), result.getGroupBuckets(), result.isLastValue(), result.getLastValueKey(), result.isNonDestructive(), result.getConsumersBeforeDispatch(), result.getDelayBeforeDispatch(), result.getDefaultConsumerWindowSize());
    }
 
    public SessionQueueQueryResponseMessage_V3() {
-      this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null, null, null, null, null, null);
+      this(null, null, false, false, null, 0, 0, false, false, false, false, RoutingType.MULTICAST, -1, null, null,null, null, null, null, null, null, null);
    }
 
    private SessionQueueQueryResponseMessage_V3(final SimpleString name,
@@ -70,6 +74,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
                                                final RoutingType routingType,
                                                final int maxConsumers,
                                                final Boolean exclusive,
+                                               final Boolean groupRebalance,
+                                               final Integer groupBuckets,
                                                final Boolean lastValue,
                                                final SimpleString lastValueKey,
                                                final Boolean nonDestructive,
@@ -106,6 +112,10 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
 
       this.exclusive = exclusive;
 
+      this.groupRebalance = groupRebalance;
+
+      this.groupBuckets = groupBuckets;
+
       this.lastValue = lastValue;
 
       this.lastValueKey = lastValueKey;
@@ -207,6 +217,22 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       this.defaultConsumerWindowSize = defaultConsumerWindowSize;
    }
 
+   public Boolean isGroupRebalance() {
+      return groupRebalance;
+   }
+
+   public void setGroupRebalance(Boolean groupRebalance) {
+      this.groupRebalance = groupRebalance;
+   }
+
+   public Integer getGroupBuckets() {
+      return groupBuckets;
+   }
+
+   public void setGroupBuckets(Integer groupBuckets) {
+      this.groupBuckets = groupBuckets;
+   }
+
    @Override
    public void encodeRest(final ActiveMQBuffer buffer) {
       super.encodeRest(buffer);
@@ -221,6 +247,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       BufferHelper.writeNullableBoolean(buffer, nonDestructive);
       BufferHelper.writeNullableInteger(buffer, consumersBeforeDispatch);
       BufferHelper.writeNullableLong(buffer, delayBeforeDispatch);
+      BufferHelper.writeNullableBoolean(buffer, groupRebalance);
+      BufferHelper.writeNullableInteger(buffer, groupBuckets);
    }
 
    @Override
@@ -242,6 +270,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
          nonDestructive = BufferHelper.readNullableBoolean(buffer);
          consumersBeforeDispatch = BufferHelper.readNullableInteger(buffer);
          delayBeforeDispatch = BufferHelper.readNullableLong(buffer);
+         groupRebalance = BufferHelper.readNullableBoolean(buffer);
+         groupBuckets = BufferHelper.readNullableInteger(buffer);
       }
    }
 
@@ -254,6 +284,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       result = prime * result + routingType.hashCode();
       result = prime * result + maxConsumers;
       result = prime * result + (exclusive == null ? 0 : exclusive ? 1231 : 1237);
+      result = prime * result + (groupRebalance == null ? 0 : groupRebalance ? 1231 : 1237);
+      result = prime * result + (groupBuckets == null ? 0 : groupBuckets.hashCode());
       result = prime * result + (lastValue == null ? 0 : lastValue ? 1231 : 1237);
       result = prime * result + (lastValueKey == null ? 0 : lastValueKey.hashCode());
       result = prime * result + (nonDestructive == null ? 0 : nonDestructive ? 1231 : 1237);
@@ -278,6 +310,8 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
       buff.append(", routingType=" + routingType);
       buff.append(", maxConsumers=" + maxConsumers);
       buff.append(", exclusive=" + exclusive);
+      buff.append(", groupRebalance=" + groupRebalance);
+      buff.append(", groupBuckets=" + groupBuckets);
       buff.append(", lastValue=" + lastValue);
       buff.append(", lastValueKey=" + lastValueKey);
       buff.append(", nonDestructive=" + nonDestructive);
@@ -289,7 +323,7 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
 
    @Override
    public ClientSession.QueueQuery toQueueQuery() {
-      return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), getDefaultConsumerWindowSize());
+      return new QueueQueryImpl(isDurable(), isTemporary(), getConsumerCount(), getMessageCount(), getFilterString(), getAddress(), getName(), isExists(), isAutoCreateQueues(), getMaxConsumers(), isAutoCreated(), isPurgeOnNoConsumers(), getRoutingType(), isExclusive(), isGroupRebalance(), getGroupBuckets(), isLastValue(), getLastValueKey(), isNonDestructive(), getConsumersBeforeDispatch(), getDelayBeforeDispatch(), getDefaultConsumerWindowSize());
    }
 
    @Override
@@ -310,6 +344,16 @@ public class SessionQueueQueryResponseMessage_V3 extends SessionQueueQueryRespon
             return false;
       } else if (!exclusive.equals(other.exclusive))
          return false;
+      if (groupRebalance == null) {
+         if (other.groupRebalance != null)
+            return false;
+      } else if (!groupRebalance.equals(other.groupRebalance))
+         return false;
+      if (groupBuckets == null) {
+         if (other.groupBuckets != null)
+            return false;
+      } else if (!groupBuckets.equals(other.groupBuckets))
+         return false;
       if (lastValue == null) {
          if (other.lastValue != null)
             return false;
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
index b863181..92bf200 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/server/QueueQueryResult.java
@@ -49,6 +49,10 @@ public class QueueQueryResult {
 
    private Boolean exclusive;
 
+   private Boolean groupRebalance;
+
+   private Integer groupBuckets;
+
    private Boolean lastValue;
 
    private SimpleString lastValueKey;
@@ -75,6 +79,8 @@ public class QueueQueryResult {
                            final RoutingType routingType,
                            final int maxConsumers,
                            final Boolean exclusive,
+                           final Boolean groupRebalance,
+                           final Integer groupBuckets,
                            final Boolean lastValue,
                            final SimpleString lastValueKey,
                            final Boolean nonDestructive,
@@ -109,6 +115,10 @@ public class QueueQueryResult {
 
       this.exclusive = exclusive;
 
+      this.groupRebalance = groupRebalance;
+
+      this.groupBuckets = groupBuckets;
+
       this.lastValue = lastValue;
 
       this.lastValueKey = lastValueKey;
@@ -205,4 +215,12 @@ public class QueueQueryResult {
    public Integer getDefaultConsumerWindowSize() {
       return defaultConsumerWindowSize;
    }
+
+   public Boolean isGroupRebalance() {
+      return groupRebalance;
+   }
+
+   public Integer getGroupBuckets() {
+      return groupBuckets;
+   }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
index 50fee8e..0a3fde8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/CoreQueueConfiguration.java
@@ -37,6 +37,10 @@ public class CoreQueueConfiguration implements Serializable {
 
    private Boolean exclusive;
 
+   private Boolean groupRebalance;
+
+   private Integer groupBuckets;
+
    private Boolean lastValue;
 
    private String lastValueKey;
@@ -80,6 +84,14 @@ public class CoreQueueConfiguration implements Serializable {
       return exclusive;
    }
 
+   public Boolean isGroupRebalance() {
+      return groupRebalance;
+   }
+
+   public Integer getGroupBuckets() {
+      return groupBuckets;
+   }
+
    public Boolean isLastValue() {
       return lastValue;
    }
@@ -177,6 +189,16 @@ public class CoreQueueConfiguration implements Serializable {
       return this;
    }
 
+   public CoreQueueConfiguration setGroupRebalance(Boolean groupRebalance) {
+      this.groupRebalance = groupRebalance;
+      return this;
+   }
+
+   public CoreQueueConfiguration setGroupBuckets(Integer groupBuckets) {
+      this.groupBuckets = groupBuckets;
+      return this;
+   }
+
    public CoreQueueConfiguration setLastValue(Boolean lastValue) {
       this.lastValue = lastValue;
       return this;
@@ -220,6 +242,8 @@ public class CoreQueueConfiguration implements Serializable {
       result = prime * result + ((maxConsumers == null) ? 0 : maxConsumers.hashCode());
       result = prime * result + ((purgeOnNoConsumers == null) ? 0 : purgeOnNoConsumers.hashCode());
       result = prime * result + ((exclusive == null) ? 0 : exclusive.hashCode());
+      result = prime * result + ((groupRebalance == null) ? 0 : groupRebalance.hashCode());
+      result = prime * result + ((groupBuckets == null) ? 0 : groupBuckets.hashCode());
       result = prime * result + ((lastValue == null) ? 0 : lastValue.hashCode());
       result = prime * result + ((lastValueKey == null) ? 0 : lastValueKey.hashCode());
       result = prime * result + ((nonDestructive == null) ? 0 : nonDestructive.hashCode());
@@ -272,6 +296,21 @@ public class CoreQueueConfiguration implements Serializable {
       } else if (!exclusive.equals(other.exclusive)) {
          return false;
       }
+
+      if (groupRebalance == null) {
+         if (other.groupRebalance != null)
+            return false;
+      } else if (!groupRebalance.equals(other.groupRebalance)) {
+         return false;
+      }
+
+      if (groupBuckets == null) {
+         if (other.groupBuckets != null)
+            return false;
+      } else if (!groupBuckets.equals(other.groupBuckets)) {
+         return false;
+      }
+
       if (lastValue == null) {
          if (other.lastValue != null)
             return false;
@@ -322,6 +361,8 @@ public class CoreQueueConfiguration implements Serializable {
          ", maxConsumers=" + maxConsumers +
          ", purgeOnNoConsumers=" + purgeOnNoConsumers +
          ", exclusive=" + exclusive +
+         ", groupRebalance=" + groupRebalance +
+         ", groupBuckets=" + groupBuckets +
          ", lastValue=" + lastValue +
          ", lastValueKey=" + lastValueKey +
          ", nonDestructive=" + nonDestructive +
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
index 8b6097d..e5aba7a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/deployers/impl/FileConfigurationParser.java
@@ -187,6 +187,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
 
    private static final String DEFAULT_EXCLUSIVE_NODE_NAME = "default-exclusive-queue";
 
+   private static final String DEFAULT_GROUP_REBALANCE = "default-group-rebalance";
+
+   private static final String DEFAULT_GROUP_BUCKETS = "default-group-buckets";
+
    private static final String DEFAULT_CONSUMERS_BEFORE_DISPATCH = "default-consumers-before-dispatch";
 
    private static final String DEFAULT_DELAY_BEFORE_DISPATCH = "default-delay-before-dispatch";
@@ -1016,6 +1020,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             addressSettings.setDefaultNonDestructive(XMLUtil.parseBoolean(child));
          } else if (DEFAULT_EXCLUSIVE_NODE_NAME.equalsIgnoreCase(name)) {
             addressSettings.setDefaultExclusiveQueue(XMLUtil.parseBoolean(child));
+         } else if (DEFAULT_GROUP_REBALANCE.equalsIgnoreCase(name)) {
+            addressSettings.setDefaultGroupRebalance(XMLUtil.parseBoolean(child));
+         } else if (DEFAULT_GROUP_BUCKETS.equalsIgnoreCase(name)) {
+            addressSettings.setDefaultGroupBuckets(XMLUtil.parseInt(child));
          } else if (MAX_DELIVERY_ATTEMPTS.equalsIgnoreCase(name)) {
             addressSettings.setMaxDeliveryAttempts(XMLUtil.parseInt(child));
          } else if (REDISTRIBUTION_DELAY_NODE_NAME.equalsIgnoreCase(name)) {
@@ -1130,6 +1138,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
       boolean purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
       String user = null;
       Boolean exclusive = null;
+      Boolean groupRebalance = null;
+      Integer groupBuckets = null;
       Boolean lastValue = null;
       String lastValueKey = null;
       Boolean nonDestructive = null;
@@ -1146,6 +1156,10 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
             purgeOnNoConsumers = Boolean.parseBoolean(item.getNodeValue());
          } else if (item.getNodeName().equals("exclusive")) {
             exclusive = Boolean.parseBoolean(item.getNodeValue());
+         } else if (item.getNodeName().equals("group-rebalance")) {
+            groupRebalance = Boolean.parseBoolean(item.getNodeValue());
+         } else if (item.getNodeName().equals("group-buckets")) {
+            groupBuckets = Integer.parseInt(item.getNodeValue());
          } else if (item.getNodeName().equals("last-value")) {
             lastValue = Boolean.parseBoolean(item.getNodeValue());
          } else if (item.getNodeName().equals("last-value-key")) {
@@ -1174,8 +1188,22 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
          }
       }
 
-      return new CoreQueueConfiguration().setAddress(address).setName(name).setFilterString(filterString).setDurable(durable).setMaxConsumers(maxConsumers).setPurgeOnNoConsumers(purgeOnNoConsumers).setUser(user)
-                                         .setExclusive(exclusive).setLastValue(lastValue).setLastValueKey(lastValueKey).setNonDestructive(nonDestructive).setConsumersBeforeDispatch(consumersBeforeDispatch).setDelayBeforeDispatch(delayBeforeDispatch);
+      return new CoreQueueConfiguration()
+              .setAddress(address)
+              .setName(name)
+              .setFilterString(filterString)
+              .setDurable(durable)
+              .setMaxConsumers(maxConsumers)
+              .setPurgeOnNoConsumers(purgeOnNoConsumers)
+              .setUser(user)
+              .setExclusive(exclusive)
+              .setGroupRebalance(groupRebalance)
+              .setGroupBuckets(groupBuckets)
+              .setLastValue(lastValue)
+              .setLastValueKey(lastValueKey)
+              .setNonDestructive(nonDestructive)
+              .setConsumersBeforeDispatch(consumersBeforeDispatch)
+              .setDelayBeforeDispatch(delayBeforeDispatch);
    }
 
    protected CoreAddressConfiguration parseAddressConfiguration(final Node node) {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index c89dacb..823b423 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -827,6 +827,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
               maxConsumers,
               purgeOnNoConsumers,
               addressSettings.isDefaultExclusiveQueue(),
+              addressSettings.isDefaultGroupRebalance(),
+              addressSettings.getDefaultGroupBuckets(),
               addressSettings.isDefaultLastValueQueue(),
               addressSettings.getDefaultLastValueKey() == null ? null : addressSettings.getDefaultLastValueKey().toString(),
               addressSettings.isDefaultNonDestructive(),
@@ -844,6 +846,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              int maxConsumers,
                              boolean purgeOnNoConsumers,
                              boolean exclusive,
+                             boolean groupRebalance,
+                             int groupBuckets,
                              boolean lastValue,
                              String lastValueKey,
                              boolean nonDestructive,
@@ -860,7 +864,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
             filter = new SimpleString(filterStr);
          }
 
-         final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), SimpleString.toSimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, SimpleString.toSimpleString(lastValueKey), nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
+         final Queue queue = server.createQueue(SimpleString.toSimpleString(address), RoutingType.valueOf(routingType.toUpperCase()), SimpleString.toSimpleString(name), filter, durable, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, SimpleString.toSimpleString(lastValueKey), nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
          return QueueTextFormatter.Long.format(queue, new StringBuilder()).toString();
       } catch (ActiveMQException e) {
          throw new IllegalStateException(e.getMessage());
@@ -895,7 +899,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              Boolean purgeOnNoConsumers,
                              Boolean exclusive,
                              String user) throws Exception {
-      return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, null, user);
+      return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, null, null, null, user);
    }
 
    @Override
@@ -905,6 +909,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
                              Integer maxConsumers,
                              Boolean purgeOnNoConsumers,
                              Boolean exclusive,
+                             Boolean groupRebalance,
+                             Integer groupBuckets,
                              Boolean nonDestructive,
                              Integer consumersBeforeDispatch,
                              Long delayBeforeDispatch,
@@ -914,7 +920,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
       clearIO();
 
       try {
-         final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, filter, maxConsumers, purgeOnNoConsumers, exclusive, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user);
+         final Queue queue = server.updateQueue(name, routingType != null ? RoutingType.valueOf(routingType) : null, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user);
          if (queue == null) {
             throw ActiveMQMessageBundle.BUNDLE.noSuchQueue(new SimpleString(name));
          }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
index 4caa0e4..89034d6 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/QueueBindingInfo.java
@@ -85,4 +85,8 @@ public interface QueueBindingInfo {
    byte getRoutingType();
 
    void setRoutingType(byte routingType);
+
+   boolean isGroupRebalance();
+
+   int getGroupBuckets();
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 2790f32..b441e3a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -1297,7 +1297,7 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
 
       SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
+      PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(), binding.getAddress(), filterString, queue.getUser(), queue.isAutoCreated(), queue.getMaxConsumers(), queue.isPurgeOnNoConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), queue.getRoutingType().getType(), queue.isConfigurationManaged());
 
       readLock();
       try {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
index 0f974aa..08236ab 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/codec/PersistentQueueBindingEncoding.java
@@ -62,6 +62,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
 
    public boolean configurationManaged;
 
+   public boolean groupRebalance;
+
+   public int groupBuckets;
+
    public PersistentQueueBindingEncoding() {
    }
 
@@ -98,6 +102,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
          routingType +
          ", configurationManaged=" +
          configurationManaged +
+         ", groupRebalance=" +
+         groupRebalance +
+         ", groupBuckets=" +
+         groupBuckets +
          "]";
    }
 
@@ -109,6 +117,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
                                          final int maxConsumers,
                                          final boolean purgeOnNoConsumers,
                                          final boolean exclusive,
+                                         final boolean groupRebalance,
+                                         final int groupBuckets,
                                          final boolean lastValue,
                                          final SimpleString lastValueKey,
                                          final boolean nonDestructive,
@@ -124,6 +134,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       this.maxConsumers = maxConsumers;
       this.purgeOnNoConsumers = purgeOnNoConsumers;
       this.exclusive = exclusive;
+      this.groupRebalance = groupRebalance;
+      this.groupBuckets = groupBuckets;
       this.lastValue = lastValue;
       this.lastValueKey = lastValueKey;
       this.nonDestructive = nonDestructive;
@@ -286,6 +298,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
    }
 
    @Override
+   public boolean isGroupRebalance() {
+      return groupRebalance;
+   }
+
+   @Override
+   public int getGroupBuckets() {
+      return groupBuckets;
+   }
+
+   @Override
    public void decode(final ActiveMQBuffer buffer) {
       name = buffer.readSimpleString();
       address = buffer.readSimpleString();
@@ -351,6 +373,16 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       } else {
          nonDestructive = ActiveMQDefaultConfiguration.getDefaultNonDestructive();
       }
+      if (buffer.readableBytes() > 0) {
+         groupRebalance = buffer.readBoolean();
+      } else {
+         groupRebalance = ActiveMQDefaultConfiguration.getDefaultGroupRebalance();
+      }
+      if (buffer.readableBytes() > 0) {
+         groupBuckets = buffer.readInt();
+      } else {
+         groupBuckets = ActiveMQDefaultConfiguration.getDefaultGroupBuckets();
+      }
    }
 
    @Override
@@ -370,6 +402,8 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
       buffer.writeLong(delayBeforeDispatch);
       buffer.writeNullableSimpleString(lastValueKey);
       buffer.writeBoolean(nonDestructive);
+      buffer.writeBoolean(groupRebalance);
+      buffer.writeInt(groupBuckets);
    }
 
    @Override
@@ -386,7 +420,10 @@ public class PersistentQueueBindingEncoding implements EncodingSupport, QueueBin
          DataConstants.SIZE_INT +
          DataConstants.SIZE_LONG +
          SimpleString.sizeofNullableString(lastValueKey) +
-         DataConstants.SIZE_BOOLEAN;
+         DataConstants.SIZE_BOOLEAN +
+         DataConstants.SIZE_BOOLEAN +
+         DataConstants.SIZE_INT;
+
    }
 
    private SimpleString createMetadata() {
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
index fb1e96c..0d7c6c2 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/PostOffice.java
@@ -71,6 +71,8 @@ public interface PostOffice extends ActiveMQComponent {
                             Integer maxConsumers,
                             Boolean purgeOnNoConsumers,
                             Boolean exclusive,
+                            Boolean groupRebalance,
+                            Integer groupBuckets,
                             Boolean nonDestructive,
                             Integer consumersBeforeDispatch,
                             Long delayBeforeDispatch,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
index c552ca9..d209cf7 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/postoffice/impl/PostOfficeImpl.java
@@ -475,6 +475,8 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                                    Integer maxConsumers,
                                    Boolean purgeOnNoConsumers,
                                    Boolean exclusive,
+                                   Boolean groupRebalance,
+                                   Integer groupBuckets,
                                    Boolean nonDestructive,
                                    Integer consumersBeforeDispatch,
                                    Long delayBeforeDispatch,
@@ -527,6 +529,14 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
                changed = true;
                queue.setExclusive(exclusive);
             }
+            if (groupRebalance != null && queue.isGroupRebalance() != groupRebalance.booleanValue()) {
+               changed = true;
+               queue.setGroupRebalance(groupRebalance);
+            }
+            if (groupBuckets != null && queue.getGroupBuckets() != groupBuckets.intValue()) {
+               changed = true;
+               queue.setGroupBuckets(groupBuckets);
+            }
             if (nonDestructive != null && queue.isNonDestructive() != nonDestructive.booleanValue()) {
                changed = true;
                queue.setNonDestructive(nonDestructive);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
index 88c35bf..2aa3046 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java
@@ -357,7 +357,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   CreateQueueMessage_V2 request = (CreateQueueMessage_V2) packet;
                   requiresResponse = request.isRequiresResponse();
                   session.createQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isTemporary(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
-                                      request.isExclusive(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch(), request.isAutoCreated());
+                                      request.isExclusive(), request.isGroupRebalance(), request.getGroupBuckets(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch(), request.isAutoCreated());
                   if (requiresResponse) {
                      response = createNullResponseMessage(packet);
                   }
@@ -381,7 +381,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
                   QueueQueryResult result = session.executeQueueQuery(request.getQueueName());
                   if (!(result.isExists() && Objects.equals(result.getAddress(), request.getAddress()) && Objects.equals(result.getFilterString(), request.getFilterString()))) {
                      session.createSharedQueue(request.getAddress(), request.getQueueName(), request.getRoutingType(), request.getFilterString(), request.isDurable(), request.getMaxConsumers(), request.isPurgeOnNoConsumers(),
-                                               request.isExclusive(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch());
+                                               request.isExclusive(), request.isGroupRebalance(), request.getGroupBuckets(), request.isLastValue(), request.getLastValueKey(), request.isNonDestructive(), request.getConsumersBeforeDispatch(), request.getDelayBeforeDispatch());
                   }
                   if (requiresResponse) {
                      response = createNullResponseMessage(packet);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index 1791f1c..eca14b8 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -396,7 +396,8 @@ public interface ActiveMQServer extends ServiceComponent {
                           SimpleString user, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue) throws Exception;
 
    void createSharedQueue(SimpleString address, RoutingType routingType, SimpleString name, SimpleString filterString,
-                          SimpleString user, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue,
+                          SimpleString user, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive,
+                          boolean groupRebalance, int groupBuckets, boolean lastValue,
                           SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch) throws Exception;
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
@@ -410,7 +411,7 @@ public interface ActiveMQServer extends ServiceComponent {
                      boolean autoCreateAddress) throws Exception;
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
-                     boolean durable, boolean temporary, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive,
+                     boolean durable, boolean temporary, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean groupRebalance, int groupBuckets,
                      boolean lastValue, SimpleString lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
@@ -427,7 +428,7 @@ public interface ActiveMQServer extends ServiceComponent {
 
    Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers,
-                     Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive,
+                     Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive,
                      Integer consumersBeforeDispatch, Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
@@ -440,7 +441,8 @@ public interface ActiveMQServer extends ServiceComponent {
 
    Queue createQueue(SimpleString address, RoutingType routingType, SimpleString queueName, SimpleString filter,
                      SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
-                     boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, SimpleString lastValueKey, boolean nonDestructive,
+                     boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean groupRebalance,
+                     int groupBuckets, boolean lastValue, SimpleString lastValueKey, boolean nonDestructive,
                      int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception;
 
    @Deprecated
@@ -548,6 +550,8 @@ public interface ActiveMQServer extends ServiceComponent {
                      Integer maxConsumers,
                      Boolean purgeOnNoConsumers,
                      Boolean exclusive,
+                     Boolean groupRebalance,
+                     Integer groupBuckets,
                      Boolean nonDestructive,
                      Integer consumersBeforeDispatch,
                      Long delayBeforeDispatch,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
index 031d01a..2de830e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/Queue.java
@@ -104,6 +104,14 @@ public interface Queue extends Bindable,CriticalComponent {
 
    void setMaxConsumer(int maxConsumers);
 
+   int getGroupBuckets();
+
+   void setGroupBuckets(int groupBuckets);
+
+   boolean isGroupRebalance();
+
+   void setGroupRebalance(boolean groupRebalance);
+
    boolean isConfigurationManaged();
 
    void setConfigurationManaged(boolean configurationManaged);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
index 53226b4..b3c8009 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/QueueConfig.java
@@ -43,6 +43,8 @@ public final class QueueConfig {
    private final boolean purgeOnNoConsumers;
    private final int consumersBeforeDispatch;
    private final long delayBeforeDispatch;
+   private final boolean groupRebalance;
+   private final int groupBuckets;
    private final boolean configurationManaged;
    private final SimpleString lastValueKey;
    private final boolean nonDestructive;
@@ -67,6 +69,8 @@ public final class QueueConfig {
       private boolean purgeOnNoConsumers;
       private int consumersBeforeDispatch;
       private long delayBeforeDispatch;
+      private boolean groupRebalance;
+      private int groupBuckets;
       private boolean configurationManaged;
 
       private Builder(final long id, final SimpleString name) {
@@ -92,6 +96,8 @@ public final class QueueConfig {
          this.purgeOnNoConsumers = ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers();
          this.consumersBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch();
          this.delayBeforeDispatch = ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch();
+         this.groupRebalance = ActiveMQDefaultConfiguration.getDefaultGroupRebalance();
+         this.groupBuckets = ActiveMQDefaultConfiguration.getDefaultGroupBuckets();
          this.configurationManaged = false;
          validateState();
       }
@@ -184,6 +190,18 @@ public final class QueueConfig {
          return this;
       }
 
+      public Builder groupRebalance(final boolean groupRebalance) {
+         this.groupRebalance = groupRebalance;
+         return this;
+      }
+
+
+      public Builder groupBuckets(final int groupBuckets) {
+         this.groupBuckets = groupBuckets;
+         return this;
+      }
+
+
       public Builder routingType(RoutingType routingType) {
          this.routingType = routingType;
          return this;
@@ -215,7 +233,7 @@ public final class QueueConfig {
          } else {
             pageSubscription = null;
          }
-         return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged);
+         return new QueueConfig(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, groupRebalance, groupBuckets, configurationManaged);
       }
 
    }
@@ -266,6 +284,8 @@ public final class QueueConfig {
                        final int consumersBeforeDispatch,
                        final long delayBeforeDispatch,
                        final boolean purgeOnNoConsumers,
+                       final boolean groupRebalance,
+                       final int groupBuckets,
                        final boolean configurationManaged) {
       this.id = id;
       this.address = address;
@@ -285,6 +305,8 @@ public final class QueueConfig {
       this.maxConsumers = maxConsumers;
       this.consumersBeforeDispatch = consumersBeforeDispatch;
       this.delayBeforeDispatch = delayBeforeDispatch;
+      this.groupRebalance = groupRebalance;
+      this.groupBuckets = groupBuckets;
       this.configurationManaged = configurationManaged;
    }
 
@@ -360,6 +382,14 @@ public final class QueueConfig {
       return delayBeforeDispatch;
    }
 
+   public boolean isGroupRebalance() {
+      return groupRebalance;
+   }
+
+   public int getGroupBuckets() {
+      return groupBuckets;
+   }
+
    public boolean isConfigurationManaged() {
       return configurationManaged;
    }
@@ -409,6 +439,10 @@ public final class QueueConfig {
          return false;
       if (purgeOnNoConsumers != that.purgeOnNoConsumers)
          return false;
+      if (groupRebalance != that.groupRebalance)
+         return false;
+      if (groupBuckets != that.groupBuckets)
+         return false;
       if (configurationManaged != that.configurationManaged)
          return false;
       return user != null ? user.equals(that.user) : that.user == null;
@@ -435,6 +469,8 @@ public final class QueueConfig {
       result = 31 * result + consumersBeforeDispatch;
       result = 31 * result + Long.hashCode(delayBeforeDispatch);
       result = 31 * result + (purgeOnNoConsumers ? 1 : 0);
+      result = 31 * result + (groupRebalance ? 1 : 0);
+      result = 31 * result + groupBuckets;
       result = 31 * result + (configurationManaged ? 1 : 0);
       return result;
    }
@@ -460,6 +496,8 @@ public final class QueueConfig {
          + ", consumersBeforeDispatch=" + consumersBeforeDispatch
          + ", delayBeforeDispatch=" + delayBeforeDispatch
          + ", purgeOnNoConsumers=" + purgeOnNoConsumers
+         + ", groupRebalance=" + groupRebalance
+         + ", groupBuckets=" + groupBuckets
          + ", configurationManaged=" + configurationManaged + '}';
    }
 }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 3ba3384..6ba9b3f 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -181,6 +181,8 @@ public interface ServerSession extends SecurityAuth {
                      int maxConsumers,
                      boolean purgeOnNoConsumers,
                      Boolean exclusive,
+                     Boolean groupRebalance,
+                     Integer groupBuckets,
                      Boolean lastValue,
                      SimpleString lastValueKey,
                      Boolean nonDestructive,
@@ -335,6 +337,8 @@ public interface ServerSession extends SecurityAuth {
                           Integer maxConsumers,
                           Boolean purgeOnNoConsumers,
                           Boolean exclusive,
+                          Boolean groupRebalance,
+                          Integer groupBuckets,
                           Boolean lastValue,
                           SimpleString lastValueKey,
                           Boolean nonDestructive,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index fa6e5ba..61600cb 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -909,6 +909,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
       int defaultConsumersBeforeDispatch = addressSettings.getDefaultConsumersBeforeDispatch();
       long defaultDelayBeforeDispatch = addressSettings.getDefaultDelayBeforeDispatch();
       int defaultConsumerWindowSize = addressSettings.getDefaultConsumerWindowSize();
+      boolean defaultGroupRebalance = addressSettings.isDefaultGroupRebalance();
+      int defaultGroupBuckets = addressSettings.getDefaultGroupBuckets();
 
       SimpleString managementAddress = getManagementService() != null ? getManagementService().getManagementAddress() : null;
 
@@ -919,12 +921,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
 
          SimpleString filterString = filter == null ? null : filter.getFilterString();
 
-         response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), defaultConsumerWindowSize);
+         response = new QueueQueryResult(realName, binding.getAddress(), queue.isDurable(), queue.isTemporary(), filterString, queue.getConsumerCount(), queue.getMessageCount(), autoCreateQueues, true, queue.isAutoCreated(), queue.isPurgeOnNoConsumers(), queue.getRoutingType(), queue.getMaxConsumers(), queue.isExclusive(), queue.isGroupRebalance(), queue.getGroupBuckets(), queue.isLastValue(), queue.getLastValueKey(), queue.isNonDestructive(), queue.getConsumersBeforeDispatch(), queue.getDelayBeforeDispatch(), defaultConsumerWindowSize);
       } else if (realName.equals(managementAddress)) {
          // make an exception for the management address (see HORNETQ-29)
-         response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null, null, defaultConsumerWindowSize);
+         response = new QueueQueryResult(realName, managementAddress, true, false, null, -1, -1, autoCreateQueues, true, false, false, RoutingType.MULTICAST, -1, false, false, null, null, null, null, null, null, defaultConsumerWindowSize);
       } else {
-         response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, defaultConsumerWindowSize);
+         response = new QueueQueryResult(realName, addressName, true, false, null, 0, 0, autoCreateQueues, false, false, defaultPurgeOnNoConsumers, RoutingType.MULTICAST, defaultMaxConsumers, defaultExclusiveQueue, defaultGroupRebalance, defaultGroupBuckets, defaultLastValueQueue, defaultLastValueKey, defaultNonDestructive, defaultConsumersBeforeDispatch, defaultDelayBeforeDispatch, defaultConsumerWindowSize);
       }
 
       return response;
@@ -1712,13 +1714,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final int maxConsumers,
                             final boolean purgeOnNoConsumers,
                             final boolean exclusive,
+                            final boolean groupRebalance,
+                            final int groupBuckets,
                             final boolean lastValue,
                             final SimpleString lastValueKey,
                             final boolean nonDestructive,
                             final int consumersBeforeDispatch,
                             final long delayBeforeDispatch,
                             final boolean autoCreateAddress) throws Exception {
-      return createQueue(address, routingType, queueName, filter, null, durable, temporary, false, false, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
+      return createQueue(address, routingType, queueName, filter, null, durable, temporary, false, false, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
    }
 
    @Override
@@ -1740,18 +1744,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
       AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
-      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress, false);
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(),  as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress, false);
    }
 
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, boolean autoCreateAddress) throws Exception {
       AddressSettings as = getAddressSettingsRepository().getMatch(addressInfo == null ? queueName.toString() : addressInfo.getName().toString());
-      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress, false);
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive,  as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress, false);
    }
 
    @Override
-   public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception {
-      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
+   public Queue createQueue(AddressInfo addressInfo, SimpleString queueName, SimpleString filter, SimpleString user, boolean durable, boolean temporary, boolean autoCreated, Integer maxConsumers, Boolean purgeOnNoConsumers, Boolean exclusive, Boolean groupRebalance, Integer groupBuckets, Boolean lastValue, SimpleString lastValueKey, Boolean nonDestructive, Integer consumersBeforeDispatch, Long delayBeforeDispatch, boolean autoCreateAddress) throws Exception {
+      return createQueue(addressInfo, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
    }
 
 
@@ -1760,7 +1764,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                      SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
                      boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean autoCreateAddress) throws Exception {
       AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString());
-      return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
+      return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
    }
 
    @Override
@@ -1768,7 +1772,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             SimpleString user, boolean durable, boolean temporary, boolean ignoreIfExists, boolean transientQueue,
                             boolean autoCreated, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, boolean autoCreateAddress) throws Exception {
       AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? queueName.toString() : address.toString());
-      return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
+      return createQueue(address, routingType, queueName, filter, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive,  as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreateAddress);
    }
 
 
@@ -1805,7 +1809,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                                  boolean exclusive,
                                  boolean lastValue) throws Exception {
       AddressSettings as = getAddressSettingsRepository().getMatch(address == null ? name.toString() : address.toString());
-      createSharedQueue(address, routingType, name, filterString, user, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch());
+      createSharedQueue(address, routingType, name, filterString, user, durable, maxConsumers, purgeOnNoConsumers, exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch());
    }
 
    @Override
@@ -1818,6 +1822,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                                  int maxConsumers,
                                  boolean purgeOnNoConsumers,
                                  boolean exclusive,
+                                 boolean groupRebalance,
+                                 int groupBuckets,
                                  boolean lastValue,
                                  SimpleString lastValueKey,
                                  boolean nonDestructive,
@@ -1836,7 +1842,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
          }
       }
 
-      final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, true, !durable, false, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, true);
+      final Queue queue = createQueue(address, routingType, name, filterString, user, durable, !durable, true, !durable, false, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, true);
 
       if (!queue.getAddress().equals(address)) {
          throw ActiveMQMessageBundle.BUNDLE.queueSubscriptionBelongsToDifferentAddress(name);
@@ -2837,6 +2843,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
             // determine if there is an address::queue match; update it if so
             int maxConsumers = config.getMaxConsumers() == null ? as.getDefaultMaxConsumers() : config.getMaxConsumers();
             boolean isExclusive = config.isExclusive() == null ? as.isDefaultExclusiveQueue() : config.isExclusive();
+            boolean groupRebalance = config.isGroupRebalance() == null ? as.isDefaultGroupRebalance() : config.isGroupRebalance();
+            int groupBuckets = config.getGroupBuckets() == null ? as.getDefaultGroupBuckets() : config.getGroupBuckets();
             boolean isLastValue = config.isLastValue() == null ? as.isDefaultLastValueQueue() : config.isLastValue();
             SimpleString lastValueKey = config.getLastValueKey() == null ? as.getDefaultLastValueKey() : SimpleString.toSimpleString(config.getLastValueKey());
             boolean isNonDestructive = config.isNonDestructive() == null ? as.isDefaultNonDestructive() : config.isNonDestructive();
@@ -2844,11 +2852,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
             long delayBeforeDispatch = config.getDelayBeforeDispatch() == null ? as.getDefaultDelayBeforeDispatch() : config.getDelayBeforeDispatch();
 
             if (locateQueue(queueName) != null && locateQueue(queueName).getAddress().toString().equals(config.getAddress())) {
-               updateQueue(config.getName(), config.getRoutingType(), config.getFilterString(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser(), true);
+               updateQueue(config.getName(), config.getRoutingType(), config.getFilterString(), maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, groupRebalance, groupBuckets, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, config.getUser(), true);
             } else {
                // if the address::queue doesn't exist then create it
                try {
-                  createQueue(new AddressInfo(SimpleString.toSimpleString(config.getAddress())).addRoutingType(config.getRoutingType()), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, isLastValue, lastValueKey, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, true, true);
+                  createQueue(new AddressInfo(SimpleString.toSimpleString(config.getAddress())).addRoutingType(config.getRoutingType()), queueName, SimpleString.toSimpleString(config.getFilterString()), SimpleString.toSimpleString(config.getUser()), config.isDurable(), false, false, false, false, maxConsumers, config.getPurgeOnNoConsumers(), isExclusive, groupRebalance, groupBuckets, isLastValue, lastValueKey, isNonDestructive, consumersBeforeDispatch, delayBeforeDispatch, true, true);
                } catch (ActiveMQQueueExistsException e) {
                   // the queue may exist on a *different* address
                   ActiveMQServerLogger.LOGGER.warn(e.getMessage());
@@ -3039,6 +3047,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final int maxConsumers,
                             final boolean purgeOnNoConsumers,
                             final boolean exclusive,
+                            final boolean groupRebalance,
+                            final int groupBuckets,
                             final boolean lastValue,
                             final SimpleString lastValueKey,
                             final boolean nonDestructive,
@@ -3104,6 +3114,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
               .maxConsumers(maxConsumers)
               .purgeOnNoConsumers(purgeOnNoConsumers)
               .exclusive(exclusive)
+              .groupRebalance(groupRebalance)
+              .groupBuckets(groupBuckets)
               .lastValue(lastValue)
               .lastValueKey(lastValueKey)
               .nonDestructive(nonDestructive)
@@ -3181,13 +3193,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             final int maxConsumers,
                             final boolean purgeOnNoConsumers,
                             final boolean exclusive,
+                            final boolean groupRebalance,
+                            final int groupBuckets,
                             final boolean lastValue,
                             final SimpleString lastValueKey,
                             final boolean nonDestructive,
                             final int consumersBeforeDispatch,
                             final long delayBeforeDispatch,
                             final boolean autoCreateAddress) throws Exception {
-      return createQueue(new AddressInfo(address).addRoutingType(routingType), queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
+      return createQueue(new AddressInfo(address).addRoutingType(routingType), queueName, filterString, user, durable, temporary, ignoreIfExists, transientQueue, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress, false);
    }
 
    @Deprecated
@@ -3217,7 +3231,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             Boolean purgeOnNoConsumers,
                             Boolean exclusive,
                             String user) throws Exception {
-      return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, null, user);
+      return updateQueue(name, routingType, null, maxConsumers, purgeOnNoConsumers, exclusive, null, null, null, null, null, user);
    }
 
    @Override
@@ -3227,11 +3241,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             Integer maxConsumers,
                             Boolean purgeOnNoConsumers,
                             Boolean exclusive,
+                            Boolean groupRebalance,
+                            Integer groupBuckets,
                             Boolean nonDestructive,
                             Integer consumersBeforeDispatch,
                             Long delayBeforeDispatch,
                             String user) throws Exception {
-      return updateQueue(name, routingType, filterString, maxConsumers, purgeOnNoConsumers, exclusive, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, null);
+      return updateQueue(name, routingType, filterString, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user, null);
    }
 
    private Queue updateQueue(String name,
@@ -3240,13 +3256,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
                             Integer maxConsumers,
                             Boolean purgeOnNoConsumers,
                             Boolean exclusive,
+                            Boolean groupRebalance,
+                            Integer groupBuckets,
                             Boolean nonDestructive,
                             Integer consumersBeforeDispatch,
                             Long delayBeforeDispatch,
                             String user,
                             Boolean configurationManaged) throws Exception {
       final Filter filter = FilterImpl.createFilter(filterString);
-      final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user), configurationManaged);
+      final QueueBinding queueBinding = this.postOffice.updateQueue(new SimpleString(name), routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, SimpleString.toSimpleString(user), configurationManaged);
       if (queueBinding != null) {
          final Queue queue = queueBinding.getQueue();
          return queue;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BucketMessageGroups.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BucketMessageGroups.java
new file mode 100644
index 0000000..066bbf0
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/BucketMessageGroups.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+/**
+ * BucketMessageGroups stores values against a bucket, where the bucket used is based on the hash of the provided key.
+ *
+ * As such, keys that compute to the same bucket act on the same stored value rather than on a value unique to each key.
+ *
+ * The number of buckets is provided at construction.
+ *
+ * @param <C> the value type
+ */
+public class BucketMessageGroups<C> implements MessageGroups<C> {
+
+   // Prefix of the synthetic group keys reported by toMap(): the text after the prefix is read as an int.
+   // 1) The admin screen needs a SimpleString group key per bucket, so the bucket index is mapped back into one.
+   // 2) Admin users still need to interact with a specific bucket/group, e.g. they may need to reset a bucket.
+   // 3) The prefix is chosen to avoid the risk of clashing with users' own group keys.
+   // 4) It also makes testing a little easier, as it is known how the parsed int will hash.
+   private static final SimpleString _AMQ_GROUP_BUCKET_INT_KEY = new SimpleString("_AMQ_GROUP_BUCKET_INT_KEY_");
+
+   private final int bucketCount;
+   // Created by the first put(); null until then.
+   private C[] buckets;
+   // Number of buckets currently holding a non-null value.
+   private int size = 0;
+
+   /**
+    * @param bucketCount number of buckets that group keys are hashed into, must be at least 1
+    * @throws IllegalArgumentException if {@code bucketCount} is less than 1
+    */
+   public BucketMessageGroups(int bucketCount) {
+      if (bucketCount < 1) {
+         throw new IllegalArgumentException("Bucket count must be greater than 0");
+      }
+      this.bucketCount = bucketCount;
+   }
+
+   // Maps a group key to its bucket index; keys carrying the bucket prefix are hashed by their int suffix instead.
+   private int getBucket(SimpleString key) {
+      Object bucketKey = key;
+      if (key.startsWith(_AMQ_GROUP_BUCKET_INT_KEY)) {
+         bucketKey = retrieveBucketIntFromKey(key);
+      }
+      return getHashBucket(bucketKey, bucketCount);
+   }
+
+   // Masks off the sign bit before the modulo so the resulting index is never negative.
+   private static int getHashBucket(final Object key, final int bucketCount) {
+      return (key.hashCode() & Integer.MAX_VALUE) % bucketCount;
+   }
+
+   // Returns the int that follows the prefix, or the key itself when the suffix does not parse as an int.
+   private static Object retrieveBucketIntFromKey(SimpleString key) {
+      SimpleString bucket = key.subSeq(_AMQ_GROUP_BUCKET_INT_KEY.length(), key.length());
+      try {
+         return Integer.parseInt(bucket.toString());
+      } catch (NumberFormatException nfe) {
+         return key;
+      }
+   }
+
+   /**
+    * Stores {@code consumer} in the bucket that {@code key} hashes to, replacing whatever that bucket held.
+    */
+   @Override
+   public void put(SimpleString key, C consumer) {
+      if (buckets == null) {
+         buckets = newBucketArray(bucketCount);
+      }
+      // Resolve the bucket once instead of hashing the key for the check and again for the store.
+      final int bucket = getBucket(key);
+      final C previous = buckets[bucket];
+      buckets[bucket] = consumer;
+      // Keep size equal to the number of non-null buckets, even if a null consumer is stored.
+      if (previous == null && consumer != null) {
+         size++;
+      } else if (previous != null && consumer == null) {
+         size--;
+      }
+   }
+
+   // Generic arrays cannot be created directly, so an Object[] is created and cast.
+   @SuppressWarnings({ "unchecked", "SuspiciousArrayCast" })
+   private static <C> C[] newBucketArray(int capacity) {
+      return (C[]) new Object[capacity];
+   }
+
+   /**
+    * @return the value held by the bucket that {@code key} hashes to, or {@code null} if that bucket is empty
+    */
+   @Override
+   public C get(SimpleString key) {
+      if (buckets == null) {
+         return null;
+      }
+      return buckets[getBucket(key)];
+   }
+
+   /**
+    * Empties the bucket that {@code key} hashes to.
+    *
+    * @return the value the bucket held, or {@code null} if it was already empty
+    */
+   @Override
+   public C remove(SimpleString key) {
+      if (buckets == null) {
+         return null;
+      }
+      return remove(getBucket(key));
+   }
+
+   // Clears one bucket by index and adjusts size; callers must ensure the bucket array exists.
+   private C remove(int bucket) {
+      C existing = buckets[bucket];
+      if (existing != null) {
+         size--;
+         buckets[bucket] = null;
+      }
+      return existing;
+   }
+
+   /**
+    * Empties every occupied bucket whose value matches {@code filter}.
+    *
+    * @return {@code true} if at least one value was removed
+    */
+   @Override
+   public boolean removeIf(Predicate<? super C> filter) {
+      if (buckets == null || size == 0) {
+         return false;
+      }
+      boolean removed = false;
+      for (int bucket = 0; bucket < buckets.length; bucket++) {
+         final C value = buckets[bucket];
+         // Empty buckets are skipped: the filter only ever sees stored values and the result reflects real removals.
+         if (value != null && filter.test(value)) {
+            remove(bucket);
+            removed = true;
+         }
+      }
+      return removed;
+   }
+
+   /**
+    * Empties every bucket; the bucket array itself is kept for reuse.
+    */
+   @Override
+   public void removeAll() {
+      if (buckets != null && size > 0) {
+         Arrays.fill(buckets, null);
+      }
+      size = 0;
+   }
+
+   /**
+    * @return the number of occupied buckets (not the number of distinct keys that were put)
+    */
+   @Override
+   public int size() {
+      return size;
+   }
+
+   /**
+    * @return a new map with one entry per occupied bucket, keyed by the synthetic bucket key for that bucket's index,
+    *         or an empty map when nothing is stored
+    */
+   @Override
+   public Map<SimpleString, C> toMap() {
+      if (buckets == null || size == 0) {
+         return Collections.emptyMap();
+      }
+      Map<SimpleString, C> map = new HashMap<>(size);
+      for (int bucket = 0; bucket < buckets.length; bucket++) {
+         C value = buckets[bucket];
+         if (value != null) {
+            map.put(toGroupBucketIntKey(bucket), value);
+         }
+      }
+      return map;
+   }
+
+   // Builds the synthetic key for a bucket index: the prefix followed by the index in decimal.
+   static SimpleString toGroupBucketIntKey(int i) {
+      return _AMQ_GROUP_BUCKET_INT_KEY.concat(Integer.toString(i));
+   }
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DisabledMessageGroups.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DisabledMessageGroups.java
new file mode 100644
index 0000000..06b098c
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/DisabledMessageGroups.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import org.apache.activemq.artemis.utils.collections.NoOpMap;
+
+/**
+ * Implementation of MessageGroups that simply uses a {@link NoOpMap}, and in essence disables message grouping
+ * for queues that use it.
+ *
+ * A single shared instance is created and handed out by {@link #instance()}; the constructor is private.
+ *
+ * @param <C> the value type.
+ */
+public class DisabledMessageGroups<C> extends MapMessageGroups<C> {
+
+   // Wildcard-typed rather than raw, so generic type checking stays enabled for the shared instance.
+   private static final DisabledMessageGroups<?> INSTANCE = new DisabledMessageGroups<>();
+
+   /**
+    * @param <C> the value type the caller wants to view the shared instance as
+    * @return the shared instance
+    */
+   // NOTE(review): the unchecked cast is presumably safe because NoOpMap retains no values - confirm against NoOpMap.
+   @SuppressWarnings("unchecked")
+   public static <C> DisabledMessageGroups<C> instance() {
+      return (DisabledMessageGroups<C>) INSTANCE;
+   }
+
+   private DisabledMessageGroups() {
+      super(NoOpMap.instance());
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
index 28e3ee3..ee9e0e5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/LastValueQueue.java
@@ -68,6 +68,8 @@ public class LastValueQueue extends QueueImpl {
                          final RoutingType routingType,
                          final Integer maxConsumers,
                          final Boolean exclusive,
+                         final Boolean groupRebalance,
+                         final Integer groupBuckets,
                          final Integer consumersBeforeDispatch,
                          final Long delayBeforeDispatch,
                          final Boolean purgeOnNoConsumers,
@@ -81,7 +83,7 @@ public class LastValueQueue extends QueueImpl {
                          final ArtemisExecutor executor,
                          final ActiveMQServer server,
                          final QueueFactory factory) {
-      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
+      super(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, groupRebalance, groupBuckets, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, purgeOnNoConsumers, configurationManaged, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
       this.lastValueKey = lastValueKey;
    }
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MapMessageGroups.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MapMessageGroups.java
new file mode 100644
index 0000000..6269abf
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MapMessageGroups.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Predicate;
+import org.apache.activemq.artemis.api.core.SimpleString;
+
+/**
+ * Abstract base for {@link MessageGroups} implementations that keep their entries in a {@link Map} handed over
+ * by the subclass; every operation is forwarded to that map.
+ */
+abstract class MapMessageGroups<C> implements MessageGroups<C> {
+
+   private final Map<SimpleString, C> delegate;
+
+   /**
+    * @param groups the map that will hold the entries; it is used directly, not copied
+    */
+   protected MapMessageGroups(Map<SimpleString, C> groups) {
+      delegate = groups;
+   }
+
+   /** Forwards to {@link Map#put(Object, Object)} on the backing map. */
+   @Override
+   public void put(SimpleString key, C consumer) {
+      delegate.put(key, consumer);
+   }
+
+   /** Forwards to {@link Map#get(Object)} on the backing map. */
+   @Override
+   public C get(SimpleString key) {
+      final C current = delegate.get(key);
+      return current;
+   }
+
+   /** Forwards to {@link Map#remove(Object)} on the backing map. */
+   @Override
+   public C remove(SimpleString key) {
+      final C previous = delegate.remove(key);
+      return previous;
+   }
+
+   /** Removes every entry whose value matches the filter, via the backing map's values view. */
+   @Override
+   public boolean removeIf(Predicate<? super C> filter) {
+      final boolean anyRemoved = delegate.values().removeIf(filter);
+      return anyRemoved;
+   }
+
+   /** Clears the backing map. */
+   @Override
+   public void removeAll() {
+      delegate.clear();
+   }
+
+   /** Reports the size of the backing map. */
+   @Override
+   public int size() {
+      final int entries = delegate.size();
+      return entries;
+   }
+
+   /** Returns a new {@link HashMap} copy of the backing map, so callers never see the backing map itself. */
+   @Override
+   public Map<SimpleString, C> toMap() {
+      final Map<SimpleString, C> snapshot = new HashMap<>(delegate);
+      return snapshot;
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageGroups.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageGroups.java
new file mode 100644
index 0000000..6cddcdc
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/MessageGroups.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Map;
+import java.util.function.Predicate;
+import org.apache.activemq.artemis.api.core.SimpleString;
+/**
+ * Store of values held against message group keys.
+ * NOTE(review): the {@code consumer} parameter name suggests the values are the consumers a group is pinned to - confirm against callers.
+ *
+ * @param <C> the value type
+ */
+public interface MessageGroups<C> {
+   /**
+    * Stores {@code consumer} against {@code key}. Implementations may share one slot between several keys.
+    */
+   void put(SimpleString key, C consumer);
+   /**
+    * @return the value currently stored against {@code key}, or {@code null} if there is none
+    */
+   C get(SimpleString key);
+   /**
+    * Removes whatever is stored against {@code key}.
+    *
+    * @return the removed value, or {@code null} if nothing was stored
+    */
+   C remove(SimpleString key);
+   /**
+    * Removes the stored values that match {@code filter}.
+    *
+    * @return {@code true} if anything was removed
+    */
+   boolean removeIf(Predicate<? super C> filter);
+   /**
+    * Removes everything that is stored.
+    */
+   void removeAll();
+   /**
+    * @return the number of values currently stored
+    */
+   int size();
+   /**
+    * @return the current contents as a map; the implementations alongside this interface return a copy or an empty map
+    */
+   Map<SimpleString, C> toMap();
+
+
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
index 5dba891..606256c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/PostOfficeJournalLoader.java
@@ -152,6 +152,8 @@ public class PostOfficeJournalLoader implements JournalLoader {
             .purgeOnNoConsumers(queueBindingInfo.isPurgeOnNoConsumers())
             .maxConsumers(queueBindingInfo.getMaxConsumers())
             .exclusive(queueBindingInfo.isExclusive())
+            .groupRebalance(queueBindingInfo.isGroupRebalance())
+            .groupBuckets(queueBindingInfo.getGroupBuckets())
             .lastValue(queueBindingInfo.isLastValue())
             .lastValueKey(queueBindingInfo.getLastValueKey())
             .nonDestructive(queueBindingInfo.isNonDestructive())
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
index c788081..cba671a 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueFactoryImpl.java
@@ -75,9 +75,9 @@ public class QueueFactoryImpl implements QueueFactory {
    public Queue createQueueWith(final QueueConfig config) {
       final Queue queue;
       if (lastValueKey(config) != null) {
-         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), lastValueKey(config), config.isNonDestructive(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageMa [...]
+         queue = new LastValueQueue(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isGroupRebalance(), config.getGroupBuckets(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), lastValueKey(config), config.isNonDestructive(), config.isConfigurati [...]
       } else {
-         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isNonDestructive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecutor, postOffice, storageManager, addressSettingsRepos [...]
+         queue = new QueueImpl(config.id(), config.address(), config.name(), config.filter(), config.pageSubscription(), config.user(), config.isDurable(), config.isTemporary(), config.isAutoCreated(), config.deliveryMode(), config.maxConsumers(), config.isExclusive(), config.isGroupRebalance(), config.getGroupBuckets(), config.isNonDestructive(), config.consumersBeforeDispatch(), config.delayBeforeDispatch(), config.isPurgeOnNoConsumers(), config.isConfigurationManaged(), scheduledExecu [...]
       }
       server.getCriticalAnalyzer().add(queue);
       return queue;
@@ -102,7 +102,7 @@ public class QueueFactoryImpl implements QueueFactory {
 
       Queue queue;
       if (lastValueKey(addressSettings) != null) {
-         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch(), ActiveMQDefaultConfiguration.getDefaultPurgeOnNoConsumers(), lastVa [...]
+         queue = new LastValueQueue(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, ActiveMQDefaultConfiguration.getDefaultRoutingType(), ActiveMQDefaultConfiguration.getDefaultMaxQueueConsumers(), ActiveMQDefaultConfiguration.getDefaultExclusive(), ActiveMQDefaultConfiguration.getDefaultGroupRebalance(), ActiveMQDefaultConfiguration.getDefaultGroupBuckets(), ActiveMQDefaultConfiguration.getDefaultConsumersBeforeDispatch(), ActiveMQDefaultCo [...]
       } else {
          queue = new QueueImpl(persistenceID, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executorFactory.getExecutor(), server, this);
       }
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 9d1988f..31e166e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -20,7 +20,6 @@ import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -31,7 +30,6 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -233,7 +231,11 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    private final QueueConsumers<ConsumerHolder<? extends Consumer>> consumers = new QueueConsumersImpl<>();
 
-   private final Map<SimpleString, Consumer> groups = new HashMap<>();
+   private volatile boolean groupRebalance;
+
+   private volatile int groupBuckets;
+
+   private MessageGroups<Consumer> groups;
 
    private volatile Consumer exclusiveConsumer;
 
@@ -293,29 +295,6 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
     */
    private final Object directDeliveryGuard = new Object();
 
-   /**
-    * For testing only
-    */
-   public List<SimpleString> getGroupsUsed() {
-      final CountDownLatch flush = new CountDownLatch(1);
-      executor.execute(new Runnable() {
-         @Override
-         public void run() {
-            flush.countDown();
-         }
-      });
-      try {
-         flush.await(10, TimeUnit.SECONDS);
-      } catch (Exception ignored) {
-      }
-
-      synchronized (this) {
-         ArrayList<SimpleString> groupsUsed = new ArrayList<>();
-         groupsUsed.addAll(groups.keySet());
-         return groupsUsed;
-      }
-   }
-
    public String debug() {
       StringWriter str = new StringWriter();
       PrintWriter out = new PrintWriter(str);
@@ -432,7 +411,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                      final ArtemisExecutor executor,
                      final ActiveMQServer server,
                      final QueueFactory factory) {
-      this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, false, null, null, purgeOnNoConsumers, false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
+      this(id, address, name, filter, pageSubscription, user, durable, temporary, autoCreated, routingType, maxConsumers, exclusive, null, null, false, null, null, purgeOnNoConsumers, false, scheduledExecutor, postOffice, storageManager, addressSettingsRepository, executor, server, factory);
    }
 
    public QueueImpl(final long id,
@@ -447,6 +426,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                      final RoutingType routingType,
                      final Integer maxConsumers,
                      final Boolean exclusive,
+                     final Boolean groupRebalance,
+                     final Integer groupBuckets,
                      final Boolean nonDestructive,
                      final Integer consumersBeforeDispatch,
                      final Long delayBeforeDispatch,
@@ -493,6 +474,12 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
       this.delayBeforeDispatch = delayBeforeDispatch == null ? ActiveMQDefaultConfiguration.getDefaultDelayBeforeDispatch() : delayBeforeDispatch;
 
+      this.groupRebalance = groupRebalance == null ? ActiveMQDefaultConfiguration.getDefaultGroupRebalance() : groupRebalance;
+
+      this.groupBuckets = groupBuckets == null ? ActiveMQDefaultConfiguration.getDefaultGroupBuckets() : groupBuckets;
+
+      this.groups = groupMap(this.groupBuckets);
+
       this.configurationManaged = configurationManaged;
 
       this.postOffice = postOffice;
@@ -700,6 +687,29 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public int getGroupBuckets() {
+      return groupBuckets;
+   }
+
+   @Override
+   public synchronized void setGroupBuckets(int groupBuckets) {
+      if (this.groupBuckets != groupBuckets) {
+         this.groups = groupMap(groupBuckets);
+         this.groupBuckets = groupBuckets;
+      }
+   }
+
+   @Override
+   public boolean isGroupRebalance() {
+      return groupRebalance;
+   }
+
+   @Override
+   public synchronized void setGroupRebalance(boolean groupRebalance) {
+      this.groupRebalance = groupRebalance;
+   }
+
+   @Override
    public boolean isConfigurationManaged() {
       return configurationManaged;
    }
@@ -1052,6 +1062,10 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                }
             }
 
+            if (groupRebalance) {
+               groups.removeAll();
+            }
+
             if (refCountForConsumers != null) {
                refCountForConsumers.increment();
             }
@@ -1096,25 +1110,8 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
                exclusiveConsumer = null;
             }
 
-            LinkedList<SimpleString> groupsToRemove = null;
+            groups.removeIf(consumer::equals);
 
-            for (SimpleString groupID : groups.keySet()) {
-               if (consumer == groups.get(groupID)) {
-                  if (groupsToRemove == null) {
-                     groupsToRemove = new LinkedList<>();
-                  }
-                  groupsToRemove.add(groupID);
-               }
-            }
-
-            // We use an auxiliary List here to avoid concurrent modification exceptions on the keySet
-            // while the iteration is being done.
-            // Since that's a simple HashMap there's no Iterator's support with a remove operation
-            if (groupsToRemove != null) {
-               for (SimpleString groupID : groupsToRemove) {
-                  groups.remove(groupID);
-               }
-            }
 
             if (refCountForConsumers != null) {
                refCountForConsumers.decrement();
@@ -1207,7 +1204,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public synchronized Map<SimpleString, Consumer> getGroups() {
-      return new HashMap<>(groups);
+      return groups.toMap();
    }
 
    @Override
@@ -1217,7 +1214,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    @Override
    public synchronized void resetAllGroups() {
-      groups.clear();
+      groups.removeAll();
    }
 
    @Override
@@ -2601,7 +2598,7 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
    }
 
    private SimpleString extractGroupID(MessageReference ref) {
-      if (internalQueue) {
+      if (internalQueue || exclusive || groupBuckets == 0) {
          return null;
       } else {
          try {
@@ -3347,6 +3344,16 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
 
    }
 
+   public static MessageGroups<Consumer> groupMap(int groupBuckets) {
+      if (groupBuckets == -1) {
+         return new SimpleMessageGroups<>();
+      } else if (groupBuckets == 0) {
+         return DisabledMessageGroups.instance();
+      } else {
+         return new BucketMessageGroups<>(groupBuckets);
+      }
+   }
+
    // Inner classes
    // --------------------------------------------------------------------------
 
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 56c60f5..fd26db4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -591,7 +591,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable) throws Exception {
       AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
-      return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), false);
+      return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), false);
    }
 
    public Queue createQueue(final AddressInfo addressInfo,
@@ -602,6 +602,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final int maxConsumers,
                             final boolean purgeOnNoConsumers,
                             final boolean exclusive,
+                            final boolean groupRebalance,
+                            final int groupBuckets,
                             final boolean lastValue,
                             SimpleString lastValueKey,
                             final boolean nonDestructive,
@@ -627,7 +629,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       server.checkQueueCreationLimit(getUsername());
 
-      Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, as.isAutoCreateAddresses());
+      Queue queue = server.createQueue(art, unPrefixedName, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, autoCreated, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, as.isAutoCreateAddresses());
 
       if (temporary) {
          // Temporary queue in core simply means the queue will be deleted if
@@ -667,7 +669,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final boolean purgeOnNoConsumers,
                             final boolean autoCreated) throws Exception {
       AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
-      return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreated);
+      return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreated);
    }
 
    @Override
@@ -682,7 +684,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final Boolean exclusive,
                             final Boolean lastValue,
                             final boolean autoCreated) throws Exception {
-      return createQueue(address, name, routingType, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, null, null, null, null, autoCreated);
+      return createQueue(address, name, routingType, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers, exclusive, null, null, lastValue, null, null, null, null, autoCreated);
    }
 
    @Override
@@ -695,16 +697,20 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final int maxConsumers,
                             final boolean purgeOnNoConsumers,
                             final Boolean exclusive,
+                            final Boolean groupRebalance,
+                            final Integer groupBuckets,
                             final Boolean lastValue,
                             final SimpleString lastValueKey,
                             final Boolean nonDestructive,
                             final Integer consumersBeforeDispatch,
                             final Long delayBeforeDispatch,
                             final boolean autoCreated) throws Exception {
-      if (exclusive == null || lastValue == null || lastValueKey == null || nonDestructive == null || consumersBeforeDispatch == null || delayBeforeDispatch == null) {
+      if (exclusive == null || groupRebalance == null || groupBuckets == null || lastValue == null || lastValueKey == null || nonDestructive == null || consumersBeforeDispatch == null || delayBeforeDispatch == null) {
          AddressSettings as = server.getAddressSettingsRepository().getMatch(address.toString());
          return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers,
                  exclusive == null ? as.isDefaultExclusiveQueue() : exclusive,
+                 groupRebalance == null ? as.isDefaultGroupRebalance() : groupRebalance,
+                 groupBuckets == null ? as.getDefaultGroupBuckets() : groupBuckets,
                  lastValue == null ? as.isDefaultLastValueQueue() : lastValue,
                  lastValueKey == null ? as.getDefaultLastValueKey() : lastValueKey,
                  nonDestructive == null ? as.isDefaultNonDestructive() : nonDestructive,
@@ -713,7 +719,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                  autoCreated);
       } else {
          return createQueue(new AddressInfo(address, routingType), name, filterString, temporary, durable, maxConsumers, purgeOnNoConsumers,
-                 exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreated);
+                 exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreated);
       }
    }
 
@@ -732,14 +738,14 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable, boolean autoCreated) throws Exception {
       AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
-      return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreated);
+      return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(), as.isDefaultExclusiveQueue(), as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), as.isDefaultLastValueQueue(), as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreated);
    }
 
    @Override
    public Queue createQueue(AddressInfo addressInfo, SimpleString name, SimpleString filterString, boolean temporary, boolean durable, Boolean exclusive, Boolean lastValue, boolean autoCreated) throws Exception {
       AddressSettings as = server.getAddressSettingsRepository().getMatch(addressInfo.getName().toString());
       return createQueue(addressInfo, name, filterString, temporary, durable, as.getDefaultMaxConsumers(), as.isDefaultPurgeOnNoConsumers(),
-                         exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, lastValue == null ? as.isDefaultLastValueQueue() : lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreated);
+                         exclusive == null ? as.isDefaultExclusiveQueue() : exclusive, as.isDefaultGroupRebalance(), as.getDefaultGroupBuckets(), lastValue == null ? as.isDefaultLastValueQueue() : lastValue, as.getDefaultLastValueKey(), as.isDefaultNonDestructive(), as.getDefaultConsumersBeforeDispatch(), as.getDefaultDelayBeforeDispatch(), autoCreated);
    }
 
    @Override
@@ -778,7 +784,7 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                                  Boolean purgeOnNoConsumers,
                                  Boolean exclusive,
                                  Boolean lastValue) throws Exception {
-      createSharedQueue(address, name, routingType, filterString, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, null, null, null, null);
+      createSharedQueue(address, name, routingType, filterString, durable, maxConsumers, purgeOnNoConsumers, exclusive, null, null, lastValue, null, null, null, null);
    }
 
    @Override
@@ -790,6 +796,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                                  Integer maxConsumers,
                                  Boolean purgeOnNoConsumers,
                                  Boolean exclusive,
+                                 Boolean groupRebalance,
+                                 Integer groupBuckets,
                                  Boolean lastValue,
                                  SimpleString lastValueKey,
                                  Boolean nonDestructive,
@@ -807,6 +815,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                                maxConsumers == null ? as.getDefaultMaxConsumers() : maxConsumers,
                                purgeOnNoConsumers == null ? as.isDefaultPurgeOnNoConsumers() : purgeOnNoConsumers,
                                exclusive == null ? as.isDefaultExclusiveQueue() : exclusive,
+                               groupRebalance == null ? as.isDefaultGroupRebalance() : groupRebalance,
+                               groupBuckets == null ? as.getDefaultGroupBuckets() : groupBuckets,
                                lastValue == null ? as.isDefaultLastValueQueue() : lastValue,
                                lastValueKey == null ? as.getDefaultLastValueKey() : lastValueKey,
                                nonDestructive == null ? as.isDefaultNonDestructive() : nonDestructive,
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SimpleMessageGroups.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SimpleMessageGroups.java
new file mode 100644
index 0000000..a789709
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SimpleMessageGroups.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.HashMap;
+
+/**
+ * Implementation of MessageGroups that simply uses a HashMap; this is the existing and default behaviour of message groups in Artemis.
+ *
+ * Effectively, every group id is mapped as-is (raw), and the map is unbounded.
+ *
+ * @param <C> the value type.
+ */
+public class SimpleMessageGroups<C> extends MapMessageGroups<C> {
+
+   public SimpleMessageGroups() {
+      super(new HashMap<>());
+   }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
index 6d59ae4..2cb6b93 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/settings/impl/AddressSettings.java
@@ -137,6 +137,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
    private Boolean defaultExclusiveQueue = null;
 
+   private Boolean defaultGroupRebalance = null;
+
+   private Integer defaultGroupBuckets = null;
+
    private Long redistributionDelay = null;
 
    private Boolean sendToDLAOnNoRoute = null;
@@ -242,6 +246,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       this.defaultQueueRoutingType = other.defaultQueueRoutingType;
       this.defaultAddressRoutingType = other.defaultAddressRoutingType;
       this.defaultConsumerWindowSize = other.defaultConsumerWindowSize;
+      this.defaultGroupRebalance = other.defaultGroupRebalance;
+      this.defaultGroupBuckets = other.defaultGroupBuckets;
    }
 
    public AddressSettings() {
@@ -651,6 +657,36 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
    }
 
    /**
+    * @return the defaultGroupRebalance
+    */
+   public boolean isDefaultGroupRebalance() {
+      return defaultGroupRebalance != null ? defaultGroupRebalance : ActiveMQDefaultConfiguration.getDefaultGroupRebalance();
+   }
+
+   /**
+    * @param defaultGroupRebalance the defaultGroupRebalance to set
+    */
+   public AddressSettings setDefaultGroupRebalance(boolean defaultGroupRebalance) {
+      this.defaultGroupRebalance = defaultGroupRebalance;
+      return this;
+   }
+
+   /**
+    * @return the defaultGroupBuckets
+    */
+   public int getDefaultGroupBuckets() {
+      return defaultGroupBuckets != null ? defaultGroupBuckets : ActiveMQDefaultConfiguration.getDefaultGroupBuckets();
+   }
+
+   /**
+    * @param defaultGroupBuckets the defaultGroupBuckets to set
+    */
+   public AddressSettings setDefaultGroupBuckets(int defaultGroupBuckets) {
+      this.defaultGroupBuckets = defaultGroupBuckets;
+      return this;
+   }
+
+   /**
     * merge 2 objects in to 1
     *
     * @param merged
@@ -789,6 +825,12 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (defaultDelayBeforeDispatch == null) {
          defaultDelayBeforeDispatch = merged.defaultDelayBeforeDispatch;
       }
+      if (defaultGroupRebalance == null) {
+         defaultGroupRebalance = merged.defaultGroupRebalance;
+      }
+      if (defaultGroupBuckets == null) {
+         defaultGroupBuckets = merged.defaultGroupBuckets;
+      }
    }
 
    @Override
@@ -928,6 +970,14 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       if (buffer.readableBytes() > 0) {
          autoDeleteAddressesDelay = BufferHelper.readNullableLong(buffer);
       }
+
+      if (buffer.readableBytes() > 0) {
+         defaultGroupRebalance = BufferHelper.readNullableBoolean(buffer);
+      }
+
+      if (buffer.readableBytes() > 0) {
+         defaultGroupBuckets = BufferHelper.readNullableInteger(buffer);
+      }
    }
 
    @Override
@@ -973,7 +1023,9 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          SimpleString.sizeofNullableString(defaultLastValueKey) +
          BufferHelper.sizeOfNullableBoolean(defaultNonDestructive) +
          BufferHelper.sizeOfNullableLong(autoDeleteQueuesDelay) +
-         BufferHelper.sizeOfNullableLong(autoDeleteAddressesDelay);
+         BufferHelper.sizeOfNullableLong(autoDeleteAddressesDelay) +
+         BufferHelper.sizeOfNullableBoolean(defaultGroupRebalance) +
+         BufferHelper.sizeOfNullableInteger(defaultGroupBuckets);
    }
 
    @Override
@@ -1064,6 +1116,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
 
       BufferHelper.writeNullableLong(buffer, autoDeleteAddressesDelay);
 
+      BufferHelper.writeNullableBoolean(buffer, defaultGroupRebalance);
+
+      BufferHelper.writeNullableInteger(buffer, defaultGroupBuckets);
+
    }
 
    /* (non-Javadoc)
@@ -1117,6 +1173,8 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
       result = prime * result + ((defaultConsumersBeforeDispatch == null) ? 0 : defaultConsumersBeforeDispatch.hashCode());
       result = prime * result + ((defaultDelayBeforeDispatch == null) ? 0 : defaultDelayBeforeDispatch.hashCode());
       result = prime * result + ((defaultConsumerWindowSize == null) ? 0 : defaultConsumerWindowSize.hashCode());
+      result = prime * result + ((defaultGroupRebalance == null) ? 0 : defaultGroupRebalance.hashCode());
+      result = prime * result + ((defaultGroupBuckets == null) ? 0 : defaultGroupBuckets.hashCode());
       return result;
    }
 
@@ -1360,6 +1418,18 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
             return false;
       } else if (!defaultConsumerWindowSize.equals(other.defaultConsumerWindowSize))
          return false;
+
+      if (defaultGroupRebalance == null) {
+         if (other.defaultGroupRebalance != null)
+            return false;
+      } else if (!defaultGroupRebalance.equals(other.defaultGroupRebalance))
+         return false;
+
+      if (defaultGroupBuckets == null) {
+         if (other.defaultGroupBuckets != null)
+            return false;
+      } else if (!defaultGroupBuckets.equals(other.defaultGroupBuckets))
+         return false;
       return true;
    }
 
@@ -1453,6 +1523,10 @@ public class AddressSettings implements Mergeable<AddressSettings>, Serializable
          defaultDelayBeforeDispatch +
          ", defaultClientWindowSize=" +
          defaultConsumerWindowSize +
+         ", defaultGroupRebalance=" +
+         defaultGroupRebalance +
+         ", defaultGroupBuckets=" +
+         defaultGroupBuckets +
          "]";
    }
 }
diff --git a/artemis-server/src/main/resources/schema/artemis-configuration.xsd b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
index 14f70be..ad46ed4 100644
--- a/artemis-server/src/main/resources/schema/artemis-configuration.xsd
+++ b/artemis-server/src/main/resources/schema/artemis-configuration.xsd
@@ -517,6 +517,8 @@
                         <xsd:attribute name="max-consumers" type="xsd:integer" use="optional"/>
                         <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
+                        <xsd:attribute name="group-rebalance" type="xsd:boolean" use="optional"/>
+                        <xsd:attribute name="group-buckets" type="xsd:int" use="optional"/>
                         <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="last-value-key" type="xsd:string" use="optional"/>
                         <xsd:attribute name="non-destructive" type="xsd:boolean" use="optional"/>
@@ -2852,6 +2854,22 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="default-group-rebalance" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     whether to rebalance groups when a consumer is added
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="default-group-buckets" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     number of buckets to use for grouping, -1 (default) is unlimited and uses the raw group, 0 disables message groups.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
             <xsd:element name="default-consumers-before-dispatch" type="xsd:int" default="0" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
@@ -3210,6 +3228,8 @@
       <xsd:attribute name="max-consumers" type="xsd:integer" use="optional"/>
       <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
+      <xsd:attribute name="group-rebalance" type="xsd:boolean" use="optional"/>
+      <xsd:attribute name="group-buckets" type="xsd:int" use="optional"/>
       <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="last-value-key" type="xsd:string" use="optional"/>
       <xsd:attribute name="non-destructive" type="xsd:boolean" use="optional"/>
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/BucketMessageGroupsTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/BucketMessageGroupsTest.java
new file mode 100644
index 0000000..82fc56f
--- /dev/null
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/BucketMessageGroupsTest.java
@@ -0,0 +1,201 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.activemq.artemis.core.server.impl;
+
+import java.util.Collection;
+import static org.apache.activemq.artemis.core.server.impl.BucketMessageGroups.toGroupBucketIntKey;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import org.junit.Test;
+
+public class BucketMessageGroupsTest {
+
+   @Test
+   public void testEnsureBucketCountHonoured() {
+      //Test a range of bucket counts
+      for (int count = 1; count < 100; count++) {
+         MessageGroups<String> messageGroups = new BucketMessageGroups<>(count);
+         //Use a range of keys
+         for (int i = 0; i < 100; i++) {
+            messageGroups.put(toGroupBucketIntKey(i), "world" + i);
+         }
+         assertEquals(count, messageGroups.size());
+      }
+   }
+
+   @Test
+   public void testBucketCountNotGreaterThanZero() {
+      try {
+         MessageGroups<String> messageGroups = new BucketMessageGroups<>(0);
+         fail("IllegalArgumentException was expected as bucket count is NOT greater than 0");
+      } catch (IllegalArgumentException iae) {
+         //Pass: we expect an exception to be thrown if the count is not greater than 0.
+      }
+
+      try {
+         MessageGroups<String> messageGroups = new BucketMessageGroups<>(-1);
+         fail("IllegalArgumentException was expected as bucket count is NOT greater than 0");
+      } catch (IllegalArgumentException iae) {
+         //Pass: we expect an exception to be thrown if the count is not greater than 0.
+      }
+   }
+
+   @Test
+   public void testPut() {
+      MessageGroups<String> messageGroups = new BucketMessageGroups<>(2);
+      assertEquals(0, messageGroups.size());
+
+
+      messageGroups.put(toGroupBucketIntKey(1), "world");
+
+      assertEquals(1, messageGroups.size());
+
+
+      messageGroups.put(toGroupBucketIntKey(2), "world2");
+
+      assertEquals(2, messageGroups.size());
+
+      //As we have 2 buckets, the max size will always be 2
+      messageGroups.put(toGroupBucketIntKey(3), "world3");
+      assertEquals(2, messageGroups.size());
+   }
+
+   @Test
+   public void testGet() {
+      MessageGroups<String> messageGroups = new BucketMessageGroups<>(2);
+
+      assertNull(messageGroups.get(toGroupBucketIntKey(1)));
+
+
+      messageGroups.put(toGroupBucketIntKey(1), "world");
+
+      assertEquals("world", messageGroups.get(toGroupBucketIntKey(1)));
+
+      messageGroups.put(toGroupBucketIntKey(2), "world2");
+
+      assertEquals("world2", messageGroups.get(toGroupBucketIntKey(2)));
+
+      //As we have 2 buckets, key 3 will hash-mod into the same bucket as key 1, overwriting its value.
+      messageGroups.put(toGroupBucketIntKey(3), "world3");
+      assertEquals("world3", messageGroups.get(toGroupBucketIntKey(3)));
+
+      //Ensure that on calling get for key 1, will return same value as key 3 now.
+      assertEquals("world3", messageGroups.get(toGroupBucketIntKey(1)));
+
+      //Ensure that negative hashes are made positive and bucketed onto the expected bucket groups.
+      assertEquals("world3", messageGroups.get(toGroupBucketIntKey(-1)));
+      //Ensure that negative hashes are made positive and bucketed onto the expected bucket groups.
+      assertEquals("world2", messageGroups.get(toGroupBucketIntKey(-2)));
+   }
+
+   @Test
+   public void testToMap() {
+      MessageGroups<String> messageGroups = new BucketMessageGroups<>(2);
+
+      messageGroups.put(toGroupBucketIntKey(1), "world");
+
+      assertEquals(1, messageGroups.toMap().size());
+
+      messageGroups.put(toGroupBucketIntKey(2), "world2");
+
+      assertEquals(2, messageGroups.toMap().size());
+
+      Collection<String> values = messageGroups.toMap().values();
+      assertTrue(values.contains("world"));
+      assertTrue(values.contains("world2"));
+
+      messageGroups.put(toGroupBucketIntKey(3), "world3");
+      messageGroups.put(toGroupBucketIntKey(4), "world4");
+
+      values = messageGroups.toMap().values();
+      assertFalse(values.contains("world"));
+      assertFalse(values.contains("world2"));
+      assertTrue(values.contains("world3"));
+      assertTrue(values.contains("world4"));
+   }
+
+   @Test
+   public void testRemove() {
+      MessageGroups<String> messageGroups = new BucketMessageGroups<>(2);
+      assertNull(messageGroups.remove(toGroupBucketIntKey(1)));
+
+      messageGroups.put(toGroupBucketIntKey(1), "world");
+
+      assertEquals("world", messageGroups.remove(toGroupBucketIntKey(1)));
+      assertNull(messageGroups.remove(toGroupBucketIntKey(1)));
+
+      messageGroups.put(toGroupBucketIntKey(1), "world1");
+      messageGroups.put(toGroupBucketIntKey(2), "world2");
+      messageGroups.put(toGroupBucketIntKey(3), "world3");
+      messageGroups.put(toGroupBucketIntKey(4), "world4");
+      messageGroups.put(toGroupBucketIntKey(5), "world5");
+
+      assertEquals(2, messageGroups.size());
+
+
+      assertEquals("world5", messageGroups.remove(toGroupBucketIntKey(3)));
+      assertNull(messageGroups.remove(toGroupBucketIntKey(5)));
+      assertEquals(1, messageGroups.size());
+
+      assertEquals("world4", messageGroups.remove(toGroupBucketIntKey(4)));
+      assertEquals(0, messageGroups.size());
+
+   }
+
+   @Test
+   public void testRemoveIf() {
+      MessageGroups<String> messageGroups = new BucketMessageGroups<>(10);
+
+      messageGroups.put(toGroupBucketIntKey(1), "world1");
+      messageGroups.put(toGroupBucketIntKey(2), "world2");
+      messageGroups.put(toGroupBucketIntKey(3), "world1");
+      messageGroups.put(toGroupBucketIntKey(4), "world2");
+      messageGroups.put(toGroupBucketIntKey(5), "world1");
+      messageGroups.put(toGroupBucketIntKey(6), "world2");
+      messageGroups.put(toGroupBucketIntKey(7), "world1");
+      messageGroups.put(toGroupBucketIntKey(8), "world2");
+      messageGroups.put(toGroupBucketIntKey(9), "world3");
+      messageGroups.put(toGroupBucketIntKey(10), "world4");
+
+      assertEquals(10, messageGroups.size());
+
+      messageGroups.removeIf("world1"::equals);
+
+      assertEquals(6, messageGroups.size());
+
+      messageGroups.removeIf("world4"::equals);
+
+      assertEquals(5, messageGroups.size());
+
+      messageGroups.removeIf("world3"::equals);
+
+      assertEquals(4, messageGroups.size());
+
+      messageGroups.removeIf("world2"::equals);
+
+      assertEquals(0, messageGroups.size());
+
+   }
+
+}
\ No newline at end of file
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
index 2e3b691..1dc377a 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/ScheduledDeliveryHandlerTest.java
@@ -839,6 +839,26 @@ public class ScheduledDeliveryHandlerTest extends Assert {
       }
 
       @Override
+      public int getGroupBuckets() {
+         return 0; // test stub: always reports 0 group buckets
+      }
+
+      @Override
+      public void setGroupBuckets(int groupBuckets) {
+         // no-op: this test stub ignores the supplied value
+      }
+
+      @Override
+      public boolean isGroupRebalance() {
+         return false; // test stub: always reports group rebalance as off
+      }
+
+      @Override
+      public void setGroupRebalance(boolean groupRebalance) {
+         // no-op: this test stub ignores the supplied value
+      }
+
+      @Override
       public boolean isConfigurationManaged() {
          return false;
       }
diff --git a/artemis-tools/src/test/resources/artemis-configuration.xsd b/artemis-tools/src/test/resources/artemis-configuration.xsd
index b8b4e85..420d2a1 100644
--- a/artemis-tools/src/test/resources/artemis-configuration.xsd
+++ b/artemis-tools/src/test/resources/artemis-configuration.xsd
@@ -490,6 +490,8 @@
                         <xsd:attribute name="max-consumers" type="xsd:integer" use="optional"/>
                         <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
+                        <xsd:attribute name="group-rebalance" type="xsd:boolean" use="optional"/>
+                        <xsd:attribute name="group-buckets" type="xsd:int" use="optional"/>
                         <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
                         <xsd:attribute name="last-value-key" type="xsd:string" use="optional"/>
                         <xsd:attribute name="non-destructive" type="xsd:boolean" use="optional"/>
@@ -2518,6 +2520,22 @@
                </xsd:annotation>
             </xsd:element>
 
+            <xsd:element name="default-group-rebalance" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     whether to rebalance groups when a consumer is added
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
+            <xsd:element name="default-group-buckets" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
+               <xsd:annotation>
+                  <xsd:documentation>
+                     number of buckets to use for grouping, -1 (default) is unlimited and uses the raw group, 0 disables message groups.
+                  </xsd:documentation>
+               </xsd:annotation>
+            </xsd:element>
+
             <xsd:element name="default-consumers-before-dispatch" type="xsd:int" default="0" maxOccurs="1" minOccurs="0">
                <xsd:annotation>
                   <xsd:documentation>
@@ -2804,6 +2822,8 @@
       <xsd:attribute name="max-consumers" type="xsd:integer" use="optional"/>
       <xsd:attribute name="purge-on-no-consumers" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="exclusive" type="xsd:boolean" use="optional"/>
+      <xsd:attribute name="group-rebalance" type="xsd:boolean" use="optional"/>
+      <xsd:attribute name="group-buckets" type="xsd:int" use="optional"/>
       <xsd:attribute name="last-value" type="xsd:boolean" use="optional"/>
       <xsd:attribute name="last-value-key" type="xsd:string" use="optional"/>
       <xsd:attribute name="non-destructive" type="xsd:boolean" use="optional"/>
diff --git a/docs/user-manual/en/message-grouping.md b/docs/user-manual/en/message-grouping.md
index 1c4e7ef..fd82b5f 100644
--- a/docs/user-manual/en/message-grouping.md
+++ b/docs/user-manual/en/message-grouping.md
@@ -98,6 +98,52 @@ producer.send(message);
 
 This then closes the message group so if another message is sent in the future with the same message group ID it will be reassigned to a new consumer.
 
+#### Group Buckets
+
+To handle groups in a queue with bounded memory, which allows groups to scale better,
+you can enable group buckets: the group id is hashed into a bucket instead of the queue keeping track of every single group id.
+
+Setting `group-buckets` to `-1` keeps the default behaviour, which means the queue keeps track of every group but suffers from unbounded memory use.
+
+Setting `group-buckets` to `0` disables grouping (0 buckets) on a queue. This can be useful on a multicast address
+where many queues exist but, for one of them, you do not care about ordering and prefer to keep round-robin behaviour.
+
+There are a number of ways to set `group-buckets`.
+
+
+```xml
+<address name="foo.bar">
+   <multicast>
+      <queue name="orders1" group-buckets="1024"/>
+   </multicast>
+</address>
+```
+
+It can be specified when creating a queue with the CORE API, by setting the parameter
+`group-buckets`, for example to `20`.
+
+Or on auto-create when using the JMS Client by using address parameters when 
+creating the destination used by the consumer.
+
+```java
+Queue queue = session.createQueue("my.destination.name?group-buckets=1024");
+Topic topic = session.createTopic("my.destination.name?group-buckets=1024");
+```
+
+Also the default for all queues under an address can be set using the
+`address-setting` configuration:
+
+```xml
+<address-setting match="my.bucket.address">
+   <default-group-buckets>1024</default-group-buckets>
+</address-setting>
+```
+
+By default, `default-group-buckets` is `-1`; this keeps compatibility with the existing default behaviour.
+
+Address [wildcards](wildcard-syntax.md) can be used to configure group buckets for a
+set of addresses.
+
 ## Example
 
 See the [Message Group Example](examples.md#message-group) which shows how
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
index f393faf..7e3e5ac 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/distribution/ClusteredGroupingTest.java
@@ -122,16 +122,16 @@ public class ClusteredGroupingTest extends ClusterTestBase {
 
       QueueImpl queue0Server2 = (QueueImpl) servers[2].locateQueue(SimpleString.toSimpleString("queue0"));
 
-      assertEquals(2, queue0Server2.getGroupsUsed().size());
+      assertEquals(2, queue0Server2.getGroupCount());
 
       assertTrue(latch.await(5, TimeUnit.SECONDS));
 
       long timeLimit = System.currentTimeMillis() + 5000;
-      while (timeLimit > System.currentTimeMillis() && queue0Server2.getGroupsUsed().size() != 0) {
+      while (timeLimit > System.currentTimeMillis() && queue0Server2.getGroupCount() != 0) {
          Thread.sleep(10);
       }
 
-      assertEquals("Unproposal should cleanup the queue group as well", 0, queue0Server2.getGroupsUsed().size());
+      assertEquals("Unproposal should cleanup the queue group as well", 0, queue0Server2.getGroupCount());
 
       removeConsumer(0);
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
index 8f79344..c42eb29 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/ConsumerDelayDispatchTest.java
@@ -46,8 +46,8 @@ public class ConsumerDelayDispatchTest extends JMSTestBase {
    @Before
    public void setUp() throws Exception {
       super.setUp();
-      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, -1, false, true, false, null, false, 2, DELAY_BEFORE_DISPATCH, true);
-      server.createQueue(normalQueueName, RoutingType.ANYCAST, normalQueueName, null, null, true, false, false, false, false, -1, false, true, false, null, false, 0, -1, true);
+      server.createQueue(queueName, RoutingType.ANYCAST, queueName, null, null, true, false, false, false, false, -1, false, true, false, -1, false, null, false, 2, DELAY_BEFORE_DISPATCH, true);
+      server.createQueue(normalQueueName, RoutingType.ANYCAST, normalQueueName, null, null, true, false, false, false, false, -1, false, true, false, -1, false, null, false, 0, -1, true);
 
    }
 
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
index 41c325d..f8fa90c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/jms/client/GroupingTest.java
@@ -31,8 +31,11 @@ import javax.jms.TextMessage;
 import java.util.UUID;
 
 import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.postoffice.QueueBinding;
 import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
 import org.apache.activemq.artemis.jms.client.ActiveMQMessage;
 import org.apache.activemq.artemis.jms.client.ActiveMQTextMessage;
 import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
@@ -304,4 +307,353 @@ public class GroupingTest extends JMSTestBase {
       return received;
    }
 
+   @Test
+   public void testGroupBuckets() throws Exception {
+      ConnectionFactory fact = getCF();
+      Assume.assumeFalse("only makes sense withOUT auto-group", ((ActiveMQConnectionFactory) fact).isAutoGroup());
+      Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null);
+
+      String testQueueName = getName() + "_bucket_group";
+
+      server.createQueue(SimpleString.toSimpleString(testQueueName), RoutingType.ANYCAST, SimpleString.toSimpleString(testQueueName), null, null, true, false, false, false, false, -1, false, false, false, 2, false, null, false, 0, 0, true);
+
+      // The queue above uses two group buckets, so the three group ids below have to share two buckets.
+      JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED));
+
+      Queue testQueue = ctx.createQueue(testQueueName);
+
+
+      final String groupID1 = "groupA";
+      final String groupID2 = "groupB";
+      final String groupID3 = "groupC";
+
+      //Ensure the groups bucket as we expect: groupA and groupC in bucket 0, groupB in bucket 1.
+      assertEquals(0, (groupID1.hashCode() & Integer.MAX_VALUE) % 2);
+      assertEquals(1, (groupID2.hashCode() & Integer.MAX_VALUE) % 2);
+      assertEquals(0, (groupID3.hashCode() & Integer.MAX_VALUE) % 2);
+
+      JMSProducer producer1 = ctx.createProducer().setProperty("JMSXGroupID", groupID1);
+      JMSProducer producer2 = ctx.createProducer().setProperty("JMSXGroupID", groupID2);
+      JMSProducer producer3 = ctx.createProducer().setProperty("JMSXGroupID", groupID3);
+
+      JMSConsumer consumer1 = ctx.createConsumer(testQueue);
+      JMSConsumer consumer2 = ctx.createConsumer(testQueue);
+      JMSConsumer consumer3 = ctx.createConsumer(testQueue);
+
+      ctx.start();
+
+      for (int j = 0; j < 10; j++) {
+         send(ctx, testQueue, groupID1, producer1, j);
+      }
+      for (int j = 10; j < 20; j++) {
+         send(ctx, testQueue, groupID2, producer2, j);
+      }
+      for (int j = 20; j < 30; j++) {
+         send(ctx, testQueue, groupID3, producer3, j);
+      }
+
+      ctx.commit();
+
+      //First two set of msgs should go to the first two consumers only (buckets is 2)
+      for (int j = 0; j < 10; j++) {
+         TextMessage tm = (TextMessage) consumer1.receive(10000);
+         assertNotNull(tm);
+         tm.acknowledge();
+         assertEquals("Message" + j, tm.getText());
+         assertEquals(groupID1, tm.getStringProperty("JMSXGroupID"));
+
+         tm = (TextMessage) consumer2.receive(10000);
+         assertNotNull(tm);
+         tm.acknowledge();
+         assertEquals("Message" + (j + 10), tm.getText());
+         assertEquals(groupID2, tm.getStringProperty("JMSXGroupID"));
+
+         assertNull(consumer3.receiveNoWait());
+      }
+
+      //Last set of msgs should go to the first consumer as bucketed queue with two bucket groups only
+      for (int j = 20; j < 30; j++) {
+         TextMessage tm = (TextMessage) consumer1.receive(10000);
+         assertNotNull(tm);
+         tm.acknowledge();
+         assertEquals("Message" + j, tm.getText());
+         assertEquals(groupID3, tm.getStringProperty("JMSXGroupID"));
+
+         assertNull(consumer2.receiveNoWait());
+         assertNull(consumer3.receiveNoWait());
+
+      }
+
+
+      ctx.commit();
+
+      ctx.close();
+   }
+
+   @Test
+   public void testGroupRebalance() throws Exception {
+      ConnectionFactory fact = getCF();
+      Assume.assumeFalse("only makes sense withOUT auto-group", ((ActiveMQConnectionFactory) fact).isAutoGroup());
+      Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null);
+      String testQueueName = getName() + "_group_rebalance";
+
+      server.createQueue(SimpleString.toSimpleString(testQueueName), RoutingType.ANYCAST, SimpleString.toSimpleString(testQueueName), null, null, true, false, false, false, false, -1, false, false, true, -1, false, null, false, 0, 0, true);
+      // The queue above is created with group rebalance switched on; adding a consumer later should re-assign groups.
+      JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED));
+
+      Queue testQueue = ctx.createQueue(testQueueName);
+
+
+      final String groupID1 = "groupA";
+      final String groupID2 = "groupB";
+      final String groupID3 = "groupC";
+
+
+      JMSProducer producer1 = ctx.createProducer().setProperty("JMSXGroupID", groupID1);
+      JMSProducer producer2 = ctx.createProducer().setProperty("JMSXGroupID", groupID2);
+      JMSProducer producer3 = ctx.createProducer().setProperty("JMSXGroupID", groupID3);
+
+      JMSConsumer consumer1 = ctx.createConsumer(testQueue);
+      JMSConsumer consumer2 = ctx.createConsumer(testQueue);
+
+      ctx.start();
+
+      for (int j = 0; j < 10; j++) {
+         send(ctx, testQueue, groupID1, producer1, j);
+      }
+      for (int j = 10; j < 20; j++) {
+         send(ctx, testQueue, groupID2, producer2, j);
+      }
+      for (int j = 20; j < 30; j++) {
+         send(ctx, testQueue, groupID3, producer3, j);
+      }
+
+      ctx.commit();
+
+      //First set of msgs should go to the first consumer only
+      for (int j = 0; j < 10; j++) {
+         TextMessage tm = (TextMessage) consumer1.receive(10000);
+         assertNotNull(tm);
+         tm.acknowledge();
+         assertEquals("Message" + j, tm.getText());
+         assertEquals(groupID1, tm.getStringProperty("JMSXGroupID"));
+      }
+
+      //Second set of msgs should go to the second consumer only
+      for (int j = 10; j < 20; j++) {
+         TextMessage tm = (TextMessage) consumer2.receive(10000);
+
+         assertNotNull(tm);
+
+         tm.acknowledge();
+
+         assertEquals("Message" + j, tm.getText());
+
+         assertEquals(groupID2, tm.getStringProperty("JMSXGroupID"));
+      }
+
+      //Third set of msgs should go to the first consumer only
+      for (int j = 20; j < 30; j++) {
+         TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+         assertNotNull(tm);
+
+         tm.acknowledge();
+
+         assertEquals("Message" + j, tm.getText());
+
+         assertEquals(groupID3, tm.getStringProperty("JMSXGroupID"));
+      }
+      ctx.commit();
+
+      //Add new consumer, that should cause rebalance
+      JMSConsumer consumer3 = ctx.createConsumer(testQueue);
+
+      for (int j = 0; j < 10; j++) {
+         send(ctx, testQueue, groupID1, producer1, j);
+      }
+      for (int j = 10; j < 20; j++) {
+         send(ctx, testQueue, groupID2, producer2, j);
+      }
+      for (int j = 20; j < 30; j++) {
+         send(ctx, testQueue, groupID3, producer3, j);
+      }
+      ctx.commit();
+
+      //First set of msgs should go to the first consumer only
+      for (int j = 0; j < 10; j++) {
+         TextMessage tm = (TextMessage) consumer1.receive(10000);
+         assertNotNull(tm);
+         tm.acknowledge();
+         assertEquals("Message" + j, tm.getText());
+         assertEquals(groupID1, tm.getStringProperty("JMSXGroupID"));
+      }
+
+      //Second set of msgs should go to the second consumer only
+      for (int j = 10; j < 20; j++) {
+         TextMessage tm = (TextMessage) consumer2.receive(10000);
+
+         assertNotNull(tm);
+
+         tm.acknowledge();
+
+         assertEquals("Message" + j, tm.getText());
+
+         assertEquals(groupID2, tm.getStringProperty("JMSXGroupID"));
+      }
+
+      //Third set of msgs should now go to the third consumer
+      for (int j = 20; j < 30; j++) {
+         TextMessage tm = (TextMessage) consumer3.receive(10000);
+
+         assertNotNull(tm);
+
+         tm.acknowledge();
+
+         assertEquals("Message" + j, tm.getText());
+
+         assertEquals(groupID3, tm.getStringProperty("JMSXGroupID"));
+      }
+
+      ctx.commit();
+
+      ctx.close();
+   }
+
+   @Test
+   public void testGroupDisable() throws Exception {
+      ConnectionFactory fact = getCF();
+      Assume.assumeFalse("only makes sense withOUT auto-group", ((ActiveMQConnectionFactory) fact).isAutoGroup());
+      Assume.assumeTrue("only makes sense withOUT explicit group-id", ((ActiveMQConnectionFactory) fact).getGroupID() == null);
+      String testQueueName = getName() + "_group_disable";
+
+      server.createQueue(SimpleString.toSimpleString(testQueueName), RoutingType.ANYCAST, SimpleString.toSimpleString(testQueueName), null, null, true, false, false, false, false, -1, false, false, false, 0, false, null, false, 0, 0, true);
+      // The queue above is created with 0 group buckets, which is the setting that disables message grouping.
+      JMSContext ctx = addContext(getCF().createContext(JMSContext.SESSION_TRANSACTED));
+
+      Queue testQueue = ctx.createQueue(testQueueName);
+
+
+      final String groupID1 = "groupA";
+      final String groupID2 = "groupB";
+      final String groupID3 = "groupC";
+
+
+      JMSProducer producer1 = ctx.createProducer().setProperty("JMSXGroupID", groupID1);
+      JMSProducer producer2 = ctx.createProducer().setProperty("JMSXGroupID", groupID2);
+      JMSProducer producer3 = ctx.createProducer().setProperty("JMSXGroupID", groupID3);
+
+      JMSConsumer consumer1 = ctx.createConsumer(testQueue);
+      JMSConsumer consumer2 = ctx.createConsumer(testQueue);
+      JMSConsumer consumer3 = ctx.createConsumer(testQueue);
+
+      ctx.start();
+
+      for (int j = 0; j < 10; j++) {
+         send(ctx, testQueue, groupID1, producer1, j);
+      }
+      for (int j = 10; j < 20; j++) {
+         send(ctx, testQueue, groupID2, producer2, j);
+      }
+      for (int j = 20; j < 30; j++) {
+         send(ctx, testQueue, groupID3, producer3, j);
+      }
+
+      ctx.commit();
+
+      //Msgs should just round robin and ignore grouping semantics
+      int j = 0;
+      while (j < 30) {
+         TextMessage tm = (TextMessage) consumer1.receive(10000);
+         assertNotNull(tm);
+         tm.acknowledge();
+         assertEquals("Message" + j, tm.getText());
+         assertEquals(j < 10 ? groupID1 : j < 20 ? groupID2 : groupID3, tm.getStringProperty("JMSXGroupID"));
+         j++;
+
+         tm = (TextMessage) consumer2.receive(10000);
+         assertNotNull(tm);
+         tm.acknowledge();
+         assertEquals("Message" + j, tm.getText());
+         assertEquals(j < 10 ? groupID1 : j < 20 ? groupID2 : groupID3, tm.getStringProperty("JMSXGroupID"));
+         j++;
+
+         tm = (TextMessage) consumer3.receive(10000);
+         assertNotNull(tm);
+         tm.acknowledge();
+         assertEquals("Message" + j, tm.getText());
+         assertEquals(j < 10 ? groupID1 : j < 20 ? groupID2 : groupID3, tm.getStringProperty("JMSXGroupID"));
+         j++;
+      }
+
+      ctx.commit();
+
+      ctx.close();
+   }
+
+
+   private void send(JMSContext ctx, Queue testQueue, String groupID1, JMSProducer producer1, int j) throws JMSException {
+      final TextMessage outgoing = ctx.createTextMessage("Message" + j);
+      producer1.send(testQueue, outgoing); // after the send, the message is expected to carry the producer's JMSXGroupID
+      final String groupOnMessage = outgoing.getStringProperty("JMSXGroupID");
+      assertNotNull(groupOnMessage);
+      assertEquals(groupID1, groupOnMessage);
+   }
+
+   @Test
+   public void testGroupBucketUsingAddressQueueParameters() throws Exception {
+      // The "?group-buckets=4" suffix should be stripped from the queue name and applied as a queue attribute.
+      final Connection connection = getCF().createConnection();
+
+      try {
+         final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         final String testQueueName = getName() + "group_bucket_param";
+
+         final Queue queue = session.createQueue(testQueueName + "?group-buckets=4");
+         assertEquals(testQueueName, queue.getQueueName());
+
+         // Client side: the destination exposes the parsed attribute.
+         final ActiveMQDestination destination = (ActiveMQDestination) queue;
+         assertEquals(Integer.valueOf(4), destination.getQueueAttributes().getGroupBuckets());
+
+         // Presumably creating the producer is what auto-creates the queue on the broker - TODO confirm.
+         session.createProducer(queue);
+
+         // Broker side: the queue behind the binding reports the same bucket count.
+         final QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
+         assertEquals(4, binding.getQueue().getGroupBuckets());
+      } finally {
+         connection.close();
+      }
+   }
+
+   @Test
+   public void testGroupRebalanceUsingAddressQueueParameters() throws Exception {
+      // The "?group-rebalance=true" suffix should be stripped from the queue name and applied as a queue attribute.
+      final Connection connection = getCF().createConnection();
+
+      try {
+         final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+
+         final String testQueueName = getName() + "group_rebalance_param";
+
+         final Queue queue = session.createQueue(testQueueName + "?group-rebalance=true");
+         assertEquals(testQueueName, queue.getQueueName());
+
+         // Client side: the destination exposes the parsed attribute.
+         final ActiveMQDestination destination = (ActiveMQDestination) queue;
+         assertTrue(destination.getQueueAttributes().getGroupRebalance());
+
+         // Presumably creating the producer is what auto-creates the queue on the broker - TODO confirm.
+         session.createProducer(queue);
+
+         // Broker side: the queue behind the binding reports rebalance as enabled.
+         final QueueBinding binding = (QueueBinding) server.getPostOffice().getBinding(SimpleString.toSimpleString(testQueueName));
+         assertTrue(binding.getQueue().isGroupRebalance());
+      } finally {
+         connection.close();
+      }
+   }
+
 }
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
index f0315e8..4c86f92 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/ActiveMQServerControlUsingCoreTest.java
@@ -162,11 +162,13 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
                                    @Parameter(name = "maxConsumers", desc = "The maximum number of consumers allowed on this queue at any one time") Integer maxConsumers,
                                    @Parameter(name = "purgeOnNoConsumers", desc = "Delete this queue when the last consumer disconnects") Boolean purgeOnNoConsumers,
                                    @Parameter(name = "exclusive", desc = "If the queue should route exclusively to one consumer") Boolean exclusive,
+                                   @Parameter(name = "groupRebalance", desc = "If the queue should rebalance groups when a consumer is added") Boolean groupRebalance,
+                                   @Parameter(name = "groupBuckets", desc = "Number of buckets that should be used for message groups, -1 (default) is unlimited, and groups by raw key instead") Integer groupBuckets,
                                    @Parameter(name = "nonDestructive", desc = "If the queue should be nonDestructive") Boolean nonDestructive,
                                    @Parameter(name = "consumersBeforeDispatch", desc = "Number of consumers needed before dispatch can start") Integer consumersBeforeDispatch,
                                    @Parameter(name = "delayBeforeDispatch", desc = "Delay to wait before dispatching if number of consumers before dispatch is not met") Long delayBeforeDispatch,
                                    @Parameter(name = "user", desc = "The user associated with this queue") String user) throws Exception {
-            return (String) proxy.invokeOperation("updateQueue", name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user);
+            return (String) proxy.invokeOperation("updateQueue", name, routingType, filter, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, user);
          }
 
          @Override
@@ -203,8 +205,8 @@ public class ActiveMQServerControlUsingCoreTest extends ActiveMQServerControlTes
          }
 
          @Override
-         public String createQueue(String address, String routingType, String name, String filter, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean lastValue, String lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception {
-            return (String) proxy.invokeOperation("createQueue", address, routingType, name, filter, durable, maxConsumers, purgeOnNoConsumers, exclusive, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
+         public String createQueue(String address, String routingType, String name, String filter, boolean durable, int maxConsumers, boolean purgeOnNoConsumers, boolean exclusive, boolean groupRebalance, int groupBuckets, boolean lastValue, String lastValueKey, boolean nonDestructive, int consumersBeforeDispatch, long delayBeforeDispatch, boolean autoCreateAddress) throws Exception {
+            return (String) proxy.invokeOperation("createQueue", address, routingType, name, filter, durable, maxConsumers, purgeOnNoConsumers, exclusive, groupRebalance, groupBuckets, lastValue, lastValueKey, nonDestructive, consumersBeforeDispatch, delayBeforeDispatch, autoCreateAddress);
          }
 
          @Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
index 6b05eec..421013a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/QueueConfigRestartTest.java
@@ -115,7 +115,7 @@ public class QueueConfigRestartTest extends ActiveMQTestBase {
       SimpleString address = new SimpleString("test.address");
       SimpleString queue = new SimpleString("test.queue");
 
-      server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, null, false, consumersBeforeDispatch, -1, true);
+      server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, false, -1, true, null, false, consumersBeforeDispatch, -1, true);
 
       QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
       Assert.assertEquals(consumersBeforeDispatch, queueBinding1.getQueue().getConsumersBeforeDispatch());
@@ -138,7 +138,7 @@ public class QueueConfigRestartTest extends ActiveMQTestBase {
       SimpleString address = new SimpleString("test.address");
       SimpleString queue = new SimpleString("test.queue");
 
-      server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, true, null, false,0, delayBeforeDispatch, true);
+      server.createQueue(address, RoutingType.MULTICAST, queue, null, null, true, false, false, false,false, 10, true, true, false, -1, true, null, false,0, delayBeforeDispatch, true);
 
       QueueBinding queueBinding1 = (QueueBinding)server.getPostOffice().getBinding(queue);
       Assert.assertEquals(delayBeforeDispatch, queueBinding1.getQueue().getDelayBeforeDispatch());
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
index 15ac691..e81728c 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/postoffice/impl/FakeQueue.java
@@ -126,6 +126,26 @@ public class FakeQueue extends CriticalComponentImpl implements Queue {
    }
 
    @Override
+   public int getGroupBuckets() {
+      return 0; // fake: always reports 0 group buckets
+   }
+
+   @Override
+   public void setGroupBuckets(int groupBuckets) {
+      // no-op: this fake ignores the supplied value
+   }
+
+   @Override
+   public boolean isGroupRebalance() {
+      return false; // fake: always reports group rebalance as off
+   }
+
+   @Override
+   public void setGroupRebalance(boolean groupRebalance) {
+      // no-op: this fake ignores the supplied value
+   }
+
+   @Override
    public boolean isConfigurationManaged() {
       return false;
    }
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
index 4bfb7e6..1f775e6 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/server/impl/fakes/FakePostOffice.java
@@ -51,6 +51,8 @@ public class FakePostOffice implements PostOffice {
                                    Integer maxConsumers,
                                    Boolean purgeOnNoConsumers,
                                    Boolean exclusive,
+                                   Boolean groupRebalance,
+                                   Integer groupBuckets,
                                    Boolean lastValue,
                                    Integer consumersBeforeDispatch,
                                    Long delayBeforeDispatch,