You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2020/09/14 04:45:37 UTC
[rocketmq] branch optimize_rebalance updated: [ISSUE #2149] Apache
RocketMQ rebalancing architecture optimization (#2169)
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch optimize_rebalance
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/optimize_rebalance by this push:
new c60cb0f [ISSUE #2149] Apache RocketMQ rebalancing architecture optimization (#2169)
c60cb0f is described below
commit c60cb0fccbf1a2bc8e4b92d75f4e159691b885f7
Author: Jack Tsai <ja...@outlook.com>
AuthorDate: Mon Sep 14 12:45:24 2020 +0800
[ISSUE #2149] Apache RocketMQ rebalancing architecture optimization (#2169)
---
.../apache/rocketmq/broker/BrokerController.java | 9 +
.../broker/processor/ConsumerManageProcessor.java | 87 ++++-
.../processor/ConsumerManageProcessorTest.java | 115 +++++++
.../broker/processor/PullMessageProcessorTest.java | 2 +-
.../rebalance/AllocateMessageQueueStickyTest.java | 355 +++++++++++++++++++++
.../client/consumer/DefaultLitePullConsumer.java | 16 +-
.../client/consumer/DefaultMQPullConsumer.java | 15 +-
.../client/consumer/DefaultMQPushConsumer.java | 24 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 36 +++
.../impl/consumer/DefaultLitePullConsumerImpl.java | 5 +
.../impl/consumer/DefaultMQPullConsumerImpl.java | 5 +
.../impl/consumer/DefaultMQPushConsumerImpl.java | 5 +
.../client/impl/consumer/MQConsumerInner.java | 2 +
.../client/impl/consumer/RebalanceImpl.java | 39 ++-
.../impl/consumer/RebalanceLitePullImpl.java | 7 +-
.../client/impl/consumer/RebalancePullImpl.java | 7 +-
.../client/impl/consumer/RebalancePushImpl.java | 7 +-
.../client/impl/factory/MQClientInstance.java | 47 ++-
.../rebalance/AllocateMachineRoomNearByTest.java | 4 +-
.../AllocateMessageQueueConsitentHashTest.java | 3 +-
.../impl/consumer/RebalancePushImplTest.java | 2 +-
.../common}/AllocateMessageQueueStrategy.java | 2 +-
.../rocketmq/common/protocol/RequestCode.java | 2 +
.../rocketmq/common/protocol/ResponseCode.java | 1 +
.../body/AllocateMessageQueueRequestBody.java | 28 +-
.../header/AllocateMessageQueueRequestHeader.java | 58 ++++
.../header/AllocateMessageQueueResponseBody.java | 28 +-
.../header/AllocateMessageQueueResponseHeader.java | 27 ++
.../rebalance/AllocateMachineRoomNearby.java | 9 +-
.../rebalance/AllocateMessageQueueAveragely.java | 11 +-
.../AllocateMessageQueueAveragelyByCircle.java | 11 +-
.../rebalance/AllocateMessageQueueByConfig.java | 4 +-
.../AllocateMessageQueueByMachineRoom.java | 4 +-
.../AllocateMessageQueueConsistentHash.java | 11 +-
.../rebalance/AllocateMessageQueueSticky.java | 220 +++++++++++++
.../AllocateMessageQueueStrategyConstants.java | 34 ++
.../apache/rocketmq/example/simple/AclClient.java | 2 +-
.../tools/command/topic/AllocateMQSubCommand.java | 2 +-
38 files changed, 1156 insertions(+), 90 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 194f285..a39457a 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -73,6 +73,7 @@ import org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageC
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge;
import org.apache.rocketmq.broker.transaction.queue.TransactionalMessageServiceImpl;
import org.apache.rocketmq.broker.util.ServiceProvider;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.Configuration;
import org.apache.rocketmq.common.DataVersion;
@@ -143,6 +144,8 @@ public class BrokerController {
private final BrokerStatsManager brokerStatsManager;
private final List<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();
private final List<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
+ private final ConcurrentMap<String /* Consumer Group */, AllocateMessageQueueStrategy /* Strategy Object */> allocateMessageQueueStrategyTable
+ = new ConcurrentHashMap<String, AllocateMessageQueueStrategy>();
private MessageStore messageStore;
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
@@ -605,10 +608,12 @@ public class BrokerController {
this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.remotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);
+ this.fastRemotingServer.registerProcessor(RequestCode.ALLOCATE_MESSAGE_QUEUE, consumerManageProcessor, this.consumerManageExecutor);
/**
* EndTransactionProcessor
@@ -741,6 +746,10 @@ public class BrokerController {
return subscriptionGroupManager;
}
+ public ConcurrentMap<String, AllocateMessageQueueStrategy> getAllocateMessageQueueStrategyTable() {
+ return allocateMessageQueueStrategyTable;
+ }
+
public void shutdown() {
if (this.brokerStatsManager != null) {
this.brokerStatsManager.shutdown();
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
index 77317a6..bc12a38 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessor.java
@@ -18,12 +18,18 @@ package org.apache.rocketmq.broker.processor;
import io.netty.channel.ChannelHandlerContext;
import java.util.List;
+import java.util.Map;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupRequestHeader;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
@@ -31,6 +37,14 @@ import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHead
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragelyByCircle;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByConfig;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueByMachineRoom;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueSticky;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
@@ -57,6 +71,8 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
return this.updateConsumerOffset(ctx, request);
case RequestCode.QUERY_CONSUMER_OFFSET:
return this.queryConsumerOffset(ctx, request);
+ case RequestCode.ALLOCATE_MESSAGE_QUEUE:
+ return this.allocateMessageQueue(ctx, request);
default:
break;
}
@@ -152,4 +168,73 @@ public class ConsumerManageProcessor extends AsyncNettyRequestProcessor implemen
return response;
}
+
+ private synchronized RemotingCommand allocateMessageQueue(ChannelHandlerContext ctx, RemotingCommand request)
+ throws RemotingCommandException {
+ final RemotingCommand response =
+ RemotingCommand.createResponseCommand(AllocateMessageQueueResponseHeader.class);
+ final AllocateMessageQueueRequestHeader requestHeader =
+ (AllocateMessageQueueRequestHeader) request.decodeCommandCustomHeader(AllocateMessageQueueRequestHeader.class);
+ final AllocateMessageQueueRequestBody requestBody = AllocateMessageQueueRequestBody.decode(request.getBody(),
+ AllocateMessageQueueRequestBody.class);
+
+ AllocateMessageQueueStrategy strategy = null;
+ String consumerGroup = requestHeader.getConsumerGroup();
+ String strategyName = requestHeader.getStrategyName();
+ Map<String, AllocateMessageQueueStrategy> strategyTable = this.brokerController.getAllocateMessageQueueStrategyTable();
+
+ if (strategyTable.containsKey(consumerGroup) && strategyName.equals(strategyTable.get(consumerGroup).getName())) {
+ strategy = strategyTable.get(consumerGroup);
+ } else {
+ switch (strategyName) {
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY:
+ strategy = new AllocateMessageQueueAveragely();
+ break;
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE:
+ strategy = new AllocateMessageQueueAveragelyByCircle();
+ break;
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_CONFIG:
+ strategy = new AllocateMessageQueueByConfig();
+ break;
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM:
+ strategy = new AllocateMessageQueueByMachineRoom();
+ break;
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH:
+ strategy = new AllocateMessageQueueConsistentHash();
+ break;
+ case AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY:
+ strategy = new AllocateMessageQueueSticky();
+ break;
+ default:
+ response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ response.setRemark("AllocateMessageQueueStrategy[" + strategyName + "] is not supported by broker");
+ return response;
+ }
+ strategyTable.put(consumerGroup, strategy);
+ }
+
+ ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(consumerGroup);
+ List<MessageQueue> allocateResult = null;
+ try {
+ allocateResult = strategy.allocate(
+ requestHeader.getConsumerGroup(),
+ requestHeader.getClientID(),
+ requestBody.getMqAll(),
+ consumerGroupInfo != null ? consumerGroupInfo.getAllClientId() : null);
+ } catch (Throwable e) {
+ log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}",
+ strategy.getName(), e);
+ response.setCode(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ response.setRemark(e.getMessage());
+ return response;
+ }
+
+ AllocateMessageQueueResponseBody body = new AllocateMessageQueueResponseBody();
+ body.setAllocateResult(allocateResult);
+ response.setBody(body.encode());
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+
+ return response;
+ }
}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
new file mode 100644
index 0000000..1cc51d0
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/ConsumerManageProcessorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.processor;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@RunWith(MockitoJUnitRunner.class)
+public class ConsumerManageProcessorTest {
+ private ConsumerManageProcessor consumerManageProcessor;
+ @Spy
+ private BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ @Mock
+ private ChannelHandlerContext handlerContext;
+ @Mock
+ private Channel channel;
+
+ private ClientChannelInfo clientChannelInfo;
+ private String clientId = UUID.randomUUID().toString();
+ private String group = "FooBarGroup";
+ private String topic = "FooBar";
+ private List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
+
+ @Before
+ public void init() {
+ consumerManageProcessor = new ConsumerManageProcessor(brokerController);
+ clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
+
+ mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 0));
+ mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 1));
+ mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 3));
+ mqAll.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), 4));
+
+ ConsumerData consumerData = createConsumerData(group, topic);
+ brokerController.getConsumerManager().registerConsumer(
+ consumerData.getGroupName(),
+ clientChannelInfo,
+ consumerData.getConsumeType(),
+ consumerData.getMessageModel(),
+ consumerData.getConsumeFromWhere(),
+ consumerData.getSubscriptionDataSet(),
+ false);
+ }
+
+ @Test
+ public void testAllocateMessageQueue() throws RemotingCommandException {
+ String emptyClientId = "";
+ RemotingCommand request = buildAllocateMessageQueueRequest(emptyClientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ assertThat(response.getRemark()).isEqualTo("currentCID is empty");
+
+ request = buildAllocateMessageQueueRequest(clientId, AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_AVERAGELY);
+ response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ private RemotingCommand buildAllocateMessageQueueRequest(String clientId, String strategy) {
+ AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setClientID(clientId);
+ requestHeader.setStrategyName(strategy);
+
+ AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
+ requestBody.setMqAll(mqAll);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
+ request.setBody(requestBody.encode());
+ request.makeCustomHeaderToNet();
+ return request;
+ }
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
index c96f708..245af53 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PullMessageProcessorTest.java
@@ -208,7 +208,7 @@ public class PullMessageProcessorTest {
return request;
}
- static ConsumerData createConsumerData(String group, String topic) {
+ public static ConsumerData createConsumerData(String group, String topic) {
ConsumerData consumerData = new ConsumerData();
consumerData.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumerData.setConsumeType(ConsumeType.CONSUME_PASSIVELY);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java b/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java
new file mode 100644
index 0000000..e908227
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/rebalance/AllocateMessageQueueStickyTest.java
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.rebalance;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.client.ClientChannelInfo;
+import org.apache.rocketmq.broker.processor.ConsumerManageProcessor;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
+import org.apache.rocketmq.common.protocol.heartbeat.ConsumerData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.remoting.protocol.LanguageCode;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Spy;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.apache.rocketmq.broker.processor.PullMessageProcessorTest.createConsumerData;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AllocateMessageQueueStickyTest {
+ private ConsumerManageProcessor consumerManageProcessor;
+ @Spy
+ private final BrokerController brokerController = new BrokerController(new BrokerConfig(), new NettyServerConfig(), new NettyClientConfig(), new MessageStoreConfig());
+ @Mock
+ private ChannelHandlerContext handlerContext;
+
+ private static final String CID_PREFIX = "CID-";
+
+ private final String group = "FooBarGroup";
+ private final String topic = "FooBar";
+ private List<MessageQueue> messageQueueList;
+ private List<String> consumerIdList;
+
+ @Before
+ public void init() {
+ consumerManageProcessor = new ConsumerManageProcessor(brokerController);
+ messageQueueList = new ArrayList<MessageQueue>();
+ consumerIdList = new ArrayList<String>();
+ }
+
+ @Test
+ public void testCurrentCIDNotExists() throws RemotingCommandException {
+ String currentCID = String.valueOf(Integer.MAX_VALUE);
+ createConsumerIdList(4);
+ createMessageQueueList(15);
+ RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ Assert.assertEquals(AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult().size(), 0);
+ }
+
+ @Test
+ public void testCurrentCIDIllegalArgument() throws RemotingCommandException {
+ String currentCID = "";
+ createConsumerIdList(4);
+ createMessageQueueList(15);
+ RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ assertThat(response.getRemark()).isEqualTo("currentCID is empty");
+ }
+
+ @Test
+ public void testMessageQueueIllegalArgument() throws RemotingCommandException {
+ String currentCID = CID_PREFIX + 0;
+ createConsumerIdList(4);
+ createMessageQueueList(0);
+ RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ assertThat(response.getRemark()).isEqualTo("mqAll is null or mqAll empty");
+ }
+
+ @Test
+ public void testConsumerIdIllegalArgument() throws RemotingCommandException {
+ String currentCID = CID_PREFIX + 0;
+ createConsumerIdList(0);
+ createMessageQueueList(15);
+ RemotingCommand request = buildAllocateMessageQueueRequest(currentCID, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.ALLOCATE_MESSAGE_QUEUE_FAILED);
+ assertThat(response.getRemark()).isEqualTo("cidAll is null or cidAll empty");
+ }
+
+ @Test
+ public void testAllocateMessageQueue1() throws RemotingCommandException {
+ createConsumerIdList(4);
+ createMessageQueueList(10);
+ for (String clientId : consumerIdList) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(clientId, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ System.out.println(AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult());
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+ }
+
+ @Test
+ public void testAllocateMessageQueue2() throws RemotingCommandException {
+ createConsumerIdList(10);
+ createMessageQueueList(4);
+ for (String clientId : consumerIdList) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(clientId, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+ }
+
+ @Test
+ public void testRun10RandomCase() throws RemotingCommandException {
+ for (int i = 0; i < 10; i++) {
+ int consumerSize = new Random().nextInt(20) + 2;
+ int queueSize = new Random().nextInt(20) + 4;
+ testAllocateMessageQueue(consumerSize, queueSize);
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ private void testAllocateMessageQueue(int consumerSize, int queueSize) throws RemotingCommandException {
+ createConsumerIdList(consumerSize);
+ createMessageQueueList(queueSize);
+
+ Map<MessageQueue, String> allocateToAllOrigin = new TreeMap<MessageQueue, String>();
+ List<MessageQueue> allocatedResAll = new ArrayList<MessageQueue>();
+ // test allocate all
+ {
+ List<String> cidBegin = new ArrayList<String>(consumerIdList);
+ for (String cid : cidBegin) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult();
+ for (MessageQueue mq : rs) {
+ allocateToAllOrigin.put(mq, cid);
+ }
+ allocatedResAll.addAll(rs);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ Assert.assertTrue(verifyAllocateAllConsumer(cidBegin, messageQueueList, allocatedResAll));
+ }
+
+ Map<MessageQueue, String> allocateToAllAfterRemoveOneConsumer = new TreeMap<MessageQueue, String>();
+ List<String> cidAfterRemoveOneConsumer = new ArrayList<String>(consumerIdList);
+ // test allocate after removing one cid
+ {
+ String removeCID = cidAfterRemoveOneConsumer.remove(0);
+ unregisterConsumer(removeCID);
+ List<MessageQueue> mqShouldBeChanged = new ArrayList<MessageQueue>();
+ for (Map.Entry<MessageQueue, String> entry : allocateToAllOrigin.entrySet()) {
+ if (entry.getValue().equals(removeCID)) {
+ mqShouldBeChanged.add(entry.getKey());
+ }
+ }
+
+ List<MessageQueue> allocatedResAllAfterRemoveOneConsumer = new ArrayList<MessageQueue>();
+ for (String cid : cidAfterRemoveOneConsumer) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult();
+ allocatedResAllAfterRemoveOneConsumer.addAll(rs);
+ for (MessageQueue mq : rs) {
+ allocateToAllAfterRemoveOneConsumer.put(mq, cid);
+ }
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ Assert.assertTrue(verifyAllocateAllConsumer(cidAfterRemoveOneConsumer, messageQueueList, allocatedResAllAfterRemoveOneConsumer));
+ verifyAfterRemoveConsumer(allocateToAllOrigin, allocateToAllAfterRemoveOneConsumer, removeCID);
+ }
+
+ Map<MessageQueue, String> allocateToAllAfterAddOneConsumer = new TreeMap<MessageQueue, String>();
+ List<String> cidAfterAddOneConsumer = new ArrayList<String>(cidAfterRemoveOneConsumer);
+ // test allocate after adding one more cid
+ {
+ String newCid = CID_PREFIX + "NEW";
+ cidAfterAddOneConsumer.add(newCid);
+ registerConsumer(newCid);
+
+ List<MessageQueue> mqShouldOnlyChanged = new ArrayList<MessageQueue>();
+ List<MessageQueue> allocatedResAllAfterAddOneConsumer = new ArrayList<MessageQueue>();
+ for (String cid : cidAfterAddOneConsumer) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(cid, messageQueueList);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult();
+ allocatedResAllAfterAddOneConsumer.addAll(rs);
+ for (MessageQueue mq : rs) {
+ allocateToAllAfterAddOneConsumer.put(mq, cid);
+ if (cid.equals(newCid)) {
+ mqShouldOnlyChanged.add(mq);
+ }
+ }
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ Assert.assertTrue(verifyAllocateAllConsumer(cidAfterAddOneConsumer, messageQueueList, allocatedResAllAfterAddOneConsumer));
+ verifyAfterAddConsumer(allocateToAllAfterRemoveOneConsumer, allocateToAllAfterAddOneConsumer, newCid);
+ }
+
+ Map<MessageQueue, String> allocateToAllAfterRemoveTwoMq = new TreeMap<MessageQueue, String>();
+ List<MessageQueue> mqAfterRemoveTwoMq = new ArrayList<>(messageQueueList);
+ // test allocate after removing two message queues
+ {
+ for (int i = 0; i < 2; i++) {
+ mqAfterRemoveTwoMq.remove(i);
+ }
+
+ List<MessageQueue> allocatedResAfterRemoveTwoMq = new ArrayList<MessageQueue>();
+ for (String cid : cidAfterAddOneConsumer) {
+ RemotingCommand request = buildAllocateMessageQueueRequest(cid, mqAfterRemoveTwoMq);
+ RemotingCommand response = consumerManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getBody()).isNotNull();
+ List<MessageQueue> rs = AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class).getAllocateResult();
+ allocatedResAfterRemoveTwoMq.addAll(rs);
+ for (MessageQueue mq : rs) {
+ allocateToAllAfterRemoveTwoMq.put(mq, cid);
+ }
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ Assert.assertTrue(verifyAllocateAllConsumer(cidAfterAddOneConsumer, mqAfterRemoveTwoMq, allocatedResAfterRemoveTwoMq));
+ }
+ }
+
+ private void verifyAfterAddConsumer(Map<MessageQueue, String> allocateBefore,
+ Map<MessageQueue, String> allocateAfter, String newCID) {
+ for (MessageQueue mq : allocateAfter.keySet()) {
+ String allocateToOrigin = allocateBefore.get(mq);
+ String allocateToAfter = allocateAfter.get(mq);
+ if (!allocateToAfter.equals(newCID)) { // the rest queues should be the same
+ Assert.assertEquals(allocateToAfter, allocateToOrigin);
+ }
+ }
+ }
+
+ private void verifyAfterRemoveConsumer(Map<MessageQueue, String> allocateToBefore,
+ Map<MessageQueue, String> allocateAfter, String removeCID) {
+ for (MessageQueue mq : allocateToBefore.keySet()) {
+ String allocateToOrigin = allocateToBefore.get(mq);
+ String allocateToAfter = allocateAfter.get(mq);
+ if (!allocateToOrigin.equals(removeCID)) { // the rest queues should be the same
+ Assert.assertEquals(allocateToAfter, allocateToOrigin); // should be the same
+ }
+ }
+ }
+
+ private boolean verifyAllocateAllConsumer(List<String> cidAll, List<MessageQueue> mqAll,
+ List<MessageQueue> allocatedResAll) {
+ if (cidAll.isEmpty()) {
+ return allocatedResAll.isEmpty();
+ }
+ return mqAll.containsAll(allocatedResAll) && allocatedResAll.containsAll(mqAll);
+ }
+
+ private void registerConsumer(String clientId) {
+ Channel channel = mock(Channel.class);
+ ClientChannelInfo clientChannelInfo = new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100);
+ ConsumerData consumerData = createConsumerData(group, topic);
+ brokerController.getConsumerManager().registerConsumer(
+ consumerData.getGroupName(),
+ clientChannelInfo,
+ consumerData.getConsumeType(),
+ consumerData.getMessageModel(),
+ consumerData.getConsumeFromWhere(),
+ consumerData.getSubscriptionDataSet(),
+ false);
+ consumerIdList.add(clientId);
+ }
+
+ private void unregisterConsumer(String clientId) {
+ Channel channel = brokerController.getConsumerManager().getConsumerGroupInfo(group).findChannel(clientId).getChannel();
+ brokerController.getConsumerManager().unregisterConsumer(group,
+ new ClientChannelInfo(channel, clientId, LanguageCode.JAVA, 100), true);
+ consumerIdList.remove(clientId);
+ }
+
+ private RemotingCommand buildAllocateMessageQueueRequest(String clientId, List<MessageQueue> messageQueueList) {
+ AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setClientID(clientId);
+ requestHeader.setStrategyName(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY);
+
+ AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
+ requestBody.setMqAll(messageQueueList);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
+ request.setBody(requestBody.encode());
+ request.makeCustomHeaderToNet();
+
+ return request;
+ }
+
+ private void createConsumerIdList(int size) {
+ for (int i = 0; i < size; i++) {
+ String clientId = CID_PREFIX + i;
+ registerConsumer(clientId);
+ }
+ }
+
+ private void createMessageQueueList(int size) {
+ for (int i = 0; i < size; i++) {
+ MessageQueue mq = new MessageQueue(topic, "brokerName", i);
+ messageQueueList.add(mq);
+ }
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
index 6718eb5..3ec19f4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumer.java
@@ -19,10 +19,10 @@ package org.apache.rocketmq.client.consumer;
import java.util.Collection;
import java.util.List;
import org.apache.rocketmq.client.ClientConfig;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultLitePullConsumerImpl;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
public class DefaultLitePullConsumer extends ClientConfig implements LitePullConsumer {
@@ -66,6 +67,11 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
* Consumption pattern,default is clustering
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
+
+ /**
+ * The switch for applying the rebalancing calculation task at the broker side
+ */
+ private boolean rebalanceByBroker = false;
/**
* Message queue listener
*/
@@ -409,6 +415,14 @@ public class DefaultLitePullConsumer extends ClientConfig implements LitePullCon
this.messageModel = messageModel;
}
+ public boolean isRebalanceByBroker() {
+ return rebalanceByBroker;
+ }
+
+ public void setRebalanceByBroker(boolean rebalanceByBroker) {
+ this.rebalanceByBroker = rebalanceByBroker;
+ }
+
public String getConsumerGroup() {
return consumerGroup;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
index 0876a94..b9069dc 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPullConsumer.java
@@ -20,17 +20,18 @@ import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.client.ClientConfig;
import org.apache.rocketmq.client.QueryResult;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -66,6 +67,10 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
*/
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
+ * The switch for applying the rebalancing calculation task at the broker side
+ */
+ private boolean rebalanceByBroker = false;
+ /**
* Message queue listener
*/
private MessageQueueListener messageQueueListener;
@@ -245,6 +250,14 @@ public class DefaultMQPullConsumer extends ClientConfig implements MQPullConsume
this.messageModel = messageModel;
}
+ public boolean isRebalanceByBroker() {
+ return rebalanceByBroker;
+ }
+
+ public void setRebalanceByBroker(boolean rebalanceByBroker) {
+ this.rebalanceByBroker = rebalanceByBroker;
+ }
+
public MessageQueueListener getMessageQueueListener() {
return messageQueueListener;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 9011117..743dda7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -24,7 +24,6 @@ import org.apache.rocketmq.client.QueryResult;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
@@ -33,6 +32,7 @@ import org.apache.rocketmq.client.log.ClientLogger;
import org.apache.rocketmq.client.trace.AsyncTraceDispatcher;
import org.apache.rocketmq.client.trace.TraceDispatcher;
import org.apache.rocketmq.client.trace.hook.ConsumeMessageTraceHookImpl;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -41,6 +41,7 @@ import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@ -93,6 +94,19 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
private MessageModel messageModel = MessageModel.CLUSTERING;
/**
+ * The switch for applying the rebalancing calculation task at the broker side.
+ * </p>
+ *
+ * RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
+ * the same {@link #consumerGroup} would execute rebalancing calculations at the client side in default. The switch
+ * is responsible for shifting the rebalancing calculation task to the broker side.
+ * </p>
+ *
+ * This field defaults to false.
+ */
+ private boolean rebalanceByBroker = false;
+
+ /**
* Consuming point on consumer booting.
* </p>
*
@@ -574,6 +588,14 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
this.messageModel = messageModel;
}
+ public boolean isRebalanceByBroker() {
+ return rebalanceByBroker;
+ }
+
+ public void setRebalanceByBroker(boolean rebalanceByBroker) {
+ this.rebalanceByBroker = rebalanceByBroker;
+ }
+
public int getPullBatchSize() {
return pullBatchSize;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index c64d7c5..6ebb8ad 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -63,6 +63,7 @@ import org.apache.rocketmq.common.namesrv.TopAddressing;
import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.AllocateMessageQueueRequestBody;
import org.apache.rocketmq.common.protocol.body.BrokerStatsData;
import org.apache.rocketmq.common.protocol.body.CheckClientRequestBody;
import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
@@ -86,6 +87,8 @@ import org.apache.rocketmq.common.protocol.body.SubscriptionGroupWrapper;
import org.apache.rocketmq.common.protocol.body.TopicConfigSerializeWrapper;
import org.apache.rocketmq.common.protocol.body.TopicList;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueRequestHeader;
+import org.apache.rocketmq.common.protocol.header.AllocateMessageQueueResponseBody;
import org.apache.rocketmq.common.protocol.header.CloneGroupOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumeMessageDirectlyResultRequestHeader;
import org.apache.rocketmq.common.protocol.header.ConsumerSendMsgBackRequestHeader;
@@ -901,6 +904,39 @@ public class MQClientAPIImpl {
throw new MQBrokerException(response.getCode(), response.getRemark());
}
+ public List<MessageQueue> getAllocateResultByStrategy(final String addr, final String group, final String clientId,
+ final String strategyName, final List<MessageQueue> mqAll, final long timeoutMillis)
+ throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
+ MQBrokerException, InterruptedException {
+ AllocateMessageQueueRequestHeader requestHeader = new AllocateMessageQueueRequestHeader();
+ requestHeader.setConsumerGroup(group);
+ requestHeader.setClientID(clientId);
+ requestHeader.setStrategyName(strategyName);
+
+ AllocateMessageQueueRequestBody requestBody = new AllocateMessageQueueRequestBody();
+ requestBody.setMqAll(mqAll);
+
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.ALLOCATE_MESSAGE_QUEUE, requestHeader);
+ request.setBody(requestBody.encode());
+
+ RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
+ request, timeoutMillis);
+ assert response != null;
+ switch (response.getCode()) {
+ case ResponseCode.SUCCESS: {
+ if (response.getBody() != null) {
+ AllocateMessageQueueResponseBody body = AllocateMessageQueueRequestBody.decode(response.getBody(),
+ AllocateMessageQueueResponseBody.class);
+ return body.getAllocateResult();
+ }
+ }
+ default:
+ break;
+ }
+
+ throw new MQBrokerException(response.getCode(), response.getRemark());
+ }
+
public long getMinOffset(final String addr, final String topic, final int queueId, final long timeoutMillis)
throws RemotingException, MQBrokerException, InterruptedException {
GetMinOffsetRequestHeader requestHeader = new GetMinOffsetRequestHeader();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index e3d60ff..5d06df4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -874,6 +874,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
}
@Override
+ public boolean rebalanceByBroker() {
+ return this.defaultLitePullConsumer.isRebalanceByBroker();
+ }
+
+ @Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_ACTIVELY;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
index afd72a0..8bd1c31 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPullConsumerImpl.java
@@ -346,6 +346,11 @@ public class DefaultMQPullConsumerImpl implements MQConsumerInner {
}
@Override
+ public boolean rebalanceByBroker() {
+ return this.defaultMQPullConsumer.isRebalanceByBroker();
+ }
+
+ @Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_ACTIVELY;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index ab585ea..afefd97 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -991,6 +991,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
}
@Override
+ public boolean rebalanceByBroker() {
+ return this.defaultMQPushConsumer.isRebalanceByBroker();
+ }
+
+ @Override
public ConsumeType consumeType() {
return ConsumeType.CONSUME_PASSIVELY;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
index c2e8a1d..242820f 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/MQConsumerInner.java
@@ -32,6 +32,8 @@ public interface MQConsumerInner {
MessageModel messageModel();
+ boolean rebalanceByBroker();
+
ConsumeType consumeType();
ConsumeFromWhere consumeFromWhere();
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index b8972a9..a5a22fe 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@ -27,18 +27,19 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueStrategyConstants;
+import org.apache.rocketmq.logging.InternalLogger;
public abstract class RebalanceImpl {
protected static final InternalLogger log = ClientLogger.getLog();
@@ -277,16 +278,30 @@ public abstract class RebalanceImpl {
AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
List<MessageQueue> allocateResult = null;
- try {
- allocateResult = strategy.allocate(
+
+ if (isRebalanceByBroker()) {
+ allocateResult = this.mQClientFactory.getAllocateResult(
+ topic,
this.consumerGroup,
- this.mQClientFactory.getClientId(),
- mqAll,
- cidAll);
- } catch (Throwable e) {
- log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
- e);
- return;
+ strategy.getName(),
+ mqAll);
+ } else {
+ if (strategy.getName().equals(AllocateMessageQueueStrategyConstants.ALLOCATE_MESSAGE_QUEUE_STICKY)) {
+ log.error("AllocateMessageQueueStrategy is not supported while rebalanceByBroker=false. allocateMessageQueueStrategyName={}",
+ strategy.getName());
+ return;
+ }
+ try {
+ allocateResult = strategy.allocate(
+ this.consumerGroup,
+ this.mQClientFactory.getClientId(),
+ mqAll,
+ cidAll);
+ } catch (Throwable e) {
+ log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
+ e);
+ return;
+ }
}
Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
@@ -412,6 +427,8 @@ public abstract class RebalanceImpl {
public abstract void dispatchPullRequest(final List<PullRequest> pullRequestList);
+ public abstract boolean isRebalanceByBroker();
+
public void removeProcessQueue(final MessageQueue mq) {
ProcessQueue prev = this.processQueueTable.remove(mq);
if (prev != null) {
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
index 9d1ea74..31c3fda 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceLitePullImpl.java
@@ -18,11 +18,11 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import java.util.Set;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -142,4 +142,9 @@ public class RebalanceLitePullImpl extends RebalanceImpl {
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
}
+ @Override
+ public boolean isRebalanceByBroker() {
+ return this.litePullConsumerImpl.rebalanceByBroker();
+ }
+
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
index 9dd408c..e9373f6 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePullImpl.java
@@ -18,9 +18,9 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import java.util.Set;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.MessageQueueListener;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
@@ -76,4 +76,9 @@ public class RebalancePullImpl extends RebalanceImpl {
@Override
public void dispatchPullRequest(List<PullRequest> pullRequestList) {
}
+
+ @Override
+ public boolean isRebalanceByBroker() {
+ return this.defaultMQPullConsumerImpl.rebalanceByBroker();
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index e5166f3..8341480 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -19,11 +19,11 @@ package org.apache.rocketmq.client.impl.consumer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
@@ -218,4 +218,9 @@ public class RebalancePushImpl extends RebalanceImpl {
log.info("doRebalance, {}, add a new pull request {}", consumerGroup, pullRequest);
}
}
+
+ @Override
+ public boolean isRebalanceByBroker() {
+ return this.defaultMQPushConsumerImpl.rebalanceByBroker();
+ }
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index e937ce3..aad5134 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.client.impl.factory;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -442,7 +443,7 @@ public class MQClientInstance {
}
// may need to check one broker every cluster...
// assume that the configs of every broker in cluster are the the same.
- String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
+ String addr = findRandomBrokerAddrByTopic(subscriptionData.getTopic());
if (addr != null) {
try {
@@ -1069,10 +1070,10 @@ public class MQClientInstance {
}
public List<String> findConsumerIdList(final String topic, final String group) {
- String brokerAddr = this.findBrokerAddrByTopic(topic);
+ String brokerAddr = this.findRandomBrokerAddrByTopic(topic);
if (null == brokerAddr) {
this.updateTopicRouteInfoFromNameServer(topic);
- brokerAddr = this.findBrokerAddrByTopic(topic);
+ brokerAddr = this.findRandomBrokerAddrByTopic(topic);
}
if (null != brokerAddr) {
@@ -1086,7 +1087,7 @@ public class MQClientInstance {
return null;
}
- public String findBrokerAddrByTopic(final String topic) {
+ public String findRandomBrokerAddrByTopic(final String topic) {
TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
if (topicRouteData != null) {
List<BrokerData> brokers = topicRouteData.getBrokerDatas();
@@ -1100,6 +1101,24 @@ public class MQClientInstance {
return null;
}
+ public String findUniqueBrokerAddrByTopic(final String topic) {
+ TopicRouteData topicRouteData = this.topicRouteTable.get(topic);
+ if (topicRouteData != null) {
+ List<BrokerData> brokers = topicRouteData.getBrokerDatas();
+ if (!brokers.isEmpty()) {
+ Collections.sort(brokers, new Comparator<BrokerData>() {
+ @Override public int compare(BrokerData o1, BrokerData o2) {
+ return o1.getBrokerName().compareTo(o2.getBrokerName());
+ }
+ });
+ BrokerData bd = brokers.get(0);
+ return bd.selectBrokerAddr();
+ }
+ }
+
+ return null;
+ }
+
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
DefaultMQPushConsumerImpl consumer = null;
try {
@@ -1161,6 +1180,26 @@ public class MQClientInstance {
}
}
+ public List<MessageQueue> getAllocateResult(final String topic, final String group, final String strategyName,
+ final List<MessageQueue> mqAll) {
+ String brokerAddr = this.findUniqueBrokerAddrByTopic(topic);
+ if (null == brokerAddr) {
+ this.updateTopicRouteInfoFromNameServer(topic);
+ brokerAddr = this.findUniqueBrokerAddrByTopic(topic);
+ }
+
+ if (null != brokerAddr) {
+ try {
+ return this.mQClientAPIImpl.getAllocateResultByStrategy(brokerAddr, group, clientId, strategyName,
+ mqAll, 3000);
+ } catch (Exception e) {
+ log.warn("getAllocateResultByStrategy exception, {} {}", brokerAddr, group, e);
+ }
+ }
+
+ return null;
+ }
+
public TopicRouteData getAnExistTopicRouteData(final String topic) {
return this.topicRouteTable.get(topic);
}
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
index 0d394c3..755ad90 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearByTest.java
@@ -23,8 +23,10 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.rebalance.AllocateMachineRoomNearby;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
diff --git a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
index 98ce7b6..aa9cfba 100644
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsitentHashTest.java
@@ -22,8 +22,9 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueConsistentHash;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
diff --git a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
index 796a394..68260c1 100644
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
@@ -20,13 +20,13 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java b/common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java
similarity index 97%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
rename to common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java
index c1f0604..e467609 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/AllocateMessageQueueStrategy.java
+++ b/common/src/main/java/org/apache/rocketmq/common/AllocateMessageQueueStrategy.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer;
+package org.apache.rocketmq.common;
import java.util.List;
import org.apache.rocketmq.common.message.MessageQueue;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 75ceff3..90642c2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@ -33,6 +33,8 @@ public class RequestCode {
public static final int GET_TOPIC_NAME_LIST = 23;
+ public static final int ALLOCATE_MESSAGE_QUEUE = 24;
+
public static final int UPDATE_BROKER_CONFIG = 25;
public static final int GET_BROKER_CONFIG = 26;
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
index dc74444..0cef0af 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/ResponseCode.java
@@ -80,4 +80,5 @@ public class ResponseCode extends RemotingSysResponseCode {
public static final int UPDATE_GLOBAL_WHITE_ADDRS_CONFIG_FAILED = 211;
+ public static final int ALLOCATE_MESSAGE_QUEUE_FAILED = 212;
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java
similarity index 54%
copy from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java
index e548803..3e05158 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/AllocateMessageQueueRequestBody.java
@@ -14,31 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+
+package org.apache.rocketmq.common.protocol.body;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
- private List<MessageQueue> messageQueueList;
-
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- return this.messageQueueList;
- }
-
- @Override
- public String getName() {
- return "CONFIG";
- }
+public class AllocateMessageQueueRequestBody extends RemotingSerializable {
+ private List<MessageQueue> mqAll;
- public List<MessageQueue> getMessageQueueList() {
- return messageQueueList;
+ public List<MessageQueue> getMqAll() {
+ return mqAll;
}
- public void setMessageQueueList(List<MessageQueue> messageQueueList) {
- this.messageQueueList = messageQueueList;
+ public void setMqAll(List<MessageQueue> mqAll) {
+ this.mqAll = mqAll;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java
new file mode 100644
index 0000000..4c0c11e
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueRequestHeader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.annotation.CFNotNull;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AllocateMessageQueueRequestHeader implements CommandCustomHeader {
+ @CFNotNull
+ private String consumerGroup;
+ @CFNotNull
+ private String clientID;
+ @CFNotNull
+ private String strategyName;
+
+ @Override public void checkFields() throws RemotingCommandException {
+ }
+
+ public String getConsumerGroup() {
+ return consumerGroup;
+ }
+
+ public void setConsumerGroup(String consumerGroup) {
+ this.consumerGroup = consumerGroup;
+ }
+
+ public String getClientID() {
+ return clientID;
+ }
+
+ public void setClientID(String clientID) {
+ this.clientID = clientID;
+ }
+
+ public String getStrategyName() {
+ return strategyName;
+ }
+
+ public void setStrategyName(String strategyName) {
+ this.strategyName = strategyName;
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java
similarity index 54%
copy from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
copy to common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java
index e548803..a1d4f47 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseBody.java
@@ -14,31 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+
+package org.apache.rocketmq.common.protocol.header;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
-public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
- private List<MessageQueue> messageQueueList;
-
- @Override
- public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
- List<String> cidAll) {
- return this.messageQueueList;
- }
-
- @Override
- public String getName() {
- return "CONFIG";
- }
+public class AllocateMessageQueueResponseBody extends RemotingSerializable {
+ private List<MessageQueue> allocateResult;
- public List<MessageQueue> getMessageQueueList() {
- return messageQueueList;
+ public List<MessageQueue> getAllocateResult() {
+ return allocateResult;
}
- public void setMessageQueueList(List<MessageQueue> messageQueueList) {
- this.messageQueueList = messageQueueList;
+ public void setAllocateResult(List<MessageQueue> allocateResult) {
+ this.allocateResult = allocateResult;
}
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java
new file mode 100644
index 0000000..7fc460b
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/AllocateMessageQueueResponseHeader.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.protocol.header;
+
+import org.apache.rocketmq.remoting.CommandCustomHeader;
+import org.apache.rocketmq.remoting.exception.RemotingCommandException;
+
+public class AllocateMessageQueueResponseHeader implements CommandCustomHeader {
+
+ @Override public void checkFields() throws RemotingCommandException {
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java
similarity index 94%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java
index ec0f7f6..4750ba2 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMachineRoomNearby.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMachineRoomNearby.java
@@ -14,17 +14,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
@@ -35,7 +36,7 @@ import org.apache.rocketmq.logging.InternalLogger;
* no alive consumer to monopolize them.
*/
public class AllocateMachineRoomNearby implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private final AllocateMessageQueueStrategy allocateMessageQueueStrategy;//actual allocate strategy
private final MachineRoomResolver machineRoomResolver;
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java
similarity index 88%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java
index 155e692..525bc82 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragely.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragely.java
@@ -14,20 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* Average Hashing queue algorithm
*/
public class AllocateMessageQueueAveragely implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java
similarity index 87%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java
index fe78f0a..4bdfa45 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueAveragelyByCircle.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueAveragelyByCircle.java
@@ -14,20 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* Cycle average Hashing queue algorithm
*/
public class AllocateMessageQueueAveragelyByCircle implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
@Override
public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll,
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java
similarity index 92%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java
index e548803..a5aaf66 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByConfig.java
@@ -14,10 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
public class AllocateMessageQueueByConfig implements AllocateMessageQueueStrategy {
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java
similarity index 95%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java
index 3756831..bac3dac 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueByMachineRoom.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueByMachineRoom.java
@@ -14,12 +14,12 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.message.MessageQueue;
/**
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java
similarity index 92%
rename from client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
rename to common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java
index 65dcf79..8baf7c7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/rebalance/AllocateMessageQueueConsistentHash.java
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueConsistentHash.java
@@ -14,24 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.rocketmq.client.consumer.rebalance;
+package org.apache.rocketmq.common.rebalance;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
-import org.apache.rocketmq.client.log.ClientLogger;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
import org.apache.rocketmq.common.consistenthash.ConsistentHashRouter;
import org.apache.rocketmq.common.consistenthash.HashFunction;
import org.apache.rocketmq.common.consistenthash.Node;
-import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
/**
* Consistent Hashing queue algorithm
*/
public class AllocateMessageQueueConsistentHash implements AllocateMessageQueueStrategy {
- private final InternalLogger log = ClientLogger.getLog();
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
private final int virtualNodeCnt;
private final HashFunction customHashFunction;
diff --git a/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java
new file mode 100644
index 0000000..06a9aba
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueSticky.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.rebalance;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+import org.apache.rocketmq.common.AllocateMessageQueueStrategy;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+
+public class AllocateMessageQueueSticky implements AllocateMessageQueueStrategy {
+ private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
+ private final Map<String, List<MessageQueue>> messageQueueAllocation = new HashMap<String, List<MessageQueue>>();
+
+ private final List<MessageQueue> unassignedQueues = new ArrayList<MessageQueue>();
+
+ @Override
+ public List<MessageQueue> allocate(String consumerGroup, String currentCID,
+ List<MessageQueue> mqAll, final List<String> cidAll) {
+ if (currentCID == null || currentCID.length() < 1) {
+ throw new IllegalArgumentException("currentCID is empty");
+ }
+ if (mqAll == null || mqAll.isEmpty()) {
+ throw new IllegalArgumentException("mqAll is null or mqAll empty");
+ }
+ if (cidAll == null || cidAll.isEmpty()) {
+ throw new IllegalArgumentException("cidAll is null or cidAll empty");
+ }
+
+ List<MessageQueue> result = new ArrayList<MessageQueue>();
+ if (!cidAll.contains(currentCID)) {
+ log.info("[BUG] ConsumerGroup: {} The consumerId: {} not in cidAll: {}",
+ consumerGroup,
+ currentCID,
+ cidAll);
+ return result;
+ }
+
+ // Rebalance (or fresh assignment) needed
+ if (cidAll.size() != messageQueueAllocation.size() || mqAll.size() != getPrevMqAll().size()) {
+ // Update messageQueueAllocation
+ updateMessageQueueAllocation(mqAll, cidAll);
+
+ // Sort consumers based on how many message queues are assigned to them
+ TreeSet<String> sortedSubscriptions = new TreeSet<String>(new ConsumerComparator(messageQueueAllocation));
+ sortedSubscriptions.addAll(messageQueueAllocation.keySet());
+
+ // Assign unassignedQueues to consumers so that queues allocations are as balanced as possible
+ for (MessageQueue mq : unassignedQueues) {
+ String consumer = sortedSubscriptions.first();
+ sortedSubscriptions.remove(consumer);
+ messageQueueAllocation.get(consumer).add(mq);
+ sortedSubscriptions.add(consumer);
+ }
+ unassignedQueues.clear();
+
+ // Reassignment until no message queue can be moved to improve the balance
+ Map<String, List<MessageQueue>> preBalanceAllocation = new HashMap<String, List<MessageQueue>>(messageQueueAllocation);
+ while (!isBalanced(sortedSubscriptions)) {
+ String leastSubscribedConsumer = sortedSubscriptions.first();
+ String mostSubscribedConsumer = sortedSubscriptions.last();
+ MessageQueue mqFromMostSubscribedConsumer = messageQueueAllocation.get(mostSubscribedConsumer).get(0);
+ messageQueueAllocation.get(leastSubscribedConsumer).add(mqFromMostSubscribedConsumer);
+ messageQueueAllocation.get(mostSubscribedConsumer).remove(mqFromMostSubscribedConsumer);
+ }
+
+ // Make sure it is getting a more balanced allocation than before; otherwise, revert to previous allocation
+ if (getBalanceScore(messageQueueAllocation) >= getBalanceScore(preBalanceAllocation)) {
+ deepCopy(preBalanceAllocation, messageQueueAllocation);
+ }
+ }
+
+ return messageQueueAllocation.get(currentCID);
+ }
+
+ private void updateMessageQueueAllocation(List<MessageQueue> mqAll, List<String> cidAll) {
+ // The current size of consumers is larger than before
+ if (cidAll.size() > messageQueueAllocation.size()) {
+ for (String cid : cidAll) {
+ if (!messageQueueAllocation.containsKey(cid)) {
+ messageQueueAllocation.put(cid, new ArrayList<MessageQueue>());
+ }
+ }
+ }
+
+ // The current size of consumers is smaller than before
+ if (cidAll.size() < messageQueueAllocation.size()) {
+ Iterator<Map.Entry<String, List<MessageQueue>>> it = messageQueueAllocation.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, List<MessageQueue>> entry = it.next();
+ if (!cidAll.contains(entry.getKey())) {
+ it.remove();
+ }
+ }
+ }
+
+ // The current size of message queues is larger than before
+ List<MessageQueue> prevMqAll = getPrevMqAll();
+ if (mqAll.size() > prevMqAll.size()) {
+ for (MessageQueue mq : mqAll) {
+ if (!prevMqAll.contains(mq)) {
+ unassignedQueues.add(mq);
+ }
+ }
+ }
+
+ // The current size of message queues is smaller than before
+ if (mqAll.size() < prevMqAll.size()) {
+ for (MessageQueue prevMq : prevMqAll) {
+ if (!isSameQueueIdExists(mqAll, prevMq.getQueueId())) {
+ for (List<MessageQueue> prevMqs : messageQueueAllocation.values()) {
+ prevMqs.remove(prevMq);
+ }
+ }
+ }
+ }
+ }
+
+ private static class ConsumerComparator implements Comparator<String>, Serializable {
+ private final Map<String, List<MessageQueue>> map;
+
+ ConsumerComparator(Map<String, List<MessageQueue>> map) {
+ this.map = map;
+ }
+
+ @Override
+ public int compare(String o1, String o2) {
+ int ret = map.get(o1).size() - map.get(o2).size();
+ if (ret == 0) {
+ ret = o1.compareTo(o2);
+ }
+ return ret;
+ }
+ }
+
+ private boolean isSameQueueIdExists(List<MessageQueue> mqAll, int prevMqId) {
+ for (MessageQueue mq : mqAll) {
+ if (mq.getQueueId() == prevMqId) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private boolean isBalanced(TreeSet<String> sortedCurrentSubscriptions) {
+ int min = this.messageQueueAllocation.get(sortedCurrentSubscriptions.first()).size();
+ int max = this.messageQueueAllocation.get(sortedCurrentSubscriptions.last()).size();
+ // if minimum and maximum numbers of message queues allocated to consumers differ by at most 1
+ return min >= max - 1;
+ }
+
+ /**
+ * @return The balance score of the given allocation, which is the sum of assigned queues size difference of all
+ * consumer. A well balanced allocation with balance score of 0 (all consumers getting the same number of
+ * allocations). Lower balance score represents a more balanced allocation.
+ */
+ private int getBalanceScore(Map<String, List<MessageQueue>> allocation) {
+ int score = 0;
+
+ Map<String, Integer> consumerAllocationSizes = new HashMap<String, Integer>();
+ for (Map.Entry<String, List<MessageQueue>> entry : allocation.entrySet()) {
+ consumerAllocationSizes.put(entry.getKey(), entry.getValue().size());
+ }
+
+ Iterator<Map.Entry<String, Integer>> it = consumerAllocationSizes.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, Integer> entry = it.next();
+ int consumerAllocationSize = entry.getValue();
+ it.remove();
+ for (Map.Entry<String, Integer> otherEntry : consumerAllocationSizes.entrySet()) {
+ score += Math.abs(consumerAllocationSize - otherEntry.getValue());
+ }
+ }
+
+ return score;
+ }
+
+ private void deepCopy(Map<String, List<MessageQueue>> source, Map<String, List<MessageQueue>> dest) {
+ dest.clear();
+ for (Map.Entry<String, List<MessageQueue>> entry : source.entrySet()) {
+ dest.put(entry.getKey(), new ArrayList<MessageQueue>(entry.getValue()));
+ }
+ }
+
+ private List<MessageQueue> getPrevMqAll() {
+ List<MessageQueue> prevMqAll = new ArrayList<MessageQueue>();
+ for (List<MessageQueue> queues : messageQueueAllocation.values()) {
+ prevMqAll.addAll(queues);
+ }
+ return prevMqAll;
+ }
+
+ @Override
+ public String getName() {
+ return "STICKY";
+ }
+}
diff --git a/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java
new file mode 100644
index 0000000..bbfefb3
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/rebalance/AllocateMessageQueueStrategyConstants.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.common.rebalance;
+
+public class AllocateMessageQueueStrategyConstants {
+
+ public static final String ALLOCATE_MACHINE_ROOM_NEARBY = "MACHINE_ROOM_NEARBY";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_AVERAGELY = "AVG";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_AVERAGELY_BY_CIRCLE = "AVG_BY_CIRCLE";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_BY_CONFIG = "CONFIG";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_BY_MACHINE_ROOM = "MACHINE_ROOM";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_CONSISTENT_HASH = "CONSISTENT_HASH";
+
+ public static final String ALLOCATE_MESSAGE_QUEUE_STICKY = "STICKY";
+}
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
index 0c97cd3..0e674ef 100644
--- a/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/AclClient.java
@@ -29,7 +29,6 @@ import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
@@ -37,6 +36,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.common.RemotingHelper;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
index a9b9ab0..5e5d446 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/topic/AllocateMQSubCommand.java
@@ -23,10 +23,10 @@ import java.util.Set;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
-import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.TopicRouteData;
+import org.apache.rocketmq.common.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;