You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2021/09/24 09:07:09 UTC

[rocketmq] 01/02: Merge branch 'develop' into 5.0.0-preview

This is an automated email from the ASF dual-hosted git repository.

zhoubo pushed a commit to branch 5.0.0-preview
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 2bf133be29c444f0aea648133123a2d9a61ffb07
Merge: 48d3c7e 1e8e728
Author: odbozhou <87...@qq.com>
AuthorDate: Fri Sep 24 15:35:46 2021 +0800

    Merge branch 'develop' into 5.0.0-preview
    
    # Conflicts:
    #	.travis.yml
    #	acl/pom.xml
    #	broker/pom.xml
    #	broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
    #	broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
    #	broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
    #	client/pom.xml
    #	client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
    #	client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
    #	client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
    #	common/pom.xml
    #	common/src/main/java/org/apache/rocketmq/common/MQVersion.java
    #	common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
    #	distribution/pom.xml
    #	example/pom.xml
    #	filter/pom.xml
    #	logappender/pom.xml
    #	logging/pom.xml
    #	namesrv/pom.xml
    #	namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
    #	namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
    #	openmessaging/pom.xml
    #	pom.xml
    #	remoting/pom.xml
    #	srvutil/pom.xml
    #	store/pom.xml
    #	store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
    #	test/pom.xml
    #	tools/pom.xml
    #	tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java

 .github/workflows/greetings.yml                    |  29 +
 README.md                                          |   2 +-
 .../org/apache/rocketmq/acl/common/AclUtils.java   |  69 +-
 .../rocketmq/acl/plain/PlainPermissionManager.java |  53 +-
 .../acl/plain/RemoteAddressStrategyFactory.java    |   2 +-
 .../apache/rocketmq/acl/common/AclUtilsTest.java   |   1 +
 .../acl/plain/PlainAccessValidatorTest.java        |  53 +-
 .../broker/pagecache/ManyMessageTransfer.java      |  27 +
 .../broker/pagecache/OneMessageTransfer.java       |  27 +
 .../broker/pagecache/QueryMessageTransfer.java     |  27 +
 .../broker/plugin/AbstractPluginMessageStore.java  |   4 +-
 .../processor/AbstractSendMessageProcessor.java    |  89 ++-
 .../broker/processor/AdminBrokerProcessor.java     |  20 +-
 .../broker/processor/ReplyMessageProcessor.java    |   4 +-
 .../broker/processor/SendMessageProcessor.java     | 120 ++-
 .../AbstractTransactionalMessageCheckListener.java |   2 -
 .../DefaultTransactionalMessageCheckListener.java  |   4 +-
 .../AbstractSendMessageProcessorTest.java          |  82 ++
 client/pom.xml                                     |   4 -
 .../org/apache/rocketmq/client/ClientConfig.java   |  13 +-
 .../client/consumer/DefaultLitePullConsumer.java   |   4 +-
 .../client/consumer/DefaultMQPushConsumer.java     |  18 +-
 .../rocketmq/client/impl/MQClientAPIImpl.java      |  28 +-
 .../ConsumeMessageConcurrentlyService.java         |  28 +-
 .../consumer/ConsumeMessageOrderlyService.java     |   4 +-
 .../impl/consumer/DefaultLitePullConsumerImpl.java |  64 +-
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |   2 +-
 .../client/impl/consumer/RebalanceImpl.java        |   2 +-
 .../impl/consumer/RebalanceLitePullImpl.java       |   3 +-
 .../client/impl/factory/MQClientInstance.java      |  42 +-
 .../impl/producer/DefaultMQProducerImpl.java       |  61 +-
 .../client/producer/DefaultMQProducer.java         |  54 +-
 .../client/producer/RequestFutureTable.java        |   7 +
 .../client/trace/AsyncTraceDispatcher.java         |  74 +-
 .../rocketmq/client/trace/TraceDataEncoder.java    |   1 -
 .../apache/rocketmq/client/trace/TraceView.java    |   4 +-
 .../rocketmq/client/impl/MQClientAPIImplTest.java  |  27 +
 .../ConsumeMessageConcurrentlyServiceTest.java     |   2 +-
 .../trace/DefaultMQConsumerWithTraceTest.java      |   9 +
 .../DefaultMQLitePullConsumerWithTraceTest.java    |  12 +-
 .../trace/DefaultMQProducerWithTraceTest.java      |  10 +
 .../client/trace/TraceDataEncoderTest.java         | 106 ++-
 .../rocketmq/client/trace/TraceViewTest.java       |   4 +-
 .../org/apache/rocketmq/common/BrokerConfig.java   |   8 +-
 .../java/org/apache/rocketmq/common/MixAll.java    |   1 +
 .../java/org/apache/rocketmq/common/UtilAll.java   |  33 +-
 .../apache/rocketmq/common/message/Message.java    |  11 +-
 .../rocketmq/common/message/MessageDecoder.java    |  41 +-
 .../rocketmq/common/protocol/NamespaceUtil.java    |   2 +-
 .../rocketmq/common/protocol/RequestCode.java      |   2 +
 .../namesrv/AddWritePermOfBrokerRequestHeader.java |  39 +
 .../AddWritePermOfBrokerResponseHeader.java        |  38 +
 .../apache/rocketmq/common/stats/StatsItem.java    |  22 +-
 .../apache/rocketmq/common/stats/StatsItemSet.java |   8 +-
 .../common/utils/NameServerAddressUtils.java       |   1 +
 .../org/apache/rocketmq/common/UtilAllTest.java    |  13 +
 .../common/message/MessageDecoderTest.java         | 108 +++
 .../rocketmq/common/stats/StatsItemSetTest.java    |   4 +-
 .../rocketmq/common/utils/IOTinyUtilsTest.java     |   4 +-
 distribution/benchmark/batchproducer.sh            |  18 +
 distribution/bin/runbroker.cmd                     |   2 +-
 distribution/bin/runbroker.sh                      |   2 +-
 distribution/bin/runserver.cmd                     |   2 +-
 distribution/bin/runserver.sh                      |   6 +-
 distribution/release.xml                           |   4 +
 docs/cn/Configuration_System.md                    |  70 ++
 docs/cn/Deployment.md                              | 159 ++++
 docs/cn/Example_Batch.md                           |  82 ++
 docs/cn/Example_Delay.md                           |  85 +++
 docs/cn/Example_Simple_cn.md                       | 136 ++++
 docs/cn/FAQ.md                                     | 110 +++
 docs/cn/RocketMQ_Example.md                        |   8 +-
 docs/cn/best_practice.md                           |   2 +-
 docs/cn/design.md                                  |   4 +-
 docs/cn/image/rocketmq_architecture_1.png          | Bin 89290 -> 62810 bytes
 docs/cn/image/rocketmq_architecture_3.png          | Bin 106758 -> 74884 bytes
 docs/cn/msg_trace/user_guide.md                    |  19 +-
 docs/cn/operation.md                               |  10 +-
 docs/en/CLITools.md                                |   8 +
 docs/en/Example_Transaction.md                     |   2 +-
 docs/en/best_practice.md                           |   2 +-
 .../client/java/API_Reference_DefaultMQProducer.md |  71 ++
 docs/en/image/rocketmq_architecture_1.png          | Bin 89290 -> 62810 bytes
 docs/en/image/rocketmq_architecture_3.png          | Bin 106758 -> 74884 bytes
 docs/en/msg_trace/user_guide.md                    |  16 +
 docs/en/operation.md                               |   2 +-
 .../rocketmq/example/benchmark/BatchProducer.java  | 403 ++++++++++
 .../rocketmq/example/benchmark/Consumer.java       |  66 +-
 .../rocketmq/example/benchmark/Producer.java       | 157 ++--
 .../example/benchmark/TransactionProducer.java     | 117 +--
 .../rocketmq/example/quickstart/Producer.java      |  33 +
 .../rocketmq/example/simple/PullConsumer.java      | 154 ++--
 .../filter/expression/UnaryExpression.java         |  16 +-
 .../rocketmq/filter/parser/ParseException.java     |   7 +-
 .../rocketmq/filter/parser/TokenMgrError.java      |   3 +-
 .../org/apache/rocketmq/filter/ParserTest.java     |   2 +-
 .../namesrv/processor/DefaultRequestProcessor.java |  24 +
 .../namesrv/routeinfo/RouteInfoManager.java        |  41 +-
 .../namesrv/routeinfo/RouteInfoManagerTest.java    |  52 +-
 pom.xml                                            |   2 +-
 .../rocketmq/remoting/common/RemotingHelper.java   |   2 +-
 .../rocketmq/remoting/netty/NettyClientConfig.java |   8 +-
 .../rocketmq/remoting/netty/NettyLogger.java       |  46 ++
 .../remoting/netty/NettyRemotingAbstract.java      |   7 +-
 .../rocketmq/remoting/netty/NettySystemConfig.java |  16 +
 .../remoting/netty/NettyClientConfigTest.java      |  64 ++
 .../rocketmq/store/AppendMessageCallback.java      |   5 +-
 .../apache/rocketmq/store/AppendMessageResult.java |  17 +
 .../java/org/apache/rocketmq/store/CommitLog.java  | 835 ++++++++-------------
 .../apache/rocketmq/store/DefaultMessageStore.java |  73 +-
 .../java/org/apache/rocketmq/store/MappedFile.java |  26 +-
 .../rocketmq/store/MessageExtBrokerInner.java      |  12 +
 .../org/apache/rocketmq/store/MessageStore.java    |   4 +-
 .../rocketmq/store/SelectMappedBufferResult.java   |   7 -
 .../apache/rocketmq/store/StoreStatsService.java   | 109 +--
 .../rocketmq/store/config/MessageStoreConfig.java  |   7 +-
 .../rocketmq/store/dledger/DLedgerCommitLog.java   | 250 +-----
 .../org/apache/rocketmq/store/ha/HAService.java    |  63 +-
 .../apache/rocketmq/store/ha/WaitNotifyObject.java |  64 +-
 .../store/schedule/ScheduleMessageService.java     |   5 +
 .../apache/rocketmq/store/stats/BrokerStats.java   |   4 +-
 .../rocketmq/store/stats/BrokerStatsManager.java   |  50 +-
 .../apache/rocketmq/store/AppendCallbackTest.java  |  28 +-
 .../apache/rocketmq/store/BatchPutMessageTest.java |  18 +-
 .../rocketmq/store/StoreStatsServiceTest.java      |  18 +-
 .../store/schedule/ScheduleMessageServiceTest.java |   6 +-
 .../test/java/stats/BrokerStatsManagerTest.java    |  47 ++
 .../rocketmq/tools/admin/DefaultMQAdminExt.java    |   8 +-
 .../tools/admin/DefaultMQAdminExtImpl.java         |  12 +-
 .../apache/rocketmq/tools/admin/MQAdminExt.java    |   5 +-
 .../rocketmq/tools/command/MQAdminStartup.java     |   4 +
 .../command/acl/DeleteAccessConfigSubCommand.java  |  14 +-
 .../command/acl/UpdateAccessConfigSubCommand.java  |   8 +-
 .../acl/UpdateGlobalWhiteAddrSubCommand.java       |   6 +-
 .../consumer/GetConsumerConfigSubCommand.java      | 146 ++++
 .../message/QueryMsgByUniqueKeySubCommand.java     |   5 +-
 .../tools/command/message/SendMessageCommand.java  |  17 +-
 ...SubCommand.java => AddWritePermSubCommand.java} |  29 +-
 .../command/namesrv/WipeWritePermSubCommand.java   |   2 +-
 .../tools/admin/DefaultMQAdminExtTest.java         |   7 +
 .../consumer/GetConsumerConfigSubCommandTest.java  |  83 ++
 .../namesrv/AddWritePermSubCommandTest.java        |  37 +
 142 files changed, 3998 insertions(+), 1606 deletions(-)

diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 4daa832,9d26e99..d2f751b
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@@ -21,19 -19,15 +21,22 @@@ import com.google.common.collect.Maps
  import io.netty.channel.ChannelHandlerContext;
  import java.net.InetSocketAddress;
  import java.net.SocketAddress;
+ import java.util.HashMap;
  import java.util.List;
 -import java.util.Map;
 +import java.util.Locale;
- import java.util.Map;
++import java.util.Map;Optional
++
+ import java.util.concurrent.ThreadLocalRandom;
+ 
 +import java.util.Optional;
- import java.util.Random;
 +import java.util.concurrent.CompletableFuture;
 +import java.util.concurrent.ConcurrentMap;
 +import java.util.concurrent.atomic.LongAdder;
  import org.apache.rocketmq.broker.BrokerController;
 +import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
  import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
  import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
 -import org.apache.rocketmq.common.topic.TopicValidator;
 +import org.apache.rocketmq.broker.topic.TopicConfigManager;
  import org.apache.rocketmq.common.MixAll;
  import org.apache.rocketmq.common.TopicConfig;
  import org.apache.rocketmq.common.TopicFilterType;
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index b3309e1,86aab63..e7b7949
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@@ -154,7 -116,7 +154,8 @@@ import org.apache.rocketmq.remoting.net
  import org.apache.rocketmq.remoting.protocol.LanguageCode;
  import org.apache.rocketmq.remoting.protocol.RemotingCommand;
  import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
 +import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
  import org.apache.rocketmq.store.ConsumeQueue;
  import org.apache.rocketmq.store.ConsumeQueueExt;
  import org.apache.rocketmq.store.DefaultMessageStore;
@@@ -261,31 -235,9 +262,29 @@@ public class AdminBrokerProcessor exten
                  return resumeCheckHalfMessage(ctx, request);
              case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG:
                  return getBrokerClusterAclConfig(ctx, request);
 +            case RequestCode.GET_TOPIC_CONFIG:
 +                return getTopicConfig(ctx, request);
 +            case RequestCode.UPDATE_TOPIC_LOGICAL_QUEUE_MAPPING:
 +                return updateTopicLogicalQueueMapping(ctx, request);
 +            case RequestCode.DELETE_TOPIC_LOGICAL_QUEUE_MAPPING:
 +                return deleteTopicLogicalQueueMapping(ctx, request);
 +            case RequestCode.QUERY_TOPIC_LOGICAL_QUEUE_MAPPING:
 +                return queryTopicLogicalQueueMapping(ctx, request);
 +            case RequestCode.SEAL_TOPIC_LOGICAL_QUEUE:
 +                return sealTopicLogicalQueue(ctx, request);
 +            case RequestCode.REUSE_TOPIC_LOGICAL_QUEUE:
 +                return reuseTopicLogicalQueue(ctx, request);
 +            case RequestCode.CREATE_MESSAGE_QUEUE_FOR_LOGICAL_QUEUE:
 +                return createMessageQueueForLogicalQueue(ctx, request);
 +            case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE:
 +                return migrateTopicLogicalQueuePrepare(ctx, request);
 +            case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT:
 +                return migrateTopicLogicalQueueCommit(ctx, request);
 +            case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY:
 +                return migrateTopicLogicalQueueNotify(ctx, request);
              default:
-                 break;
+                 return getUnknownCmdResponse(ctx, request);
          }
- 
-         return null;
      }
  
      @Override
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 97b7e62,3a401e1..692c98c
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@@ -285,16 -292,18 +292,24 @@@ public class SendMessageProcessor exten
          msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
          String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
          MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
-         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+         if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
+             // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
+             // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
+             String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
+             msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+             // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
+             origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
+         } else {
+             msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+         }
  
 +        LogicalQueueContext logicalQueueContext = super.buildLogicalQueueContext(msgInner.getTopic(), msgInner.getQueueId(), response);
 +        CompletableFuture<RemotingCommand> future = logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response);
 +        if (future != null) {
 +            return future;
 +        }
 +
          CompletableFuture<PutMessageResult> putMessageResult = null;
-         Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
          String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
          if (transFlag != null && Boolean.parseBoolean(transFlag)) {
              if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 7a457c1,7677d8b..ab0d885
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@@ -557,201 -401,9 +557,201 @@@ public abstract class RebalanceImpl 
                      log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
                  }
              }
 +
 +        }
 +
 +        if (!allMQLocked) {
 +            mQClientFactory.rebalanceLater(500);
 +        }
 +
 +        this.dispatchPullRequest(pullRequestList, 500);
 +
 +        return changed;
 +    }
 +
 +    private boolean updateMessageQueueAssignment(final String topic, final Set<MessageQueueAssignment> assignments,
 +        final boolean isOrder) {
 +        boolean changed = false;
 +
 +        Map<MessageQueue, MessageQueueAssignment> mq2PushAssignment = new HashMap<MessageQueue, MessageQueueAssignment>();
 +        Map<MessageQueue, MessageQueueAssignment> mq2PopAssignment = new HashMap<MessageQueue, MessageQueueAssignment>();
 +        for (MessageQueueAssignment assignment : assignments) {
 +            MessageQueue messageQueue = assignment.getMessageQueue();
 +            if (messageQueue == null) {
 +                continue;
 +            }
 +            if (MessageRequestMode.POP == assignment.getMode()) {
 +                mq2PopAssignment.put(messageQueue, assignment);
 +            } else {
 +                mq2PushAssignment.put(messageQueue, assignment);
 +            }
 +        }
 +
 +        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
 +            if (mq2PopAssignment.isEmpty() && !mq2PushAssignment.isEmpty()) {
 +                //pop switch to push
 +                //subscribe pop retry topic
 +                try {
 +                    final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup());
 +                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
 +                    getSubscriptionInner().put(retryTopic, subscriptionData);
 +                } catch (Exception ignored) {
 +                }
 +
 +            } else if (!mq2PopAssignment.isEmpty() && mq2PushAssignment.isEmpty()) {
 +                //push switch to pop
 +                //unsubscribe pop retry topic
 +                try {
 +                    final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup());
 +                    getSubscriptionInner().remove(retryTopic);
 +                } catch (Exception ignored) {
 +                }
 +
 +            }
 +        }
 +
 +        {
 +            // drop process queues no longer belong me
 +            HashMap<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size());
 +            Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
 +            while (it.hasNext()) {
 +                Entry<MessageQueue, ProcessQueue> next = it.next();
 +                MessageQueue mq = next.getKey();
 +                ProcessQueue pq = next.getValue();
 +
 +                if (mq.getTopic().equals(topic)) {
 +                    if (!mq2PushAssignment.containsKey(mq)) {
 +                        pq.setDropped(true);
 +                        removeQueueMap.put(mq, pq);
 +                    } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
 +                        pq.setDropped(true);
 +                        removeQueueMap.put(mq, pq);
 +                        log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it",
 +                            consumerGroup, mq);
 +                    }
 +                }
 +            }
 +            // remove message queues no longer belong me
 +            for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
 +                MessageQueue mq = entry.getKey();
 +                ProcessQueue pq = entry.getValue();
 +
 +                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
 +                    this.processQueueTable.remove(mq);
 +                    changed = true;
 +                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
 +                }
 +            }
 +        }
 +
 +        {
 +            HashMap<MessageQueue, PopProcessQueue> removeQueueMap = new HashMap<MessageQueue, PopProcessQueue>(this.popProcessQueueTable.size());
 +            Iterator<Entry<MessageQueue, PopProcessQueue>> it = this.popProcessQueueTable.entrySet().iterator();
 +            while (it.hasNext()) {
 +                Entry<MessageQueue, PopProcessQueue> next = it.next();
 +                MessageQueue mq = next.getKey();
 +                PopProcessQueue pq = next.getValue();
 +
 +                if (mq.getTopic().equals(topic)) {
 +                    if (!mq2PopAssignment.containsKey(mq)) {
 +                        //the queue is no longer your assignment
 +                        pq.setDropped(true);
 +                        removeQueueMap.put(mq, pq);
 +                    } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
 +                        pq.setDropped(true);
 +                        removeQueueMap.put(mq, pq);
 +                        log.error("[BUG]doRebalance, {}, try remove unnecessary pop mq, {}, because pop is pause, so try to fixed it",
 +                            consumerGroup, mq);
 +                    }
 +                }
 +            }
 +            // remove message queues no longer belong me
 +            for (Entry<MessageQueue, PopProcessQueue> entry : removeQueueMap.entrySet()) {
 +                MessageQueue mq = entry.getKey();
 +                PopProcessQueue pq = entry.getValue();
 +
 +                if (this.removeUnnecessaryPopMessageQueue(mq, pq)) {
 +                    this.popProcessQueueTable.remove(mq);
 +                    changed = true;
 +                    log.info("doRebalance, {}, remove unnecessary pop mq, {}", consumerGroup, mq);
 +                }
 +            }
 +        }
 +
 +        {
 +            // add new message queue
 +            boolean allMQLocked = true;
 +            List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
 +            for (MessageQueue mq : mq2PushAssignment.keySet()) {
 +                if (!this.processQueueTable.containsKey(mq)) {
 +                    if (isOrder && !this.lock(mq)) {
 +                        log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
 +                        allMQLocked = false;
 +                        continue;
 +                    }
 +
 +                    this.removeDirtyOffset(mq);
 +                    ProcessQueue pq = createProcessQueue();
 +                    pq.setLocked(true);
 +                    long nextOffset = -1L;
 +                    try {
 +                        nextOffset = this.computePullFromWhereWithException(mq);
-                     } catch (MQClientException e) {
++                    } catch (Exception e) {
 +                        log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
 +                        continue;
 +                    }
 +
 +                    if (nextOffset >= 0) {
 +                        ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
 +                        if (pre != null) {
 +                            log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
 +                        } else {
 +                            log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
 +                            PullRequest pullRequest = new PullRequest();
 +                            pullRequest.setConsumerGroup(consumerGroup);
 +                            pullRequest.setNextOffset(nextOffset);
 +                            pullRequest.setMessageQueue(mq);
 +                            pullRequest.setProcessQueue(pq);
 +                            pullRequestList.add(pullRequest);
 +                            changed = true;
 +                        }
 +                    } else {
 +                        log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
 +                    }
 +                }
 +            }
 +
 +            if (!allMQLocked) {
 +                mQClientFactory.rebalanceLater(500);
 +            }
 +            this.dispatchPullRequest(pullRequestList, 500);
          }
  
 -        this.dispatchPullRequest(pullRequestList);
 +        {
 +            // add new message queue
 +            List<PopRequest> popRequestList = new ArrayList<PopRequest>();
 +            for (MessageQueue mq : mq2PopAssignment.keySet()) {
 +                if (!this.popProcessQueueTable.containsKey(mq)) {
 +                    PopProcessQueue pq = createPopProcessQueue();
 +                    PopProcessQueue pre = this.popProcessQueueTable.putIfAbsent(mq, pq);
 +                    if (pre != null) {
 +                        log.info("doRebalance, {}, mq pop already exists, {}", consumerGroup, mq);
 +                    } else {
 +                        log.info("doRebalance, {}, add a new pop mq, {}", consumerGroup, mq);
 +                        PopRequest popRequest = new PopRequest();
 +                        popRequest.setTopic(topic);
 +                        popRequest.setConsumerGroup(consumerGroup);
 +                        popRequest.setMessageQueue(mq);
 +                        popRequest.setPopProcessQueue(pq);
 +                        popRequest.setInitMode(getConsumeInitMode());
 +                        popRequestList.add(popRequest);
 +                        changed = true;
 +                    }
 +                }
 +            }
 +
 +            this.dispatchPopPullRequest(popRequestList, 500);
 +        }
  
          return changed;
      }
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index c1a50dd,e897d49..dafc4f8
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@@ -676,7 -619,7 +676,7 @@@ public class MQClientInstance 
                              }
                          }
                      } else {
-                         topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3, true, logicalQueueIdsFilter);
 -                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
++                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout(), true, logicalQueueIdsFilter);
                      }
                      if (topicRouteData != null) {
                          TopicRouteData old = this.topicRouteTable.get(topic);
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 8802a9c,bdc103f..ba479d2
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@@ -22,14 -20,10 +22,12 @@@ import java.io.IOException
  import java.net.UnknownHostException;
  import java.util.ArrayList;
  import java.util.Arrays;
 +import java.util.Collections;
  import java.util.HashSet;
  import java.util.List;
 +import java.util.Locale;
  import java.util.Random;
  import java.util.Set;
- import java.util.Timer;
- import java.util.TimerTask;
  import java.util.concurrent.BlockingQueue;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
diff --cc client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index c91d55a,e1b3bed..34b34f9
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@@ -37,42 -25,14 +37,44 @@@ import org.apache.rocketmq.client.produ
  import org.apache.rocketmq.client.producer.SendCallback;
  import org.apache.rocketmq.client.producer.SendResult;
  import org.apache.rocketmq.client.producer.SendStatus;
 +import org.apache.rocketmq.common.AclConfig;
 +import org.apache.rocketmq.common.DataVersion;
  import org.apache.rocketmq.common.PlainAccessConfig;
 +import org.apache.rocketmq.common.TopicConfig;
  import org.apache.rocketmq.common.message.Message;
  import org.apache.rocketmq.common.message.MessageConst;
+ import org.apache.rocketmq.common.protocol.RequestCode;
 +import org.apache.rocketmq.common.message.MessageDecoder;
 +import org.apache.rocketmq.common.message.MessageExt;
 +import org.apache.rocketmq.common.message.MessageQueueAssignment;
 +import org.apache.rocketmq.common.message.MessageRequestMode;
  import org.apache.rocketmq.common.protocol.ResponseCode;
 +import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
 +import org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody;
 +import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
 +import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
 +import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeResponseHeader;
 +import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
 +import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
 +import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
 +import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader;
 +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
 +import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
 +import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
 +import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
 +import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
 +import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
 +import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
 +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
 +import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
 +import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
  import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
  import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+ import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
 +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
 +import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
 +import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
 +import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
  import org.apache.rocketmq.remoting.InvokeCallback;
  import org.apache.rocketmq.remoting.RemotingClient;
  import org.apache.rocketmq.remoting.exception.RemotingException;
diff --cc common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 17cc2a1,f710cdb..bf2355c
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@@ -58,12 -57,10 +58,11 @@@ public class BrokerConfig 
      @ImportantField
      private boolean traceTopicEnable = false;
      /**
-      * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
-      * value is 1.
+      * thread numbers for send message thread pool.
       */
-     private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
+     private int sendMessageThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4);
      private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
 +    private int ackMessageThreadPoolNums = 3;
      private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
      private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
  
diff --cc common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 3613049,5624a7e..04f126b
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@@ -193,18 -189,5 +193,20 @@@ public class RequestCode 
  
      public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
  
+     public static final int ADD_WRITE_PERM_OF_BROKER = 327;
++
 +    public static final int GET_TOPIC_CONFIG = 351;
 +
 +    public static final int QUERY_ASSIGNMENT = 400;
 +    public static final int SET_MESSAGE_REQUEST_MODE = 401;
 +
 +    public static final int UPDATE_TOPIC_LOGICAL_QUEUE_MAPPING = 411;
 +    public static final int DELETE_TOPIC_LOGICAL_QUEUE_MAPPING = 422;
 +    public static final int QUERY_TOPIC_LOGICAL_QUEUE_MAPPING = 413;
 +    public static final int SEAL_TOPIC_LOGICAL_QUEUE = 414;
 +    public static final int REUSE_TOPIC_LOGICAL_QUEUE = 415;
 +    public static final int CREATE_MESSAGE_QUEUE_FOR_LOGICAL_QUEUE = 416;
 +    public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE = 417;
 +    public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT = 418;
 +    public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY = 419;
  }
diff --cc store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b11eb49,5bf68ac..152af7b
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@@ -35,7 -34,7 +35,8 @@@ import java.util.Set
  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ExecutionException;
 +import java.util.concurrent.CopyOnWriteArrayList;
  import java.util.concurrent.Executors;
  import java.util.concurrent.ScheduledExecutorService;
  import java.util.concurrent.TimeUnit;
@@@ -675,8 -634,8 +638,8 @@@ public class DefaultMessageStore implem
                                  continue;
                              }
  
-                             this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
+                             this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
 -                            getResult.addMessage(selectResult);
 +                            getResult.addMessage(selectResult, offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE));
                              status = GetMessageStatus.FOUND;
                              nextPhyFileStartOffset = Long.MIN_VALUE;
                          }
diff --cc store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 9057ebe,1164ab8..c127515
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@@ -26,10 -24,13 +26,11 @@@ import java.util.TimerTask
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ConcurrentMap;
  import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.stream.Collectors;
  import org.apache.rocketmq.common.ConfigManager;
+ import org.apache.rocketmq.common.MixAll;
  import org.apache.rocketmq.common.TopicFilterType;
  import org.apache.rocketmq.common.constant.LoggerName;
 -import org.apache.rocketmq.common.topic.TopicValidator;
 -import org.apache.rocketmq.logging.InternalLogger;
 -import org.apache.rocketmq.logging.InternalLoggerFactory;
  import org.apache.rocketmq.common.message.MessageAccessor;
  import org.apache.rocketmq.common.message.MessageConst;
  import org.apache.rocketmq.common.message.MessageDecoder;
diff --cc tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index c1b42f5,d701056..0da449d
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@@ -98,10 -90,9 +98,10 @@@ public interface MQAdminExt extends MQA
          final SubscriptionGroupConfig config) throws RemotingException,
          MQBrokerException, InterruptedException, MQClientException;
  
-     SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group);
+     SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
  
 -    TopicConfig examineTopicConfig(final String addr, final String topic);
 +    TopicConfig examineTopicConfig(final String addr,
 +        final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
  
      TopicStatsTable examineTopicStats(
          final String topic) throws RemotingException, MQClientException, InterruptedException,
diff --cc tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 3e98939,4411a6c..1d29959
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@@ -49,14 -49,9 +49,15 @@@ import org.apache.rocketmq.tools.comman
  import org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand;
  import org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand;
  import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand;
+ import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand;
 +import org.apache.rocketmq.tools.command.consumer.SetConsumeModeSubCommand;
  import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
  import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
 +import org.apache.rocketmq.tools.command.logicalqueue.DeleteTopicLogicalQueueMappingCommand;
 +import org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand;
 +import org.apache.rocketmq.tools.command.logicalqueue.QueryTopicLogicalQueueMappingCommand;
 +import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
 +import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand;
  import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
  import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
  import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;