You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by zh...@apache.org on 2021/09/24 09:07:09 UTC
[rocketmq] 01/02: Merge branch 'develop' into 5.0.0-preview
This is an automated email from the ASF dual-hosted git repository.
zhoubo pushed a commit to branch 5.0.0-preview
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 2bf133be29c444f0aea648133123a2d9a61ffb07
Merge: 48d3c7e 1e8e728
Author: odbozhou <87...@qq.com>
AuthorDate: Fri Sep 24 15:35:46 2021 +0800
Merge branch 'develop' into 5.0.0-preview
# Conflicts:
# .travis.yml
# acl/pom.xml
# broker/pom.xml
# broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
# broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
# broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
# client/pom.xml
# client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
# client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
# client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
# common/pom.xml
# common/src/main/java/org/apache/rocketmq/common/MQVersion.java
# common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
# distribution/pom.xml
# example/pom.xml
# filter/pom.xml
# logappender/pom.xml
# logging/pom.xml
# namesrv/pom.xml
# namesrv/src/main/java/org/apache/rocketmq/namesrv/processor/DefaultRequestProcessor.java
# namesrv/src/main/java/org/apache/rocketmq/namesrv/routeinfo/RouteInfoManager.java
# openmessaging/pom.xml
# pom.xml
# remoting/pom.xml
# srvutil/pom.xml
# store/pom.xml
# store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
# test/pom.xml
# tools/pom.xml
# tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
.github/workflows/greetings.yml | 29 +
README.md | 2 +-
.../org/apache/rocketmq/acl/common/AclUtils.java | 69 +-
.../rocketmq/acl/plain/PlainPermissionManager.java | 53 +-
.../acl/plain/RemoteAddressStrategyFactory.java | 2 +-
.../apache/rocketmq/acl/common/AclUtilsTest.java | 1 +
.../acl/plain/PlainAccessValidatorTest.java | 53 +-
.../broker/pagecache/ManyMessageTransfer.java | 27 +
.../broker/pagecache/OneMessageTransfer.java | 27 +
.../broker/pagecache/QueryMessageTransfer.java | 27 +
.../broker/plugin/AbstractPluginMessageStore.java | 4 +-
.../processor/AbstractSendMessageProcessor.java | 89 ++-
.../broker/processor/AdminBrokerProcessor.java | 20 +-
.../broker/processor/ReplyMessageProcessor.java | 4 +-
.../broker/processor/SendMessageProcessor.java | 120 ++-
.../AbstractTransactionalMessageCheckListener.java | 2 -
.../DefaultTransactionalMessageCheckListener.java | 4 +-
.../AbstractSendMessageProcessorTest.java | 82 ++
client/pom.xml | 4 -
.../org/apache/rocketmq/client/ClientConfig.java | 13 +-
.../client/consumer/DefaultLitePullConsumer.java | 4 +-
.../client/consumer/DefaultMQPushConsumer.java | 18 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 28 +-
.../ConsumeMessageConcurrentlyService.java | 28 +-
.../consumer/ConsumeMessageOrderlyService.java | 4 +-
.../impl/consumer/DefaultLitePullConsumerImpl.java | 64 +-
.../impl/consumer/DefaultMQPushConsumerImpl.java | 2 +-
.../client/impl/consumer/RebalanceImpl.java | 2 +-
.../impl/consumer/RebalanceLitePullImpl.java | 3 +-
.../client/impl/factory/MQClientInstance.java | 42 +-
.../impl/producer/DefaultMQProducerImpl.java | 61 +-
.../client/producer/DefaultMQProducer.java | 54 +-
.../client/producer/RequestFutureTable.java | 7 +
.../client/trace/AsyncTraceDispatcher.java | 74 +-
.../rocketmq/client/trace/TraceDataEncoder.java | 1 -
.../apache/rocketmq/client/trace/TraceView.java | 4 +-
.../rocketmq/client/impl/MQClientAPIImplTest.java | 27 +
.../ConsumeMessageConcurrentlyServiceTest.java | 2 +-
.../trace/DefaultMQConsumerWithTraceTest.java | 9 +
.../DefaultMQLitePullConsumerWithTraceTest.java | 12 +-
.../trace/DefaultMQProducerWithTraceTest.java | 10 +
.../client/trace/TraceDataEncoderTest.java | 106 ++-
.../rocketmq/client/trace/TraceViewTest.java | 4 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 8 +-
.../java/org/apache/rocketmq/common/MixAll.java | 1 +
.../java/org/apache/rocketmq/common/UtilAll.java | 33 +-
.../apache/rocketmq/common/message/Message.java | 11 +-
.../rocketmq/common/message/MessageDecoder.java | 41 +-
.../rocketmq/common/protocol/NamespaceUtil.java | 2 +-
.../rocketmq/common/protocol/RequestCode.java | 2 +
.../namesrv/AddWritePermOfBrokerRequestHeader.java | 39 +
.../AddWritePermOfBrokerResponseHeader.java | 38 +
.../apache/rocketmq/common/stats/StatsItem.java | 22 +-
.../apache/rocketmq/common/stats/StatsItemSet.java | 8 +-
.../common/utils/NameServerAddressUtils.java | 1 +
.../org/apache/rocketmq/common/UtilAllTest.java | 13 +
.../common/message/MessageDecoderTest.java | 108 +++
.../rocketmq/common/stats/StatsItemSetTest.java | 4 +-
.../rocketmq/common/utils/IOTinyUtilsTest.java | 4 +-
distribution/benchmark/batchproducer.sh | 18 +
distribution/bin/runbroker.cmd | 2 +-
distribution/bin/runbroker.sh | 2 +-
distribution/bin/runserver.cmd | 2 +-
distribution/bin/runserver.sh | 6 +-
distribution/release.xml | 4 +
docs/cn/Configuration_System.md | 70 ++
docs/cn/Deployment.md | 159 ++++
docs/cn/Example_Batch.md | 82 ++
docs/cn/Example_Delay.md | 85 +++
docs/cn/Example_Simple_cn.md | 136 ++++
docs/cn/FAQ.md | 110 +++
docs/cn/RocketMQ_Example.md | 8 +-
docs/cn/best_practice.md | 2 +-
docs/cn/design.md | 4 +-
docs/cn/image/rocketmq_architecture_1.png | Bin 89290 -> 62810 bytes
docs/cn/image/rocketmq_architecture_3.png | Bin 106758 -> 74884 bytes
docs/cn/msg_trace/user_guide.md | 19 +-
docs/cn/operation.md | 10 +-
docs/en/CLITools.md | 8 +
docs/en/Example_Transaction.md | 2 +-
docs/en/best_practice.md | 2 +-
.../client/java/API_Reference_DefaultMQProducer.md | 71 ++
docs/en/image/rocketmq_architecture_1.png | Bin 89290 -> 62810 bytes
docs/en/image/rocketmq_architecture_3.png | Bin 106758 -> 74884 bytes
docs/en/msg_trace/user_guide.md | 16 +
docs/en/operation.md | 2 +-
.../rocketmq/example/benchmark/BatchProducer.java | 403 ++++++++++
.../rocketmq/example/benchmark/Consumer.java | 66 +-
.../rocketmq/example/benchmark/Producer.java | 157 ++--
.../example/benchmark/TransactionProducer.java | 117 +--
.../rocketmq/example/quickstart/Producer.java | 33 +
.../rocketmq/example/simple/PullConsumer.java | 154 ++--
.../filter/expression/UnaryExpression.java | 16 +-
.../rocketmq/filter/parser/ParseException.java | 7 +-
.../rocketmq/filter/parser/TokenMgrError.java | 3 +-
.../org/apache/rocketmq/filter/ParserTest.java | 2 +-
.../namesrv/processor/DefaultRequestProcessor.java | 24 +
.../namesrv/routeinfo/RouteInfoManager.java | 41 +-
.../namesrv/routeinfo/RouteInfoManagerTest.java | 52 +-
pom.xml | 2 +-
.../rocketmq/remoting/common/RemotingHelper.java | 2 +-
.../rocketmq/remoting/netty/NettyClientConfig.java | 8 +-
.../rocketmq/remoting/netty/NettyLogger.java | 46 ++
.../remoting/netty/NettyRemotingAbstract.java | 7 +-
.../rocketmq/remoting/netty/NettySystemConfig.java | 16 +
.../remoting/netty/NettyClientConfigTest.java | 64 ++
.../rocketmq/store/AppendMessageCallback.java | 5 +-
.../apache/rocketmq/store/AppendMessageResult.java | 17 +
.../java/org/apache/rocketmq/store/CommitLog.java | 835 ++++++++-------------
.../apache/rocketmq/store/DefaultMessageStore.java | 73 +-
.../java/org/apache/rocketmq/store/MappedFile.java | 26 +-
.../rocketmq/store/MessageExtBrokerInner.java | 12 +
.../org/apache/rocketmq/store/MessageStore.java | 4 +-
.../rocketmq/store/SelectMappedBufferResult.java | 7 -
.../apache/rocketmq/store/StoreStatsService.java | 109 +--
.../rocketmq/store/config/MessageStoreConfig.java | 7 +-
.../rocketmq/store/dledger/DLedgerCommitLog.java | 250 +-----
.../org/apache/rocketmq/store/ha/HAService.java | 63 +-
.../apache/rocketmq/store/ha/WaitNotifyObject.java | 64 +-
.../store/schedule/ScheduleMessageService.java | 5 +
.../apache/rocketmq/store/stats/BrokerStats.java | 4 +-
.../rocketmq/store/stats/BrokerStatsManager.java | 50 +-
.../apache/rocketmq/store/AppendCallbackTest.java | 28 +-
.../apache/rocketmq/store/BatchPutMessageTest.java | 18 +-
.../rocketmq/store/StoreStatsServiceTest.java | 18 +-
.../store/schedule/ScheduleMessageServiceTest.java | 6 +-
.../test/java/stats/BrokerStatsManagerTest.java | 47 ++
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 8 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 12 +-
.../apache/rocketmq/tools/admin/MQAdminExt.java | 5 +-
.../rocketmq/tools/command/MQAdminStartup.java | 4 +
.../command/acl/DeleteAccessConfigSubCommand.java | 14 +-
.../command/acl/UpdateAccessConfigSubCommand.java | 8 +-
.../acl/UpdateGlobalWhiteAddrSubCommand.java | 6 +-
.../consumer/GetConsumerConfigSubCommand.java | 146 ++++
.../message/QueryMsgByUniqueKeySubCommand.java | 5 +-
.../tools/command/message/SendMessageCommand.java | 17 +-
...SubCommand.java => AddWritePermSubCommand.java} | 29 +-
.../command/namesrv/WipeWritePermSubCommand.java | 2 +-
.../tools/admin/DefaultMQAdminExtTest.java | 7 +
.../consumer/GetConsumerConfigSubCommandTest.java | 83 ++
.../namesrv/AddWritePermSubCommandTest.java | 37 +
142 files changed, 3998 insertions(+), 1606 deletions(-)
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
index 4daa832,9d26e99..d2f751b
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AbstractSendMessageProcessor.java
@@@ -21,19 -19,15 +21,22 @@@ import com.google.common.collect.Maps
import io.netty.channel.ChannelHandlerContext;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+ import java.util.HashMap;
import java.util.List;
-import java.util.Map;
+import java.util.Locale;
- import java.util.Map;
++import java.util.Map;Optional
++
+ import java.util.concurrent.ThreadLocalRandom;
+
+import java.util.Optional;
- import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.domain.LogicalQueuesInfoInBroker;
import org.apache.rocketmq.broker.mqtrace.SendMessageContext;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
-import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index b3309e1,86aab63..e7b7949
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@@ -154,7 -116,7 +154,8 @@@ import org.apache.rocketmq.remoting.net
import org.apache.rocketmq.remoting.protocol.LanguageCode;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
+ import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
+import org.apache.rocketmq.srvutil.ConcurrentHashMapUtil;
import org.apache.rocketmq.store.ConsumeQueue;
import org.apache.rocketmq.store.ConsumeQueueExt;
import org.apache.rocketmq.store.DefaultMessageStore;
@@@ -261,31 -235,9 +262,29 @@@ public class AdminBrokerProcessor exten
return resumeCheckHalfMessage(ctx, request);
case RequestCode.GET_BROKER_CLUSTER_ACL_CONFIG:
return getBrokerClusterAclConfig(ctx, request);
+ case RequestCode.GET_TOPIC_CONFIG:
+ return getTopicConfig(ctx, request);
+ case RequestCode.UPDATE_TOPIC_LOGICAL_QUEUE_MAPPING:
+ return updateTopicLogicalQueueMapping(ctx, request);
+ case RequestCode.DELETE_TOPIC_LOGICAL_QUEUE_MAPPING:
+ return deleteTopicLogicalQueueMapping(ctx, request);
+ case RequestCode.QUERY_TOPIC_LOGICAL_QUEUE_MAPPING:
+ return queryTopicLogicalQueueMapping(ctx, request);
+ case RequestCode.SEAL_TOPIC_LOGICAL_QUEUE:
+ return sealTopicLogicalQueue(ctx, request);
+ case RequestCode.REUSE_TOPIC_LOGICAL_QUEUE:
+ return reuseTopicLogicalQueue(ctx, request);
+ case RequestCode.CREATE_MESSAGE_QUEUE_FOR_LOGICAL_QUEUE:
+ return createMessageQueueForLogicalQueue(ctx, request);
+ case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE:
+ return migrateTopicLogicalQueuePrepare(ctx, request);
+ case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT:
+ return migrateTopicLogicalQueueCommit(ctx, request);
+ case RequestCode.MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY:
+ return migrateTopicLogicalQueueNotify(ctx, request);
default:
- break;
+ return getUnknownCmdResponse(ctx, request);
}
-
- return null;
}
@Override
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 97b7e62,3a401e1..692c98c
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@@ -285,16 -292,18 +292,24 @@@ public class SendMessageProcessor exten
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
- msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+ if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK)) {
+ // There is no need to store "WAIT=true", remove it from propertiesString to save 9 bytes for each message.
+ // It works for most case. In some cases msgInner.setPropertiesString invoked later and replace it.
+ String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+ // Reput to properties, since msgInner.isWaitStoreMsgOK() will be invoked later
+ origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
+ } else {
+ msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
+ }
+ LogicalQueueContext logicalQueueContext = super.buildLogicalQueueContext(msgInner.getTopic(), msgInner.getQueueId(), response);
+ CompletableFuture<RemotingCommand> future = logicalQueueContext.hookBeforePut(ctx, requestHeader, request, response);
+ if (future != null) {
+ return future;
+ }
+
CompletableFuture<PutMessageResult> putMessageResult = null;
- Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
index 7a457c1,7677d8b..ab0d885
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalanceImpl.java
@@@ -557,201 -401,9 +557,201 @@@ public abstract class RebalanceImpl
log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
}
}
+
+ }
+
+ if (!allMQLocked) {
+ mQClientFactory.rebalanceLater(500);
+ }
+
+ this.dispatchPullRequest(pullRequestList, 500);
+
+ return changed;
+ }
+
+ private boolean updateMessageQueueAssignment(final String topic, final Set<MessageQueueAssignment> assignments,
+ final boolean isOrder) {
+ boolean changed = false;
+
+ Map<MessageQueue, MessageQueueAssignment> mq2PushAssignment = new HashMap<MessageQueue, MessageQueueAssignment>();
+ Map<MessageQueue, MessageQueueAssignment> mq2PopAssignment = new HashMap<MessageQueue, MessageQueueAssignment>();
+ for (MessageQueueAssignment assignment : assignments) {
+ MessageQueue messageQueue = assignment.getMessageQueue();
+ if (messageQueue == null) {
+ continue;
+ }
+ if (MessageRequestMode.POP == assignment.getMode()) {
+ mq2PopAssignment.put(messageQueue, assignment);
+ } else {
+ mq2PushAssignment.put(messageQueue, assignment);
+ }
+ }
+
+ if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
+ if (mq2PopAssignment.isEmpty() && !mq2PushAssignment.isEmpty()) {
+ //pop switch to push
+ //subscribe pop retry topic
+ try {
+ final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup());
+ SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(retryTopic, SubscriptionData.SUB_ALL);
+ getSubscriptionInner().put(retryTopic, subscriptionData);
+ } catch (Exception ignored) {
+ }
+
+ } else if (!mq2PopAssignment.isEmpty() && mq2PushAssignment.isEmpty()) {
+ //push switch to pop
+ //unsubscribe pop retry topic
+ try {
+ final String retryTopic = KeyBuilder.buildPopRetryTopic(topic, getConsumerGroup());
+ getSubscriptionInner().remove(retryTopic);
+ } catch (Exception ignored) {
+ }
+
+ }
+ }
+
+ {
+ // drop process queues no longer belong me
+ HashMap<MessageQueue, ProcessQueue> removeQueueMap = new HashMap<MessageQueue, ProcessQueue>(this.processQueueTable.size());
+ Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, ProcessQueue> next = it.next();
+ MessageQueue mq = next.getKey();
+ ProcessQueue pq = next.getValue();
+
+ if (mq.getTopic().equals(topic)) {
+ if (!mq2PushAssignment.containsKey(mq)) {
+ pq.setDropped(true);
+ removeQueueMap.put(mq, pq);
+ } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
+ pq.setDropped(true);
+ removeQueueMap.put(mq, pq);
+ log.error("[BUG]doRebalance, {}, try remove unnecessary mq, {}, because pull is pause, so try to fixed it",
+ consumerGroup, mq);
+ }
+ }
+ }
+ // remove message queues no longer belong me
+ for (Entry<MessageQueue, ProcessQueue> entry : removeQueueMap.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ ProcessQueue pq = entry.getValue();
+
+ if (this.removeUnnecessaryMessageQueue(mq, pq)) {
+ this.processQueueTable.remove(mq);
+ changed = true;
+ log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
+ }
+ }
+ }
+
+ {
+ HashMap<MessageQueue, PopProcessQueue> removeQueueMap = new HashMap<MessageQueue, PopProcessQueue>(this.popProcessQueueTable.size());
+ Iterator<Entry<MessageQueue, PopProcessQueue>> it = this.popProcessQueueTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, PopProcessQueue> next = it.next();
+ MessageQueue mq = next.getKey();
+ PopProcessQueue pq = next.getValue();
+
+ if (mq.getTopic().equals(topic)) {
+ if (!mq2PopAssignment.containsKey(mq)) {
+ //the queue is no longer your assignment
+ pq.setDropped(true);
+ removeQueueMap.put(mq, pq);
+ } else if (pq.isPullExpired() && this.consumeType() == ConsumeType.CONSUME_PASSIVELY) {
+ pq.setDropped(true);
+ removeQueueMap.put(mq, pq);
+ log.error("[BUG]doRebalance, {}, try remove unnecessary pop mq, {}, because pop is pause, so try to fixed it",
+ consumerGroup, mq);
+ }
+ }
+ }
+ // remove message queues no longer belong me
+ for (Entry<MessageQueue, PopProcessQueue> entry : removeQueueMap.entrySet()) {
+ MessageQueue mq = entry.getKey();
+ PopProcessQueue pq = entry.getValue();
+
+ if (this.removeUnnecessaryPopMessageQueue(mq, pq)) {
+ this.popProcessQueueTable.remove(mq);
+ changed = true;
+ log.info("doRebalance, {}, remove unnecessary pop mq, {}", consumerGroup, mq);
+ }
+ }
+ }
+
+ {
+ // add new message queue
+ boolean allMQLocked = true;
+ List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
+ for (MessageQueue mq : mq2PushAssignment.keySet()) {
+ if (!this.processQueueTable.containsKey(mq)) {
+ if (isOrder && !this.lock(mq)) {
+ log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
+ allMQLocked = false;
+ continue;
+ }
+
+ this.removeDirtyOffset(mq);
+ ProcessQueue pq = createProcessQueue();
+ pq.setLocked(true);
+ long nextOffset = -1L;
+ try {
+ nextOffset = this.computePullFromWhereWithException(mq);
- } catch (MQClientException e) {
++ } catch (Exception e) {
+ log.info("doRebalance, {}, compute offset failed, {}", consumerGroup, mq);
+ continue;
+ }
+
+ if (nextOffset >= 0) {
+ ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
+ if (pre != null) {
+ log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
+ } else {
+ log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
+ PullRequest pullRequest = new PullRequest();
+ pullRequest.setConsumerGroup(consumerGroup);
+ pullRequest.setNextOffset(nextOffset);
+ pullRequest.setMessageQueue(mq);
+ pullRequest.setProcessQueue(pq);
+ pullRequestList.add(pullRequest);
+ changed = true;
+ }
+ } else {
+ log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
+ }
+ }
+ }
+
+ if (!allMQLocked) {
+ mQClientFactory.rebalanceLater(500);
+ }
+ this.dispatchPullRequest(pullRequestList, 500);
}
- this.dispatchPullRequest(pullRequestList);
+ {
+ // add new message queue
+ List<PopRequest> popRequestList = new ArrayList<PopRequest>();
+ for (MessageQueue mq : mq2PopAssignment.keySet()) {
+ if (!this.popProcessQueueTable.containsKey(mq)) {
+ PopProcessQueue pq = createPopProcessQueue();
+ PopProcessQueue pre = this.popProcessQueueTable.putIfAbsent(mq, pq);
+ if (pre != null) {
+ log.info("doRebalance, {}, mq pop already exists, {}", consumerGroup, mq);
+ } else {
+ log.info("doRebalance, {}, add a new pop mq, {}", consumerGroup, mq);
+ PopRequest popRequest = new PopRequest();
+ popRequest.setTopic(topic);
+ popRequest.setConsumerGroup(consumerGroup);
+ popRequest.setMessageQueue(mq);
+ popRequest.setPopProcessQueue(pq);
+ popRequest.setInitMode(getConsumeInitMode());
+ popRequestList.add(popRequest);
+ changed = true;
+ }
+ }
+ }
+
+ this.dispatchPopPullRequest(popRequestList, 500);
+ }
return changed;
}
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index c1a50dd,e897d49..dafc4f8
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@@ -676,7 -619,7 +676,7 @@@ public class MQClientInstance
}
}
} else {
- topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3, true, logicalQueueIdsFilter);
- topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
++ topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout(), true, logicalQueueIdsFilter);
}
if (topicRouteData != null) {
TopicRouteData old = this.topicRouteTable.get(topic);
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
index 8802a9c,bdc103f..ba479d2
--- a/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/producer/DefaultMQProducerImpl.java
@@@ -22,14 -20,10 +22,12 @@@ import java.io.IOException
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Random;
import java.util.Set;
- import java.util.Timer;
- import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
diff --cc client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
index c91d55a,e1b3bed..34b34f9
--- a/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/MQClientAPIImplTest.java
@@@ -37,42 -25,14 +37,44 @@@ import org.apache.rocketmq.client.produ
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.AclConfig;
+import org.apache.rocketmq.common.DataVersion;
import org.apache.rocketmq.common.PlainAccessConfig;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageConst;
+ import org.apache.rocketmq.common.protocol.RequestCode;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+import org.apache.rocketmq.common.protocol.body.ClusterAclVersionInfo;
+import org.apache.rocketmq.common.protocol.body.QueryAssignmentResponseBody;
+import org.apache.rocketmq.common.protocol.header.AckMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeRequestHeader;
+import org.apache.rocketmq.common.protocol.header.ChangeInvisibleTimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.ExtraInfoUtil;
+import org.apache.rocketmq.common.protocol.header.GetBrokerAclConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetBrokerClusterAclConfigResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseBody;
+import org.apache.rocketmq.common.protocol.header.GetConsumerListByGroupResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMaxOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.GetMinOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.PopMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.header.PopMessageResponseHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.common.protocol.header.SendMessageResponseHeader;
+ import org.apache.rocketmq.common.protocol.header.namesrv.AddWritePermOfBrokerResponseHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetResponseHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.remoting.InvokeCallback;
import org.apache.rocketmq.remoting.RemotingClient;
import org.apache.rocketmq.remoting.exception.RemotingException;
diff --cc common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 17cc2a1,f710cdb..bf2355c
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@@ -58,12 -57,10 +58,11 @@@ public class BrokerConfig
@ImportantField
private boolean traceTopicEnable = false;
/**
- * thread numbers for send message thread pool, since spin lock will be used by default since 4.0.x, the default
- * value is 1.
+ * thread numbers for send message thread pool.
*/
- private int sendMessageThreadPoolNums = 1; //16 + Runtime.getRuntime().availableProcessors() * 4;
+ private int sendMessageThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4);
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+ private int ackMessageThreadPoolNums = 3;
private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
diff --cc common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
index 3613049,5624a7e..04f126b
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/RequestCode.java
@@@ -193,18 -189,5 +193,20 @@@ public class RequestCode
public static final int PUSH_REPLY_MESSAGE_TO_CLIENT = 326;
+ public static final int ADD_WRITE_PERM_OF_BROKER = 327;
++
+ public static final int GET_TOPIC_CONFIG = 351;
+
+ public static final int QUERY_ASSIGNMENT = 400;
+ public static final int SET_MESSAGE_REQUEST_MODE = 401;
+
+ public static final int UPDATE_TOPIC_LOGICAL_QUEUE_MAPPING = 411;
+ public static final int DELETE_TOPIC_LOGICAL_QUEUE_MAPPING = 422;
+ public static final int QUERY_TOPIC_LOGICAL_QUEUE_MAPPING = 413;
+ public static final int SEAL_TOPIC_LOGICAL_QUEUE = 414;
+ public static final int REUSE_TOPIC_LOGICAL_QUEUE = 415;
+ public static final int CREATE_MESSAGE_QUEUE_FOR_LOGICAL_QUEUE = 416;
+ public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_PREPARE = 417;
+ public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_COMMIT = 418;
+ public static final int MIGRATE_TOPIC_LOGICAL_QUEUE_NOTIFY = 419;
}
diff --cc store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index b11eb49,5bf68ac..152af7b
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@@ -35,7 -34,7 +35,8 @@@ import java.util.Set
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ExecutionException;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@@ -675,8 -634,8 +638,8 @@@ public class DefaultMessageStore implem
continue;
}
- this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();
+ this.storeStatsService.getGetMessageTransferedMsgCount().add(1);
- getResult.addMessage(selectResult);
+ getResult.addMessage(selectResult, offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE));
status = GetMessageStatus.FOUND;
nextPhyFileStartOffset = Long.MIN_VALUE;
}
diff --cc store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 9057ebe,1164ab8..c127515
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@@ -26,10 -24,13 +26,11 @@@ import java.util.TimerTask
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.rocketmq.common.ConfigManager;
+ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
diff --cc tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index c1b42f5,d701056..0da449d
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@@ -98,10 -90,9 +98,10 @@@ public interface MQAdminExt extends MQA
final SubscriptionGroupConfig config) throws RemotingException,
MQBrokerException, InterruptedException, MQClientException;
- SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group);
+ SubscriptionGroupConfig examineSubscriptionGroupConfig(final String addr, final String group) throws InterruptedException, RemotingException, MQClientException, MQBrokerException;
- TopicConfig examineTopicConfig(final String addr, final String topic);
+ TopicConfig examineTopicConfig(final String addr,
+ final String topic) throws InterruptedException, MQBrokerException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException;
TopicStatsTable examineTopicStats(
final String topic) throws RemotingException, MQClientException, InterruptedException,
diff --cc tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
index 3e98939,4411a6c..1d29959
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/MQAdminStartup.java
@@@ -49,14 -49,9 +49,15 @@@ import org.apache.rocketmq.tools.comman
import org.apache.rocketmq.tools.command.consumer.ConsumerProgressSubCommand;
import org.apache.rocketmq.tools.command.consumer.ConsumerStatusSubCommand;
import org.apache.rocketmq.tools.command.consumer.DeleteSubscriptionGroupCommand;
+ import org.apache.rocketmq.tools.command.consumer.GetConsumerConfigSubCommand;
+import org.apache.rocketmq.tools.command.consumer.SetConsumeModeSubCommand;
import org.apache.rocketmq.tools.command.consumer.StartMonitoringSubCommand;
import org.apache.rocketmq.tools.command.consumer.UpdateSubGroupSubCommand;
+import org.apache.rocketmq.tools.command.logicalqueue.DeleteTopicLogicalQueueMappingCommand;
+import org.apache.rocketmq.tools.command.logicalqueue.MigrateTopicLogicalQueueCommand;
+import org.apache.rocketmq.tools.command.logicalqueue.QueryTopicLogicalQueueMappingCommand;
+import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueMappingCommand;
+import org.apache.rocketmq.tools.command.logicalqueue.UpdateTopicLogicalQueueNumCommand;
import org.apache.rocketmq.tools.command.message.CheckMsgSendRTCommand;
import org.apache.rocketmq.tools.command.message.ConsumeMessageCommand;
import org.apache.rocketmq.tools.command.message.PrintMessageByQueueCommand;