You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/09/14 04:45:37 UTC

[rocketmq] branch optimize_rebalance updated: [ISSUE #2149] Apache RocketMQ rebalancing architecture optimization (#2169)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch optimize_rebalance
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/optimize_rebalance by this push:
     new c60cb0f  [ISSUE #2149] Apache RocketMQ rebalancing architecture optimization (#2169)
c60cb0f is described below

commit c60cb0fccbf1a2bc8e4b92d75f4e159691b885f7
Author: Jack Tsai <ja...@outlook.com>
AuthorDate: Mon Sep 14 12:45:24 2020 +0800

    [ISSUE #2149] Apache RocketMQ rebalancing architecture optimization (#2169)
---
 .../apache/rocketmq/broker/BrokerController.java   |   9 +
 .../broker/processor/ConsumerManageProcessor.java  |  87 ++++-
 .../processor/ConsumerManageProcessorTest.java     | 115 +++++++
 .../broker/processor/PullMessageProcessorTest.java |   2 +-
 .../rebalance/AllocateMessageQueueStickyTest.java  | 355 +++++++++++++++++++++
 .../client/consumer/DefaultLitePullConsumer.java   |  16 +-
 .../client/consumer/DefaultMQPullConsumer.java     |  15 +-
 .../client/consumer/DefaultMQPushConsumer.java     |  24 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  36 +++
 .../impl/consumer/DefaultLitePullConsumerImpl.java |   5 +
 .../impl/consumer/DefaultMQPullConsumerImpl.java   |   5 +
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |   5 +
 .../client/impl/consumer/MQConsumerInner.java      |   2 +
 .../client/impl/consumer/RebalanceImpl.java        |  39 ++-
 .../impl/consumer/RebalanceLitePullImpl.java       |   7 +-
 .../client/impl/consumer/RebalancePullImpl.java    |   7 +-
 .../client/impl/consumer/RebalancePushImpl.java    |   7 +-
 .../client/impl/factory/MQClientInstance.java      |  47 ++-
 .../rebalance/AllocateMachineRoomNearByTest.java   |   4 +-
 .../AllocateMessageQueueConsitentHashTest.java     |   3 +-
 .../impl/consumer/RebalancePushImplTest.java       |   2 +-
 .../common}/AllocateMessageQueueStrategy.java      |   2 +-
 .../rocketmq/common/protocol/RequestCode.java      |   2 +
 .../rocketmq/common/protocol/ResponseCode.java     |   1 +
 .../body/AllocateMessageQueueRequestBody.java      |  28 +-
 .../header/AllocateMessageQueueRequestHeader.java  |  58 ++++
 .../header/AllocateMessageQueueResponseBody.java   |  28 +-
 .../header/AllocateMessageQueueResponseHeader.java |  27 ++
 .../rebalance/AllocateMachineRoomNearby.java       |   9 +-
 .../rebalance/AllocateMessageQueueAveragely.java   |  11 +-
 .../AllocateMessageQueueAveragelyByCircle.java     |  11 +-
 .../rebalance/AllocateMessageQueueByConfig.java    |   4 +-
 .../AllocateMessageQueueByMachineRoom.java         |   4 +-
 .../AllocateMessageQueueConsistentHash.java        |  11 +-
 .../rebalance/AllocateMessageQueueSticky.java      | 220 +++++++++++++
 .../AllocateMessageQueueStrategyConstants.java     |  34 ++
 .../apache/rocketmq/example/simple/AclClient.java  |   2 +-
 .../tools/command/topic/AllocateMQSubCommand.java  |   2 +-
 38 files changed, 1156 insertions(+), 90 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 194f285..a39457a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -73,6 +73,7 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
 import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
 import org.apache.rocketmq.broker.util.ServiceProvider;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.Configuration;
 import org.apache.rocketmq.common.DataVersion;
@@ -143,6 +144,8 @@ public class BrokerController {
     private final BrokerStatsManager brokerStatsManager;
     private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
     private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
+    private final ConcurrentMap<String /* Consumer Group */, AllocateMessageQueueStrategy /* Strategy Object */> allocateMessageQueueStrategyTable
+        = new ConcurrentHashMap<String, AllocateMessageQueueStrategy>();
     private MessageStore messageStore;
     private RemotingServer remotingServer;
     private RemotingServer fastRemotingServer;
@@ -605,10 +608,12 @@ public class BrokerController {
         this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
         this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
         this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.remotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);
 
         this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
         this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+        this.fastRemotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);
 
         /**
          * EndTransactionProcessor
@@ -741,6 +746,10 @@ public class BrokerController {
         return subscriptionGroupManager;
     }
 
+    public ConcurrentMap<String, AllocateMessageQueueStrategy> getAllocateMessageQueueStrategyTable() {
+        return allocateMessageQueueStrategyTable;
+    }
+
     public void shutdown() {
         if (this.brokerStatsManager != null) {
             this.brokerStatsManager.shutdown();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 77317a6..bc12a38 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -18,12 +18,18 @@ package org.apache.rocketmq.broker.processor;
 
 import io.netty.channel.ChannelHandlerContext;
 import java.util.List;
+import java.util.Map;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseHeader;
 import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
 import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
 import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
@@ -31,6 +37,14 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
 import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragelyByCircle;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByConfig;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByMachineRoom;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueSticky;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -57,6 +71,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
                 return this.updateConsumerOffset(ctx, request);
             case RequestCode.QUERY_CONSUMER_OFFSET:
                 return this.queryConsumerOffset(ctx, request);
+            case RequestCode.ALLOCATE_MESSAGE_QUEUE:
+                return this.allocateMessageQueue(ctx, request);
             default:
                 break;
         }
@@ -152,4 +168,73 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
 
         return response;
     }
+
+    private synchronized RemotingCommand allocateMessageQueue(ChannelHandlerContext ctx, RemotingCommand request)
+        throws RemotingCommandException {
+        final RemotingCommand response =
+            RemotingCommand.createResponseCommand(AllocateMessageQueueResponseHeader.class);
+        final AllocateMessageQueueRequestHeader requestHeader =
+            (AllocateMessageQueueRequestHeader) request.decodeCommandCustomHeader(AllocateMessageQueueRequestHeader.class);
+        final AllocateMessageQueueRequestBody requestBody = AllocateMessageQueueRequestBody.decode(request.getBody(),
+            AllocateMessageQueueRequestBody.class);
+
+        AllocateMessageQueueStrategy strategy = null;
+        String consumerGroup = requestHeader.getConsumerGroup();
+        String strategyName = requestHeader.getStrategyName();
+        Map<String, AllocateMessageQueueStrategy> strategyTable = this.brokerController.getAllocateMessageQueueStrategyTable();
+
+        if (strategyTable.containsKey(consumerGroup) && strategyName.equals(strategyTable.get(consumerGroup).getName())) {
+            strategy = strategyTable.get(consumerGroup);
+        } else {
+            switch (strategyName) {
+                case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY:
+                    strategy = new AllocateMessageQueueAveragely();
+                    break;
+                case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE:
+                    strategy = new AllocateMessageQueueAveragelyByCircle();
+                    break;
+                case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_CONFIG:
+                    strategy = new AllocateMessageQueueByConfig();
+                    break;
+                case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM:
+                    strategy = new AllocateMessageQueueByMachineRoom();
+                    break;
+                case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH:
+                    strategy = new AllocateMessageQueueConsistentHash();
+                    break;
+                case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY:
+                    strategy = new AllocateMessageQueueSticky();
+                    break;
+                default:
+                    response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+                    response.setRemark("AllocateMessageQueueStrategy[" + strategyName + "] is not supported by broker");
+                    return response;
+            }
+            strategyTable.put(consumerGroup, strategy);
+        }
+
+        ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup);
+        List<MessageQueue> allocateResult = null;
+        try {
+            allocateResult = strategy.allocate(
+                requestHeader.getConsumerGroup(),
+                requestHeader.getClientID(),
+                requestBody.getMqAll(),
+                consumerGroupInfo != null ? consumerGroupInfo.getAllClientId() : null);
+        } catch (Throwable e) {
+            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
+                strategy.getName(), e);
+            response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+            response.setRemark(e.getMessage());
+            return response;
+        }
+
+        AllocateMessageQueueResponseBody body = new AllocateMessageQueueResponseBody();
+        body.setAllocateResult(allocateResult);
+        response.setBody(body.encode());
+        response.setCode(ResponseCode.SUCCESS);
+        response.setRemark(null);
+
+        return response;
+    }
 }
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
new file mode 100644
index 0000000..1cc51d0
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerManageProcessorTest {
+    private ConsumerManageProcessor consumerManageProcessor;
+    @Spy
+    private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+    @Mock
+    private ChannelHandlerContext handlerContext;
+    @Mock
+    private Channel channel;
+
+    private ClientChannelInfo clientChannelInfo;
+    private String clientId = UUID.randomUUID().toString();
+    private String group = "FooBarGroup";
+    private String topic = "FooBar";
+    private List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+
+    @Before
+    public void init() {
+        consumerManageProcessor = new ConsumerManageProcessor(brokerController);
+        clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
+
+        mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 0));
+        mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 1));
+        mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 3));
+        mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 4));
+
+        ConsumerData consumerData = createConsumerData(group, topic);
+        brokerController.getConsumerManager().registerConsumer(
+            consumerData.getGroupName(),
+            clientChannelInfo,
+            consumerData.getConsumeType(),
+            consumerData.getMessageModel(),
+            consumerData.getConsumeFromWhere(),
+            consumerData.getSubscriptionDataSet(),
+            false);
+    }
+
+    @Test
+    public void testAllocateMessageQueue() throws RemotingCommandException {
+        String emptyClientId = "";
+        RemotingCommand request = buildAllocateMessageQueueRequest(emptyClientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
+        RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+        assertThat(response.getRemark()).isEqualTo("currentCID is empty");
+
+        request = buildAllocateMessageQueueRequest(clientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
+        response = consumerManageProcessor.processRequest(handlerContext, request);
+        assertThat(response.getBody()).isNotNull();
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+    }
+
+    private RemotingCommand buildAllocateMessageQueueRequest(String clientId, String strategy) {
+        AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setClientID(clientId);
+        requestHeader.setStrategyName(strategy);
+
+        AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
+        requestBody.setMqAll(mqAll);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
+        request.setBody(requestBody.encode());
+        request.makeCustomHeaderToNet();
+        return request;
+    }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index c96f708..245af53 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -208,7 +208,7 @@ public class PullMessageProcessorTest {
         return request;
     }
 
-    static ConsumerData createConsumerData(String group, String topic) {
+    public static ConsumerData createConsumerData(String group, String topic) {
         ConsumerData consumerData = new ConsumerData();
         consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
         consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java b/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java
new file mode 100644
index 0000000..e908227
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.rebalance;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AllocateMessageQueueStickyTest {
+    private ConsumerManageProcessor consumerManageProcessor;
+    @Spy
+    private final BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+    @Mock
+    private ChannelHandlerContext handlerContext;
+
+    private static final String CID_PREFIX = "CID-";
+
+    private final String group = "FooBarGroup";
+    private final String topic = "FooBar";
+    private List<MessageQueue> messageQueueList;
+    private List<String> consumerIdList;
+
+    @Before
+    public void init() {
+        consumerManageProcessor = new ConsumerManageProcessor(brokerController);
+        messageQueueList = new ArrayList<MessageQueue>();
+        consumerIdList = new ArrayList<String>();
+    }
+
+    @Test
+    public void testCurrentCIDNotExists() throws RemotingCommandException {
+        String currentCID = String.valueOf(Integer.MAX_VALUE);
+        createConsumerIdList(4);
+        createMessageQueueList(15);
+        RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+        RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+        Assert.assertEquals(AllocateMessageQueueRequestBody.decode(response.getBody(),
+            AllocateMessageQueueResponseBody.class).getAllocateResult().size(), 0);
+    }
+
+    @Test
+    public void testCurrentCIDIllegalArgument() throws RemotingCommandException {
+        String currentCID = "";
+        createConsumerIdList(4);
+        createMessageQueueList(15);
+        RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+        RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+        assertThat(response.getRemark()).isEqualTo("currentCID is empty");
+    }
+
+    @Test
+    public void testMessageQueueIllegalArgument() throws RemotingCommandException {
+        String currentCID = CID_PREFIX + 0;
+        createConsumerIdList(4);
+        createMessageQueueList(0);
+        RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+        RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+        assertThat(response.getRemark()).isEqualTo("mqAll is null or mqAll empty");
+    }
+
+    @Test
+    public void testConsumerIdIllegalArgument() throws RemotingCommandException {
+        String currentCID = CID_PREFIX + 0;
+        createConsumerIdList(0);
+        createMessageQueueList(15);
+        RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+        RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+        assertThat(response.getRemark()).isEqualTo("cidAll is null or cidAll empty");
+    }
+
+    @Test
+    public void testAllocateMessageQueue1() throws RemotingCommandException {
+        createConsumerIdList(4);
+        createMessageQueueList(10);
+        for (String clientId : consumerIdList) {
+            RemotingCommand request = buildAllocateMessageQueueRequest(clientId, messageQueueList);
+            RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+            assertThat(response.getBody()).isNotNull();
+            System.out.println(AllocateMessageQueueRequestBody.decode(response.getBody(),
+                AllocateMessageQueueResponseBody.class).getAllocateResult());
+            assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        }
+    }
+
+    @Test
+    public void testAllocateMessageQueue2() throws RemotingCommandException {
+        createConsumerIdList(10);
+        createMessageQueueList(4);
+        for (String clientId : consumerIdList) {
+            RemotingCommand request = buildAllocateMessageQueueRequest(clientId, messageQueueList);
+            RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+            assertThat(response.getBody()).isNotNull();
+            assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+        }
+    }
+
+    @Test
+    public void testRun10RandomCase() throws RemotingCommandException {
+        for (int i = 0; i < 10; i++) {
+            int consumerSize = new Random().nextInt(20) + 2;
+            int queueSize = new Random().nextInt(20) + 4;
+            testAllocateMessageQueue(consumerSize, queueSize);
+            try {
+                Thread.sleep(1);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private void testAllocateMessageQueue(int consumerSize, int queueSize) throws RemotingCommandException {
+        createConsumerIdList(consumerSize);
+        createMessageQueueList(queueSize);
+
+        Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<MessageQueue, String>();
+        List<MessageQueue> allocatedResAll = new ArrayList<MessageQueue>();
+        // test allocate all
+        {
+            List<String> cidBegin = new ArrayList<String>(consumerIdList);
+            for (String cid : cidBegin) {
+                RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
+                RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+                assertThat(response.getBody()).isNotNull();
+                List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+                    AllocateMessageQueueResponseBody.class).getAllocateResult();
+                for (MessageQueue mq : rs) {
+                    allocateToAllOrigin.put(mq, cid);
+                }
+                allocatedResAll.addAll(rs);
+                assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+            }
+
+            Assert.assertTrue(verifyAllocateAllConsumer(cidBegin, messageQueueList, allocatedResAll));
+        }
+
+        Map<MessageQueue, String> allocateToAllAfterRemoveOneConsumer = new TreeMap<MessageQueue, String>();
+        List<String> cidAfterRemoveOneConsumer = new ArrayList<String>(consumerIdList);
+        // test allocate after removing one cid
+        {
+            String removeCID = cidAfterRemoveOneConsumer.remove(0);
+            unregisterConsumer(removeCID);
+            List<MessageQueue> mqShouldBeChanged = new ArrayList<MessageQueue>();
+            for (Map.Entry<MessageQueue, String> entry : allocateToAllOrigin.entrySet()) {
+                if (entry.getValue().equals(removeCID)) {
+                    mqShouldBeChanged.add(entry.getKey());
+                }
+            }
+
+            List<MessageQueue> allocatedResAllAfterRemoveOneConsumer = new ArrayList<MessageQueue>();
+            for (String cid : cidAfterRemoveOneConsumer) {
+                RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
+                RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+                assertThat(response.getBody()).isNotNull();
+                List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+                    AllocateMessageQueueResponseBody.class).getAllocateResult();
+                allocatedResAllAfterRemoveOneConsumer.addAll(rs);
+                for (MessageQueue mq : rs) {
+                    allocateToAllAfterRemoveOneConsumer.put(mq, cid);
+                }
+                assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+            }
+
+            Assert.assertTrue(verifyAllocateAllConsumer(cidAfterRemoveOneConsumer, messageQueueList, allocatedResAllAfterRemoveOneConsumer));
+            verifyAfterRemoveConsumer(allocateToAllOrigin, allocateToAllAfterRemoveOneConsumer, removeCID);
+        }
+
+        Map<MessageQueue, String> allocateToAllAfterAddOneConsumer = new TreeMap<MessageQueue, String>();
+        List<String> cidAfterAddOneConsumer = new ArrayList<String>(cidAfterRemoveOneConsumer);
+        // test allocate after adding one more cid
+        {
+            String newCid = CID_PREFIX + "NEW";
+            cidAfterAddOneConsumer.add(newCid);
+            registerConsumer(newCid);
+
+            List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>();
+            List<MessageQueue> allocatedResAllAfterAddOneConsumer = new ArrayList<MessageQueue>();
+            for (String cid : cidAfterAddOneConsumer) {
+                RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
+                RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+                assertThat(response.getBody()).isNotNull();
+                List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+                    AllocateMessageQueueResponseBody.class).getAllocateResult();
+                allocatedResAllAfterAddOneConsumer.addAll(rs);
+                for (MessageQueue mq : rs) {
+                    allocateToAllAfterAddOneConsumer.put(mq, cid);
+                    if (cid.equals(newCid)) {
+                        mqShouldOnlyChanged.add(mq);
+                    }
+                }
+                assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+            }
+
+            Assert.assertTrue(verifyAllocateAllConsumer(cidAfterAddOneConsumer, messageQueueList, allocatedResAllAfterAddOneConsumer));
+            verifyAfterAddConsumer(allocateToAllAfterRemoveOneConsumer, allocateToAllAfterAddOneConsumer, newCid);
+        }
+
+        Map<MessageQueue, String> allocateToAllAfterRemoveTwoMq = new TreeMap<MessageQueue, String>();
+        List<MessageQueue> mqAfterRemoveTwoMq = new ArrayList<>(messageQueueList);
+        // test allocate after removing two message queues
+        {
+            for (int i = 0; i < 2; i++) {
+                mqAfterRemoveTwoMq.remove(i);
+            }
+
+            List<MessageQueue> allocatedResAfterRemoveTwoMq = new ArrayList<MessageQueue>();
+            for (String cid : cidAfterAddOneConsumer) {
+                RemotingCommand request = buildAllocateMessageQueueRequest(cid, mqAfterRemoveTwoMq);
+                RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+                assertThat(response.getBody()).isNotNull();
+                List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+                    AllocateMessageQueueResponseBody.class).getAllocateResult();
+                allocatedResAfterRemoveTwoMq.addAll(rs);
+                for (MessageQueue mq : rs) {
+                    allocateToAllAfterRemoveTwoMq.put(mq, cid);
+                }
+                assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+            }
+
+            Assert.assertTrue(verifyAllocateAllConsumer(cidAfterAddOneConsumer, mqAfterRemoveTwoMq, allocatedResAfterRemoveTwoMq));
+        }
+    }
+
+    private void verifyAfterAddConsumer(Map<MessageQueue, String> allocateBefore,
+        Map<MessageQueue, String> allocateAfter, String newCID) {
+        for (MessageQueue mq : allocateAfter.keySet()) {
+            String allocateToOrigin = allocateBefore.get(mq);
+            String allocateToAfter = allocateAfter.get(mq);
+            if (!allocateToAfter.equals(newCID)) { // the rest queues should be the same
+                Assert.assertEquals(allocateToAfter, allocateToOrigin);
+            }
+        }
+    }
+
+    private void verifyAfterRemoveConsumer(Map<MessageQueue, String> allocateToBefore,
+        Map<MessageQueue, String> allocateAfter, String removeCID) {
+        for (MessageQueue mq : allocateToBefore.keySet()) {
+            String allocateToOrigin = allocateToBefore.get(mq);
+            String allocateToAfter = allocateAfter.get(mq);
+            if (!allocateToOrigin.equals(removeCID)) { // the rest queues should be the same
+                Assert.assertEquals(allocateToAfter, allocateToOrigin); // should be the same
+            }
+        }
+    }
+
+    private boolean verifyAllocateAllConsumer(List<String> cidAll, List<MessageQueue> mqAll,
+        List<MessageQueue> allocatedResAll) {
+        if (cidAll.isEmpty()) {
+            return allocatedResAll.isEmpty();
+        }
+        return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll);
+    }
+
+    private void registerConsumer(String clientId) {
+        Channel channel = mock(Channel.class);
+        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
+        ConsumerData consumerData = createConsumerData(group, topic);
+        brokerController.getConsumerManager().registerConsumer(
+            consumerData.getGroupName(),
+            clientChannelInfo,
+            consumerData.getConsumeType(),
+            consumerData.getMessageModel(),
+            consumerData.getConsumeFromWhere(),
+            consumerData.getSubscriptionDataSet(),
+            false);
+        consumerIdList.add(clientId);
+    }
+
+    private void unregisterConsumer(String clientId) {
+        Channel channel = brokerController.getConsumerManager().getConsumerGroupInfo(group).findChannel(clientId).getChannel();
+        brokerController.getConsumerManager().unregisterConsumer(group,
+            new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100), true);
+        consumerIdList.remove(clientId);
+    }
+
+    private RemotingCommand buildAllocateMessageQueueRequest(String clientId, List<MessageQueue> messageQueueList) {
+        AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setClientID(clientId);
+        requestHeader.setStrategyName(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY);
+
+        AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
+        requestBody.setMqAll(messageQueueList);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
+        request.setBody(requestBody.encode());
+        request.makeCustomHeaderToNet();
+
+        return request;
+    }
+
+    private void createConsumerIdList(int size) {
+        for (int i = 0; i < size; i++) {
+            String clientId = CID_PREFIX + i;
+            registerConsumer(clientId);
+        }
+    }
+
+    private void createMessageQueueList(int size) {
+        for (int i = 0; i < size; i++) {
+            MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+            messageQueueList.add(mq);
+        }
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 6718eb5..3ec19f4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -19,10 +19,10 @@ package org.apache.rocketmq.client.consumer;
 import java.util.Collection;
 import java.util.List;
 import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.remoting.RPCHook;
 
 public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
@@ -66,6 +67,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
      * Consumption pattern,default is clustering
      */
     private MessageModel messageModel = MessageModel.CLUSTERING;
+
+    /**
+     * The switch for applying the rebalancing calculation task at the broker side
+     */
+    private boolean rebalanceByBroker = false;
     /**
      * Message queue listener
      */
@@ -409,6 +415,14 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
         this.messageModel = messageModel;
     }
 
+    public boolean isRebalanceByBroker() {
+        return rebalanceByBroker;
+    }
+
+    public void setRebalanceByBroker(boolean rebalanceByBroker) {
+        this.rebalanceByBroker = rebalanceByBroker;
+    }
+
     public String getConsumerGroup() {
         return consumerGroup;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 0876a94..b9069dc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -20,17 +20,18 @@ import java.util.HashSet;
 import java.util.Set;
 import org.apache.rocketmq.client.ClientConfig;
 import org.apache.rocketmq.client.QueryResult;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.message.MessageDecoder;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 
@@ -66,6 +67,10 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
      */
     private MessageModel messageModel = MessageModel.CLUSTERING;
     /**
+     * The switch for applying the rebalancing calculation task at the broker side
+     */
+    private boolean rebalanceByBroker = false;
+    /**
      * Message queue listener
      */
     private MessageQueueListener messageQueueListener;
@@ -245,6 +250,14 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
         this.messageModel = messageModel;
     }
 
+    public boolean isRebalanceByBroker() {
+        return rebalanceByBroker;
+    }
+
+    public void setRebalanceByBroker(boolean rebalanceByBroker) {
+        this.rebalanceByBroker = rebalanceByBroker;
+    }
+
     public MessageQueueListener getMessageQueueListener() {
         return messageQueueListener;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 9011117..743dda7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.client.QueryResult;
 import org.apache.rocketmq.client.consumer.listener.MessageListener;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.exception.MQBrokerException;
 import org.apache.rocketmq.client.exception.MQClientException;
@@ -33,6 +32,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
 import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
 import org.apache.rocketmq.client.trace.TraceDispatcher;
 import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -93,6 +94,19 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
     private MessageModel messageModel = MessageModel.CLUSTERING;
 
     /**
+     * The switch for applying the rebalancing calculation task at the broker side.
+     * </p>
+     *
+     * RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
+     * the same {@link #consumerGroup} would execute rebalancing calculations at the client side in default. The switch
+     * is responsible for shifting the rebalancing calculation task to the broker side.
+     * </p>
+     *
+     * This field defaults to false.
+     */
+    private boolean rebalanceByBroker = false;
+
+    /**
      * Consuming point on consumer booting.
      * </p>
      *
@@ -574,6 +588,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         this.messageModel = messageModel;
     }
 
+    public boolean isRebalanceByBroker() {
+        return rebalanceByBroker;
+    }
+
+    public void setRebalanceByBroker(boolean rebalanceByBroker) {
+        this.rebalanceByBroker = rebalanceByBroker;
+    }
+
     public int getPullBatchSize() {
         return pullBatchSize;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c64d7c5..6ebb8ad 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.common.protocol.RequestCode;
 import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
 import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
 import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
 import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
@@ -86,6 +87,8 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
 import org.apache.rocketmq.common.protocol.body.TopicList;
 import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
 import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
 import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -901,6 +904,39 @@ public class MQClientAPIImpl {
         throw new MQBrokerException(response.getCode(), response.getRemark());
     }
 
+    public List<MessageQueue> getAllocateResultByStrategy(final String addr, final String group, final String clientId,
+        final String strategyName, final List<MessageQueue> mqAll, final long timeoutMillis)
+        throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+        MQBrokerException, InterruptedException {
+        AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
+        requestHeader.setConsumerGroup(group);
+        requestHeader.setClientID(clientId);
+        requestHeader.setStrategyName(strategyName);
+
+        AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
+        requestBody.setMqAll(mqAll);
+
+        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
+        request.setBody(requestBody.encode());
+
+        RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+            request, timeoutMillis);
+        assert response != null;
+        switch (response.getCode()) {
+            case ResponseCode.SUCCESS: {
+                if (response.getBody() != null) {
+                    AllocateMessageQueueResponseBody body = AllocateMessageQueueRequestBody.decode(response.getBody(),
+                        AllocateMessageQueueResponseBody.class);
+                    return body.getAllocateResult();
+                }
+            }
+            default:
+                break;
+        }
+
+        throw new MQBrokerException(response.getCode(), response.getRemark());
+    }
+
     public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException {
         GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index e3d60ff..5d06df4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -874,6 +874,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
     }
 
     @Override
+    public boolean rebalanceByBroker() {
+        return this.defaultLitePullConsumer.isRebalanceByBroker();
+    }
+
+    @Override
     public ConsumeType consumeType() {
         return ConsumeType.CONSUME_ACTIVELY;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index afd72a0..8bd1c31 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -346,6 +346,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
     }
 
     @Override
+    public boolean rebalanceByBroker() {
+        return this.defaultMQPullConsumer.isRebalanceByBroker();
+    }
+
+    @Override
     public ConsumeType consumeType() {
         return ConsumeType.CONSUME_ACTIVELY;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index ab585ea..afefd97 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -991,6 +991,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
     }
 
     @Override
+    public boolean rebalanceByBroker() {
+        return this.defaultMQPushConsumer.isRebalanceByBroker();
+    }
+
+    @Override
     public ConsumeType consumeType() {
         return ConsumeType.CONSUME_PASSIVELY;
     }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
index c2e8a1d..242820f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
@@ -32,6 +32,8 @@ public interface MQConsumerInner {
 
     MessageModel messageModel();
 
+    boolean rebalanceByBroker();
+
     ConsumeType consumeType();
 
     ConsumeFromWhere consumeFromWhere();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index b8972a9..a5a22fe 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -27,18 +27,19 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.impl.FindBrokerResult;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.logging.InternalLogger;
 
 public abstract class RebalanceImpl {
     protected static final InternalLogger log = ClientLogger.getLog();
@@ -277,16 +278,30 @@ public abstract class RebalanceImpl {
                     AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
 
                     List<MessageQueue> allocateResult = null;
-                    try {
-                        allocateResult = strategy.allocate(
+
+                    if (isRebalanceByBroker()) {
+                        allocateResult = this.mQClientFactory.getAllocateResult(
+                            topic,
                             this.consumerGroup,
-                            this.mQClientFactory.getClientId(),
-                            mqAll,
-                            cidAll);
-                    } catch (Throwable e) {
-                        log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
-                            e);
-                        return;
+                            strategy.getName(),
+                            mqAll);
+                    } else {
+                        if (strategy.getName().equals(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY)) {
+                            log.error("AllocateMessageQueueStrategy is not supported while rebalanceByBroker=false. allocateMessageQueueStrategyName={}",
+                                strategy.getName());
+                            return;
+                        }
+                        try {
+                            allocateResult = strategy.allocate(
+                                this.consumerGroup,
+                                this.mQClientFactory.getClientId(),
+                                mqAll,
+                                cidAll);
+                        } catch (Throwable e) {
+                            log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
+                                e);
+                            return;
+                        }
                     }
 
                     Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
@@ -412,6 +427,8 @@ public abstract class RebalanceImpl {
 
     public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
 
+    public abstract boolean isRebalanceByBroker();
+
     public void removeProcessQueue(final MessageQueue mq) {
         ProcessQueue prev = this.processQueueTable.remove(mq);
         if (prev != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 9d1ea74..31c3fda 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -18,11 +18,11 @@ package org.apache.rocketmq.client.impl.consumer;
 
 import java.util.List;
 import java.util.Set;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -142,4 +142,9 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
     public void dispatchPullRequest(List<PullRequest> pullRequestList) {
     }
 
+    @Override
+    public boolean isRebalanceByBroker() {
+        return this.litePullConsumerImpl.rebalanceByBroker();
+    }
+
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
index 9dd408c..e9373f6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -18,9 +18,9 @@ package org.apache.rocketmq.client.impl.consumer;
 
 import java.util.List;
 import java.util.Set;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.consumer.MessageQueueListener;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -76,4 +76,9 @@ public class RebalancePullImpl extends RebalanceImpl {
     @Override
     public void dispatchPullRequest(List<PullRequest> pullRequestList) {
     }
+
+    @Override
+    public boolean isRebalanceByBroker() {
+        return this.defaultMQPullConsumerImpl.rebalanceByBroker();
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index e5166f3..8341480 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -19,11 +19,11 @@ package org.apache.rocketmq.client.impl.consumer;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -218,4 +218,9 @@ public class RebalancePushImpl extends RebalanceImpl {
             log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
         }
     }
+
+    @Override
+    public boolean isRebalanceByBroker() {
+        return this.defaultMQPushConsumerImpl.rebalanceByBroker();
+    }
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index e937ce3..aad5134 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl.factory;
 
 import java.io.UnsupportedEncodingException;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -442,7 +443,7 @@ public class MQClientInstance {
                 }
                 // may need to check one broker every cluster...
                 // assume that the configs of every broker in cluster are the the same.
-                String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
+                String addr = findRandomBrokerAddrByTopic(subscriptionData.getTopic());
 
                 if (addr != null) {
                     try {
@@ -1069,10 +1070,10 @@ public class MQClientInstance {
     }
 
     public List<String> findConsumerIdList(final String topic, final String group) {
-        String brokerAddr = this.findBrokerAddrByTopic(topic);
+        String brokerAddr = this.findRandomBrokerAddrByTopic(topic);
         if (null == brokerAddr) {
             this.updateTopicRouteInfoFromNameServer(topic);
-            brokerAddr = this.findBrokerAddrByTopic(topic);
+            brokerAddr = this.findRandomBrokerAddrByTopic(topic);
         }
 
         if (null != brokerAddr) {
@@ -1086,7 +1087,7 @@ public class MQClientInstance {
         return null;
     }
 
-    public String findBrokerAddrByTopic(final String topic) {
+    public String findRandomBrokerAddrByTopic(final String topic) {
         TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
         if (topicRouteData != null) {
             List<BrokerData> brokers = topicRouteData.getBrokerDatas();
@@ -1100,6 +1101,24 @@ public class MQClientInstance {
         return null;
     }
 
+    public String findUniqueBrokerAddrByTopic(final String topic) {
+        TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
+        if (topicRouteData != null) {
+            List<BrokerData> brokers = topicRouteData.getBrokerDatas();
+            if (!brokers.isEmpty()) {
+                Collections.sort(brokers, new Comparator<BrokerData>() {
+                    @Override public int compare(BrokerData o1, BrokerData o2) {
+                        return o1.getBrokerName().compareTo(o2.getBrokerName());
+                    }
+                });
+                BrokerData bd = brokers.get(0);
+                return bd.selectBrokerAddr();
+            }
+        }
+
+        return null;
+    }
+
     public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
         DefaultMQPushConsumerImpl consumer = null;
         try {
@@ -1161,6 +1180,26 @@ public class MQClientInstance {
         }
     }
 
+    public List<MessageQueue> getAllocateResult(final String topic, final String group, final String strategyName,
+        final List<MessageQueue> mqAll) {
+        String brokerAddr = this.findUniqueBrokerAddrByTopic(topic);
+        if (null == brokerAddr) {
+            this.updateTopicRouteInfoFromNameServer(topic);
+            brokerAddr = this.findUniqueBrokerAddrByTopic(topic);
+        }
+
+        if (null != brokerAddr) {
+            try {
+                return this.mQClientAPIImpl.getAllocateResultByStrategy(brokerAddr, group, clientId, strategyName,
+                    mqAll, 3000);
+            } catch (Exception e) {
+                log.warn("getAllocateResultByStrategy exception, {} {}", brokerAddr, group, e);
+            }
+        }
+
+        return null;
+    }
+
     public TopicRouteData getAnExistTopicRouteData(final String topic) {
         return this.topicRouteTable.get(topic);
     }
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
index 0d394c3..755ad90 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
@@ -23,8 +23,10 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.rebalance.AllocateMachineRoomNearby;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
index 98ce7b6..aa9cfba 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
@@ -22,8 +22,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.TreeMap;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
index 796a394..68260c1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
@@ -20,13 +20,13 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.consumer.store.OffsetStore;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java
similarity index 97%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
rename to common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java
index c1f0604..e467609 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
+++ b/common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.consumer;
+package org.apache.rocketmq.common;
 
 import java.util.List;
 import org.apache.rocketmq.common.message.MessageQueue;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 75ceff3..90642c2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -33,6 +33,8 @@ public class RequestCode {
 
     public static final int GET_TOPIC_NAME_LIST = 23;
 
+    public static final int ALLOCATE_MESSAGE_QUEUE = 24;
+
     public static final int UPDATE_BROKER_CONFIG = 25;
 
     public static final int GET_BROKER_CONFIG = 26;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index dc74444..0cef0af 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -80,4 +80,5 @@ public class ResponseCode extends RemotingSysResponseCode {
 
     public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
 
+    public static final int ALLOCATE_MESSAGE_QUEUE_FAILED = 212;
 }
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java
similarity index 54%
copy from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java
index e548803..3e05158 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java
@@ -14,31 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.consumer.rebalance;
+
+package org.apache.rocketmq.common.protocol.body;
 
 import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
-    private List<MessageQueue> messageQueueList;
-
-    @Override
-    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
-        List<String> cidAll) {
-        return this.messageQueueList;
-    }
-
-    @Override
-    public String getName() {
-        return "CONFIG";
-    }
+public class AllocateMessageQueueRequestBody extends RemotingSerializable {
+    private List<MessageQueue> mqAll;
 
-    public List<MessageQueue> getMessageQueueList() {
-        return messageQueueList;
+    public List<MessageQueue> getMqAll() {
+        return mqAll;
     }
 
-    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
-        this.messageQueueList = messageQueueList;
+    public void setMqAll(List<MessageQueue> mqAll) {
+        this.mqAll = mqAll;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java
new file mode 100644
index 0000000..4c0c11e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AllocateMessageQueueRequestHeader implements CommandCustomHeader {
+    @CFNotNull
+    private String consumerGroup;
+    @CFNotNull
+    private String clientID;
+    @CFNotNull
+    private String strategyName;
+
+    @Override public void checkFields() throws RemotingCommandException {
+    }
+
+    public String getConsumerGroup() {
+        return consumerGroup;
+    }
+
+    public void setConsumerGroup(String consumerGroup) {
+        this.consumerGroup = consumerGroup;
+    }
+
+    public String getClientID() {
+        return clientID;
+    }
+
+    public void setClientID(String clientID) {
+        this.clientID = clientID;
+    }
+
+    public String getStrategyName() {
+        return strategyName;
+    }
+
+    public void setStrategyName(String strategyName) {
+        this.strategyName = strategyName;
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java
similarity index 54%
copy from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java
index e548803..a1d4f47 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java
@@ -14,31 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.consumer.rebalance;
+
+package org.apache.rocketmq.common.protocol.header;
 
 import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
-    private List<MessageQueue> messageQueueList;
-
-    @Override
-    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
-        List<String> cidAll) {
-        return this.messageQueueList;
-    }
-
-    @Override
-    public String getName() {
-        return "CONFIG";
-    }
+public class AllocateMessageQueueResponseBody extends RemotingSerializable {
+    private List<MessageQueue> allocateResult;
 
-    public List<MessageQueue> getMessageQueueList() {
-        return messageQueueList;
+    public List<MessageQueue> getAllocateResult() {
+        return allocateResult;
     }
 
-    public void setMessageQueueList(List<MessageQueue> messageQueueList) {
-        this.messageQueueList = messageQueueList;
+    public void setAllocateResult(List<MessageQueue> allocateResult) {
+        this.allocateResult = allocateResult;
     }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java
new file mode 100644
index 0000000..7fc460b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AllocateMessageQueueResponseHeader implements CommandCustomHeader {
+
+    @Override public void checkFields() throws RemotingCommandException {
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java
similarity index 94%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java
index ec0f7f6..4750ba2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java
@@ -14,17 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 /**
  * An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
@@ -35,7 +36,7 @@ import org.apache.rocketmq.logging.InternalLogger;
  * no alive consumer to monopolize them.
  */
 public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
-    private final InternalLogger log = ClientLogger.getLog();
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
     private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
     private final MachineRoomResolver machineRoomResolver;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java
similarity index 88%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java
index 155e692..525bc82 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java
@@ -14,20 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 /**
  * Average Hashing queue algorithm
  */
 public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
-    private final InternalLogger log = ClientLogger.getLog();
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
     @Override
     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java
similarity index 87%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java
index fe78f0a..4bdfa45 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java
@@ -14,20 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 /**
  * Cycle average Hashing queue algorithm
  */
 public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
-    private final InternalLogger log = ClientLogger.getLog();
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
     @Override
     public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java
similarity index 92%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java
index e548803..a5aaf66 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java
@@ -14,10 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
 
 import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.message.MessageQueue;
 
 public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java
similarity index 95%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java
index 3756831..bac3dac 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java
@@ -14,12 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
 
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.message.MessageQueue;
 
 /**
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java
similarity index 92%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java
index 65dcf79..8baf7c7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java
@@ -14,24 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
 import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
 import org.apache.rocketmq.common.consistenthash.HashFunction;
 import org.apache.rocketmq.common.consistenthash.Node;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 
 /**
  * Consistent Hashing queue algorithm
  */
 public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
-    private final InternalLogger log = ClientLogger.getLog();
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
 
     private final int virtualNodeCnt;
     private final HashFunction customHashFunction;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java
new file mode 100644
index 0000000..06a9aba
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.rebalance;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class AllocateMessageQueueSticky implements AllocateMessageQueueStrategy {
+    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
+    private final Map<String, List<MessageQueue>> messageQueueAllocation = new HashMap<String, List<MessageQueue>>();
+
+    private final List<MessageQueue> unassignedQueues = new ArrayList<MessageQueue>();
+
+    @Override
+    public List<MessageQueue> allocate(String consumerGroup, String currentCID,
+        List<MessageQueue> mqAll, final List<String> cidAll) {
+        if (currentCID == null || currentCID.length() < 1) {
+            throw new IllegalArgumentException("currentCID is empty");
+        }
+        if (mqAll == null || mqAll.isEmpty()) {
+            throw new IllegalArgumentException("mqAll is null or mqAll empty");
+        }
+        if (cidAll == null || cidAll.isEmpty()) {
+            throw new IllegalArgumentException("cidAll is null or cidAll empty");
+        }
+
+        List<MessageQueue> result = new ArrayList<MessageQueue>();
+        if (!cidAll.contains(currentCID)) {
+            log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+                consumerGroup,
+                currentCID,
+                cidAll);
+            return result;
+        }
+
+        // Rebalance (or fresh assignment) needed
+        if (cidAll.size() != messageQueueAllocation.size() || mqAll.size() != getPrevMqAll().size()) {
+            // Update messageQueueAllocation
+            updateMessageQueueAllocation(mqAll, cidAll);
+
+            // Sort consumers based on how many message queues are assigned to them
+            TreeSet<String> sortedSubscriptions = new TreeSet<String>(new ConsumerComparator(messageQueueAllocation));
+            sortedSubscriptions.addAll(messageQueueAllocation.keySet());
+
+            // Assign unassignedQueues to consumers so that queues allocations are as balanced as possible
+            for (MessageQueue mq : unassignedQueues) {
+                String consumer = sortedSubscriptions.first();
+                sortedSubscriptions.remove(consumer);
+                messageQueueAllocation.get(consumer).add(mq);
+                sortedSubscriptions.add(consumer);
+            }
+            unassignedQueues.clear();
+
+            // Reassignment until no message queue can be moved to improve the balance
+            Map<String, List<MessageQueue>> preBalanceAllocation = new HashMap<String, List<MessageQueue>>(messageQueueAllocation);
+            while (!isBalanced(sortedSubscriptions)) {
+                String leastSubscribedConsumer = sortedSubscriptions.first();
+                String mostSubscribedConsumer = sortedSubscriptions.last();
+                MessageQueue mqFromMostSubscribedConsumer = messageQueueAllocation.get(mostSubscribedConsumer).get(0);
+                messageQueueAllocation.get(leastSubscribedConsumer).add(mqFromMostSubscribedConsumer);
+                messageQueueAllocation.get(mostSubscribedConsumer).remove(mqFromMostSubscribedConsumer);
+            }
+
+            // Make sure it is getting a more balanced allocation than before; otherwise, revert to previous allocation
+            if (getBalanceScore(messageQueueAllocation) >= getBalanceScore(preBalanceAllocation)) {
+                deepCopy(preBalanceAllocation, messageQueueAllocation);
+            }
+        }
+
+        return messageQueueAllocation.get(currentCID);
+    }
+
+    private void updateMessageQueueAllocation(List<MessageQueue> mqAll, List<String> cidAll) {
+        // The current size of consumers is larger than before
+        if (cidAll.size() > messageQueueAllocation.size()) {
+            for (String cid : cidAll) {
+                if (!messageQueueAllocation.containsKey(cid)) {
+                    messageQueueAllocation.put(cid, new ArrayList<MessageQueue>());
+                }
+            }
+        }
+
+        // The current size of consumers is smaller than before
+        if (cidAll.size() < messageQueueAllocation.size()) {
+            Iterator<Map.Entry<String, List<MessageQueue>>> it = messageQueueAllocation.entrySet().iterator();
+            while (it.hasNext()) {
+                Map.Entry<String, List<MessageQueue>> entry = it.next();
+                if (!cidAll.contains(entry.getKey())) {
+                    it.remove();
+                }
+            }
+        }
+
+        // The current size of message queues is larger than before
+        List<MessageQueue> prevMqAll = getPrevMqAll();
+        if (mqAll.size() > prevMqAll.size()) {
+            for (MessageQueue mq : mqAll) {
+                if (!prevMqAll.contains(mq)) {
+                    unassignedQueues.add(mq);
+                }
+            }
+        }
+
+        // The current size of message queues is smaller than before
+        if (mqAll.size() < prevMqAll.size()) {
+            for (MessageQueue prevMq : prevMqAll) {
+                if (!isSameQueueIdExists(mqAll, prevMq.getQueueId())) {
+                    for (List<MessageQueue> prevMqs : messageQueueAllocation.values()) {
+                        prevMqs.remove(prevMq);
+                    }
+                }
+            }
+        }
+    }
+
+    private static class ConsumerComparator implements Comparator<String>, Serializable {
+        private final Map<String, List<MessageQueue>> map;
+
+        ConsumerComparator(Map<String, List<MessageQueue>> map) {
+            this.map = map;
+        }
+
+        @Override
+        public int compare(String o1, String o2) {
+            int ret = map.get(o1).size() - map.get(o2).size();
+            if (ret == 0) {
+                ret = o1.compareTo(o2);
+            }
+            return ret;
+        }
+    }
+
+    private boolean isSameQueueIdExists(List<MessageQueue> mqAll, int prevMqId) {
+        for (MessageQueue mq : mqAll) {
+            if (mq.getQueueId() == prevMqId) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private boolean isBalanced(TreeSet<String> sortedCurrentSubscriptions) {
+        int min = this.messageQueueAllocation.get(sortedCurrentSubscriptions.first()).size();
+        int max = this.messageQueueAllocation.get(sortedCurrentSubscriptions.last()).size();
+        // if minimum and maximum numbers of message queues allocated to consumers differ by at most 1
+        return min >= max - 1;
+    }
+
+    /**
+     * @return The balance score of the given allocation, which is the sum of assigned queues size difference of all
+     * consumer. A well balanced allocation with balance score of 0 (all consumers getting the same number of
+     * allocations). Lower balance score represents a more balanced allocation.
+     */
+    private int getBalanceScore(Map<String, List<MessageQueue>> allocation) {
+        int score = 0;
+
+        Map<String, Integer> consumerAllocationSizes = new HashMap<String, Integer>();
+        for (Map.Entry<String, List<MessageQueue>> entry : allocation.entrySet()) {
+            consumerAllocationSizes.put(entry.getKey(), entry.getValue().size());
+        }
+
+        Iterator<Map.Entry<String, Integer>> it = consumerAllocationSizes.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, Integer> entry = it.next();
+            int consumerAllocationSize = entry.getValue();
+            it.remove();
+            for (Map.Entry<String, Integer> otherEntry : consumerAllocationSizes.entrySet()) {
+                score += Math.abs(consumerAllocationSize - otherEntry.getValue());
+            }
+        }
+
+        return score;
+    }
+
+    private void deepCopy(Map<String, List<MessageQueue>> source, Map<String, List<MessageQueue>> dest) {
+        dest.clear();
+        for (Map.Entry<String, List<MessageQueue>> entry : source.entrySet()) {
+            dest.put(entry.getKey(), new ArrayList<MessageQueue>(entry.getValue()));
+        }
+    }
+
+    private List<MessageQueue> getPrevMqAll() {
+        List<MessageQueue> prevMqAll = new ArrayList<MessageQueue>();
+        for (List<MessageQueue> queues : messageQueueAllocation.values()) {
+            prevMqAll.addAll(queues);
+        }
+        return prevMqAll;
+    }
+
+    @Override
+    public String getName() {
+        return "STICKY";
+    }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java
new file mode 100644
index 0000000..bbfefb3
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.rebalance;
+
+public class AllocateMessageQueueStrategyConstants {
+
+    public static final String ALLOCATE_MACHINE_ROOM_NEARBY = "MACHINE_ROOM_NEARBY";
+
+    public static final String ALLOCATE_MESSAGE_QUEUE_AVERAGELY = "AVG";
+
+    public static final String ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE = "AVG_BY_CIRCLE";
+
+    public static final String ALLOCATE_MESSAGE_QUEUE_BY_CONFIG = "CONFIG";
+
+    public static final String ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM = "MACHINE_ROOM";
+
+    public static final String ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH = "CONSISTENT_HASH";
+
+    public static final String ALLOCATE_MESSAGE_QUEUE_STICKY = "STICKY";
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
index 0c97cd3..0e674ef 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
@@ -29,7 +29,6 @@ import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
 import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;
@@ -37,6 +36,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageExt;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
 
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
index a9b9ab0..5e5d446 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
@@ -23,10 +23,10 @@ import java.util.Set;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.Options;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.client.impl.factory.MQClientInstance;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;