You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2022/03/04 08:50:01 UTC
[rocketmq] 03/05: Merge 5.0.0-alpha-merge with develop branch
This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch 5.0.0-alpha
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 0d15e6c2d5b436081c91fa6d8e86e2e2b6be18e9
Merge: de76e06 1393329
Author: RongtongJin <ji...@mails.ucas.ac.cn>
AuthorDate: Fri Feb 25 17:43:06 2022 +0800
Merge 5.0.0-alpha-merge with develop branch
.travis.yml | 15 +-
NOTICE | 2 +-
README.md | 22 +-
.../org/apache/rocketmq/acl/AccessValidator.java | 13 +
.../org/apache/rocketmq/acl/common/AclUtils.java | 2 +-
.../rocketmq/acl/plain/PlainAccessValidator.java | 19 +-
.../rocketmq/acl/plain/PlainPermissionManager.java | 427 +++++++++---
.../acl/plain/RemoteAddressStrategyFactory.java | 4 +-
.../apache/rocketmq/acl/common/AclUtilsTest.java | 13 +-
.../acl/plain/PlainAccessValidatorTest.java | 762 ++++++++++++++-------
.../acl/plain/PlainPermissionManagerTest.java | 59 +-
.../plain_acl.yml} | 32 +-
acl/src/test/resources/conf/plain_acl_null.yml | 18 -
broker/pom.xml | 4 +
.../apache/rocketmq/broker/BrokerController.java | 40 +-
.../rocketmq/broker/BrokerPathConfigHelper.java | 4 -
.../org/apache/rocketmq/broker/BrokerStartup.java | 16 +-
.../broker/filter/ConsumerFilterManager.java | 17 +-
.../broker/filter/MessageEvaluationContext.java | 5 +-
.../longpolling/LmqPullRequestHoldService.java | 62 ++
.../broker/longpolling/ManyPullRequest.java | 4 +
.../broker/longpolling/PullRequestHoldService.java | 10 +-
.../broker/offset/ConsumerOffsetManager.java | 6 +-
.../broker/offset/LmqConsumerOffsetManager.java | 109 +++
.../broker/plugin/AbstractPluginMessageStore.java | 549 +++++++--------
.../broker/processor/AdminBrokerProcessor.java | 8 +
.../broker/processor/PullMessageProcessor.java | 2 +-
.../broker/processor/SendMessageProcessor.java | 18 +-
.../subscription/LmqSubscriptionGroupManager.java | 46 ++
.../broker/topic/LmqTopicConfigManager.java | 49 ++
.../rocketmq/broker/topic/TopicConfigManager.java | 6 +-
.../queue/TransactionalMessageBridge.java | 8 +-
.../queue/TransactionalMessageServiceImpl.java | 4 +-
.../rocketmq/broker/BrokerControllerTest.java | 33 +
.../broker/BrokerPathConfigHelperTest.java | 42 ++
.../broker/filter/MessageStoreWithFilterTest.java | 2 +-
.../offset/LmqConsumerOffsetManagerTest.java | 81 +++
.../broker/processor/AdminBrokerProcessorTest.java | 262 ++++++-
.../broker/processor/SendMessageProcessorTest.java | 4 +-
client/pom.xml | 15 -
.../org/apache/rocketmq/client/ClientConfig.java | 10 +-
.../java/org/apache/rocketmq/client/MQHelper.java | 1 +
.../org/apache/rocketmq/client/Validators.java | 52 +-
.../client/consumer/DefaultMQPullConsumer.java | 1 +
.../client/consumer/DefaultMQPushConsumer.java | 5 +-
.../rebalance/AllocateMachineRoomNearby.java | 7 +-
.../AllocateMessageQueueByMachineRoom.java | 11 +
.../consumer/store/LocalFileOffsetStore.java | 11 +-
.../consumer/store/RemoteBrokerOffsetStore.java | 4 +-
.../client/impl/ClientRemotingProcessor.java | 6 +-
.../apache/rocketmq/client/impl/MQAdminImpl.java | 5 +-
.../rocketmq/client/impl/MQClientAPIImpl.java | 135 ++--
.../ConsumeMessageConcurrentlyService.java | 14 +-
.../consumer/ConsumeMessageOrderlyService.java | 14 +-
.../impl/consumer/DefaultLitePullConsumerImpl.java | 2 +
.../client/impl/consumer/ProcessQueue.java | 12 +-
.../client/impl/factory/MQClientInstance.java | 17 +-
.../impl/producer/DefaultMQProducerImpl.java | 87 ++-
.../client/producer/DefaultMQProducer.java | 1 -
.../client/producer/LocalTransactionExecuter.java | 2 +-
...stFutureTable.java => RequestFutureHolder.java} | 54 +-
.../client/producer/TransactionCheckListener.java | 3 +-
.../rocketmq/client/trace/TraceDataEncoder.java | 5 +-
.../org/apache/rocketmq/client/ValidatorsTest.java | 2 +-
.../consumer/DefaultLitePullConsumerTest.java | 18 +-
.../client/consumer/DefaultMQPushConsumerTest.java | 34 +-
.../AllocateMessageQueueAveragelyByCircleTest.java | 68 ++
.../AllocateMessageQueueAveragelyTest.java | 57 ++
.../store/RemoteBrokerOffsetStoreTest.java | 3 +-
.../rocketmq/client/impl/MQClientAPIImplTest.java | 36 +-
.../ConsumeMessageConcurrentlyServiceTest.java | 57 +-
.../consumer/ConsumeMessageOrderlyServiceTest.java | 177 +++++
.../impl/consumer/RebalanceLitePullImplTest.java | 100 +++
.../impl/consumer/RebalancePushImplTest.java | 93 ++-
.../client/producer/DefaultMQProducerTest.java | 11 +-
.../DefaultMQConsumerWithOpenTracingTest.java | 25 +-
.../trace/DefaultMQConsumerWithTraceTest.java | 5 +-
.../DefaultMQLitePullConsumerWithTraceTest.java | 1 -
.../client/trace/TraceDataEncoderTest.java | 33 +-
.../trace/TransactionMQProducerWithTraceTest.java | 36 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 41 +-
.../org/apache/rocketmq/common/Configuration.java | 19 +-
.../java/org/apache/rocketmq/common/MixAll.java | 5 +
.../java/org/apache/rocketmq/common/UtilAll.java | 14 +
.../common/message/MessageClientIDSetter.java | 27 +-
.../rocketmq/common/message/MessageConst.java | 5 +-
.../rocketmq/common/message/MessageDecoder.java | 5 +-
.../apache/rocketmq/common/message/MessageExt.java | 4 +
.../protocol/body/ClusterAclVersionInfo.java | 15 +-
.../common/protocol/body/ConsumerRunningInfo.java | 8 +-
.../header/GetBrokerAclConfigResponseHeader.java | 10 +
...va => DeleteTopicFromNamesrvRequestHeader.java} | 2 +-
.../rocketmq/common/sysflag/TopicSysFlag.java | 3 -
.../rocketmq/common/topic/TopicValidator.java | 47 +-
.../org/apache/rocketmq/common/MixAllTest.java | 12 +
.../common/message/MessageClientIDSetterTest.java | 22 +
distribution/NOTICE-BIN | 2 +-
distribution/benchmark/runclass.sh | 3 +-
distribution/bin/runbroker.cmd | 3 +-
distribution/bin/runbroker.sh | 25 +-
distribution/bin/runserver.cmd | 3 +-
distribution/bin/runserver.sh | 3 +-
distribution/bin/tools.cmd | 3 +-
distribution/bin/tools.sh | 3 +-
distribution/conf/{ => acl}/plain_acl.yml | 44 +-
distribution/conf/logback_broker.xml | 48 +-
docs/cn/Example_LMQ.md | 85 +++
...Multiple_ACL_Files_\350\256\276\350\256\241.md" | 137 ++++
docs/cn/architecture.md | 6 +-
docs/cn/best_practice.md | 6 +-
.../java/API_Reference_ DefaultPullConsumer.md | 143 ++++
.../client/java/API_Reference_DefaultMQProducer.md | 26 +-
docs/cn/design.md | 2 +-
docs/cn/image/LMQ_1.png | Bin 0 -> 304040 bytes
docs/en/CLITools.md | 6 +-
docs/en/Configuration_Client.md | 16 +-
docs/en/Design_Query.md | 6 +-
docs/en/Example_Transaction.md | 10 +-
docs/en/best_practice.md | 79 +++
example/pom.xml | 4 -
.../rocketmq/example/benchmark/AclClient.java | 12 +-
.../rocketmq/example/benchmark/Consumer.java | 13 +-
.../rocketmq/example/benchmark/Producer.java | 15 +-
.../example/benchmark/TransactionProducer.java | 17 +-
.../rocketmq/example/ordermessage/Consumer.java | 4 -
.../rocketmq/example/quickstart/Consumer.java | 4 +-
.../example/schedule/ScheduledMessageConsumer.java | 51 ++
.../example/schedule/ScheduledMessageProducer.java | 41 ++
.../rocketmq/example/simple/OnewayProducer.java | 45 ++
.../rocketmq/example/simple/PullConsumer.java | 2 +-
example/src/main/resources/MessageFilterImpl.java | 39 --
logappender/pom.xml | 80 ---
.../logappender/common/ProducerInstance.java | 97 ---
.../logappender/log4j/RocketmqLog4jAppender.java | 189 -----
.../logappender/log4j2/RocketmqLog4j2Appender.java | 226 ------
.../logback/RocketmqLogbackAppender.java | 179 -----
.../rocketmq/logappender/AbstractTestCase.java | 72 --
.../rocketmq/logappender/Log4jPropertiesTest.java | 32 -
.../apache/rocketmq/logappender/Log4jXmlTest.java | 32 -
.../apache/rocketmq/logappender/LogbackTest.java | 52 --
.../apache/rocketmq/logappender/log4j2Test.java | 44 --
.../src/test/resources/log4j-example.properties | 33 -
logappender/src/test/resources/log4j-example.xml | 56 --
logappender/src/test/resources/log4j2-example.xml | 41 --
logappender/src/test/resources/logback-example.xml | 81 ---
.../rocketmq/logging/inner/LoggingBuilder.java | 2 +-
.../rocketmq/logging/inner/LoggingEvent.java | 3 +
namesrv/pom.xml | 4 +
.../apache/rocketmq/namesrv/NamesrvStartup.java | 2 +-
.../namesrv/processor/DefaultRequestProcessor.java | 6 +-
.../processor/DefaultRequestProcessorTest.java | 149 +++-
.../io/openmessaging/rocketmq/utils/BeanUtils.java | 9 +-
pom.xml | 23 +-
.../rocketmq/remoting/common/RemotingHelper.java | 36 +-
.../rocketmq/remoting/common/RemotingUtil.java | 9 +-
.../rocketmq/remoting/netty/NettyClientConfig.java | 19 +
.../remoting/netty/NettyRemotingAbstract.java | 5 +-
.../remoting/netty/NettyRemotingClient.java | 21 +-
.../remoting/netty/NettyRemotingServer.java | 19 +-
.../rocketmq/remoting/netty/NettyServerConfig.java | 27 +
.../rocketmq/remoting/netty/NettySystemConfig.java | 17 +-
.../remoting/protocol/RemotingCommand.java | 12 +-
.../remoting/protocol/RocketMQSerializable.java | 9 +-
.../remoting/netty/NettyServerConfigTest.java | 35 +-
.../remoting/protocol/RemotingCommandTest.java | 59 +-
.../protocol/RocketMQSerializableTest.java | 74 +-
.../rocketmq/srvutil/AclFileWatchService.java | 162 +++++
.../org/apache/rocketmq/srvutil/ServerUtil.java | 2 +-
store/pom.xml | 2 +-
.../java/org/apache/rocketmq/store/CommitLog.java | 197 ++++--
.../apache/rocketmq/store/CommitLogDispatcher.java | 4 +
.../org/apache/rocketmq/store/ConsumeQueue.java | 56 +-
.../apache/rocketmq/store/DefaultMessageStore.java | 121 +++-
.../apache/rocketmq/store/FlushDiskWatcher.java | 78 +++
.../org/apache/rocketmq/store/MappedFileQueue.java | 4 +-
.../rocketmq/store/MessageArrivingListener.java | 11 +
.../org/apache/rocketmq/store/MessageStore.java | 10 +
.../org/apache/rocketmq/store/MultiDispatch.java | 157 +++++
.../apache/rocketmq/store/PutMessageStatus.java | 1 +
.../apache/rocketmq/store/StoreStatsService.java | 70 +-
.../rocketmq/store/config/MessageStoreConfig.java | 65 ++
.../rocketmq/store/dledger/DLedgerCommitLog.java | 4 +-
.../org/apache/rocketmq/store/ha/HAConnection.java | 15 +-
.../org/apache/rocketmq/store/ha/HAService.java | 9 +-
.../rocketmq/store/logfile/DefaultMappedFile.java | 12 +-
.../store/schedule/ScheduleMessageService.java | 587 ++++++++++++----
.../rocketmq/store/stats/BrokerStatsManager.java | 109 ++-
.../store/stats/LmqBrokerStatsManager.java | 120 ++++
.../apache/rocketmq/store/BatchPutMessageTest.java | 4 +-
.../apache/rocketmq/store/ConsumeQueueTest.java | 141 +++-
.../store/DefaultMessageStoreCleanFilesTest.java | 2 +-
.../store/DefaultMessageStoreShutDownTest.java | 3 +-
.../rocketmq/store/DefaultMessageStoreTest.java | 18 +-
.../rocketmq/store/FlushDiskWatcherTest.java | 84 +++
.../java/org/apache/rocketmq/store/HATest.java | 2 +-
.../apache/rocketmq/store/MultiDispatchTest.java | 98 +++
.../rocketmq/store/ScheduleMessageServiceTest.java | 118 +++-
.../store/dledger/MessageStoreTestBase.java | 4 +-
.../store/schedule/ScheduleMessageServiceTest.java | 2 +-
.../test/java/stats/BrokerStatsManagerTest.java | 2 +-
test/pom.xml | 21 +
.../rocketmq/test/lmq/benchmark/BenchLmqStore.java | 305 +++++++++
.../rocketmq/test/message/MessageQueueMsg.java | 8 +-
.../org/apache/rocketmq/test/util/FileUtil.java | 7 +-
.../org/apache/rocketmq/test/util/MQAdmin.java | 166 +++++
.../org/apache/rocketmq/test/util/StatUtil.java | 478 +++++++++++++
.../org/apache/rocketmq/test/base/BaseConf.java | 9 +-
.../producer/exception/msg/MessageExceptionIT.java | 2 +-
.../client/producer/querymsg/QueryMsgByKeyIT.java | 57 ++
.../rocketmq/test/delay/NormalMsgDelayIT.java | 2 +-
.../rocketmq/test/lmq/TestBenchLmqStore.java | 100 +++
tools/pom.xml | 4 -
.../rocketmq/tools/admin/DefaultMQAdminExt.java | 2 +-
.../tools/admin/DefaultMQAdminExtImpl.java | 4 +-
.../apache/rocketmq/tools/command/CommandUtil.java | 7 +-
.../rocketmq/tools/command/MQAdminStartup.java | 2 +
.../acl/ClusterAclConfigVersionListSubCommand.java | 32 +-
.../broker/BrokerConsumeStatsSubCommad.java | 6 +-
.../command/broker/CleanUnusedTopicCommand.java | 2 +-
.../command/broker/GetBrokerConfigCommand.java | 22 +-
.../connection/ConsumerConnectionSubCommand.java | 14 +-
.../consumer/ConsumerProgressSubCommand.java | 18 +-
.../consumer/GetConsumerConfigSubCommand.java | 10 +-
.../tools/command/export/ExportConfigsCommand.java | 7 +-
.../command/export/ExportMetadataCommand.java | 73 +-
.../command/namesrv/GetNamesrvConfigCommand.java | 10 +-
.../command/offset/CloneGroupOffsetCommand.java | 2 +-
.../command/offset/ResetOffsetByTimeCommand.java | 2 +-
...ommand.java => SkipAccumulationSubCommand.java} | 61 +-
.../command/queue/QueryConsumeQueueCommand.java | 6 +-
.../tools/command/stats/StatsAllSubCommand.java | 12 +-
.../tools/command/topic/TopicListSubCommand.java | 6 +-
.../command/broker/BrokerStatusSubCommandTest.java | 68 +-
.../broker/CleanExpiredCQSubCommandTest.java | 56 +-
.../broker/CleanUnusedTopicCommandTest.java | 56 +-
.../command/broker/GetBrokerConfigCommandTest.java | 73 +-
.../broker/UpdateBrokerConfigSubCommandTest.java | 52 +-
.../ConsumerConnectionSubCommandTest.java | 87 +--
.../ProducerConnectionSubCommandTest.java | 80 +--
.../consumer/ConsumerProgressSubCommandTest.java | 103 +--
.../consumer/ConsumerStatusSubCommandTest.java | 113 +--
.../consumer/GetConsumerConfigSubCommandTest.java | 104 +--
.../command/message/ConsumeMessageCommandTest.java | 39 +-
.../message/QueryMsgByUniqueKeySubCommandTest.java | 6 +-
.../message/QueryMsgTraceByIdSubCommandTest.java | 126 ++--
.../namesrv/AddWritePermSubCommandTest.java | 38 +
.../namesrv/GetNamesrvConfigCommandTest.java | 77 +--
.../command/namesrv/UpdateKvConfigCommandTest.java | 56 +-
.../namesrv/WipeWritePermSubCommandTest.java | 82 +--
.../offset/GetConsumerStatusCommandTest.java | 70 +-
.../offset/ResetOffsetByTimeCommandTest.java | 88 +--
.../SkipAccumulationCommandTest.java} | 16 +-
.../tools/command/server/NameServerMocker.java | 67 ++
.../tools/command/server/ServerResponseMocker.java | 153 +++++
254 files changed, 8322 insertions(+), 3927 deletions(-)
diff --cc .travis.yml
index 88058f3,5f08c85..837ae1f
--- a/.travis.yml
+++ b/.travis.yml
@@@ -4,9 -4,11 +4,11 @@@ notifications
email:
recipients:
- dev@rocketmq.apache.org
+ if: branch = develop OR branch = master
on_success: change
on_failure: always
-
+
+
language: java
matrix:
@@@ -43,11 -48,11 +48,11 @@@ before_script
- ulimit -c unlimited
script:
- - mvn verify
- - mvn -B verify -DskipTests
++ - mvn verify -DskipTests
- travis_retry mvn -B clean apache-rat:check
- - travis_retry mvn -B package jacoco:report coveralls:report
+ - travis_retry mvn -B install jacoco:report coveralls:report
+ - travis_retry mvn -B clean install -pl test -Pit-test
after_success:
- - mvn clean install -Pit-test
- mvn sonar:sonar -Psonar-apache
- bash <(curl -s https://codecov.io/bash) || echo 'Codecov failed to upload'
diff --cc broker/pom.xml
index 2c5a407,fee6d9e..bb83991
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@@ -67,9 -67,9 +67,13 @@@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ </dependency>
++ <dependency>
+ <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
+ <artifactId>concurrentlinkedhashmap-lru</artifactId>
+ </dependency>
</dependencies>
<build>
diff --cc broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 7cf48c9,7bfc618..ef67ade
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@@ -31,13 -47,13 +31,15 @@@ import org.apache.rocketmq.broker.filte
import org.apache.rocketmq.broker.filtersrv.FilterServerManager;
import org.apache.rocketmq.broker.latency.BrokerFastFailure;
import org.apache.rocketmq.broker.latency.BrokerFixedThreadPoolExecutor;
+import org.apache.rocketmq.broker.loadbalance.AssignmentManager;
+ import org.apache.rocketmq.broker.longpolling.LmqPullRequestHoldService;
import org.apache.rocketmq.broker.longpolling.NotifyMessageArrivingListener;
import org.apache.rocketmq.broker.longpolling.PullRequestHoldService;
import org.apache.rocketmq.broker.mqtrace.ConsumeMessageHook;
import org.apache.rocketmq.broker.mqtrace.SendMessageHook;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
+ import org.apache.rocketmq.broker.offset.LmqConsumerOffsetManager;
import org.apache.rocketmq.broker.out.BrokerOuterAPI;
import org.apache.rocketmq.broker.plugin.MessageStoreFactory;
import org.apache.rocketmq.broker.plugin.MessageStorePluginContext;
@@@ -54,10 -66,10 +56,12 @@@ import org.apache.rocketmq.broker.proce
import org.apache.rocketmq.broker.processor.ReplyMessageProcessor;
import org.apache.rocketmq.broker.processor.SendMessageProcessor;
import org.apache.rocketmq.broker.slave.SlaveSynchronize;
+ import org.apache.rocketmq.broker.subscription.LmqSubscriptionGroupManager;
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
+ import org.apache.rocketmq.broker.topic.LmqTopicConfigManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingCleanService;
+import org.apache.rocketmq.broker.topic.TopicQueueMappingManager;
import org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener;
import org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService;
import org.apache.rocketmq.broker.transaction.TransactionalMessageService;
@@@ -101,30 -110,7 +105,31 @@@ import org.apache.rocketmq.store.config
import org.apache.rocketmq.store.dledger.DLedgerCommitLog;
import org.apache.rocketmq.store.stats.BrokerStats;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+ import org.apache.rocketmq.store.stats.LmqBrokerStatsManager;
public class BrokerController {
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
@@@ -160,7 -137,7 +165,8 @@@
"BrokerControllerScheduledThread"));
private final SlaveSynchronize slaveSynchronize;
private final BlockingQueue<Runnable> sendThreadPoolQueue;
+ private final BlockingQueue<Runnable> putThreadPoolQueue;
+ private final BlockingQueue<Runnable> ackThreadPoolQueue;
private final BlockingQueue<Runnable> pullThreadPoolQueue;
private final BlockingQueue<Runnable> replyThreadPoolQueue;
private final BlockingQueue<Runnable> queryThreadPoolQueue;
@@@ -177,10 -153,9 +183,11 @@@
private RemotingServer remotingServer;
private RemotingServer fastRemotingServer;
private TopicConfigManager topicConfigManager;
+ private TopicQueueMappingManager topicQueueMappingManager;
private ExecutorService sendMessageExecutor;
+ private ExecutorService putMessageFutureExecutor;
private ExecutorService pullMessageExecutor;
+ private ExecutorService ackMessageExecutor;
private ExecutorService replyMessageExecutor;
private ExecutorService queryMessageExecutor;
private ExecutorService adminBrokerExecutor;
@@@ -213,17 -185,11 +220,16 @@@
this.nettyServerConfig = nettyServerConfig;
this.nettyClientConfig = nettyClientConfig;
this.messageStoreConfig = messageStoreConfig;
- this.consumerOffsetManager = new ConsumerOffsetManager(this);
- this.topicConfigManager = new TopicConfigManager(this);
- this.topicQueueMappingManager = new TopicQueueMappingManager(this);
+ this.consumerOffsetManager = messageStoreConfig.isEnableLmq() ? new LmqConsumerOffsetManager(this) : new ConsumerOffsetManager(this);
+ this.topicConfigManager = messageStoreConfig.isEnableLmq() ? new LmqTopicConfigManager(this) : new TopicConfigManager(this);
this.pullMessageProcessor = new PullMessageProcessor(this);
- this.pullRequestHoldService = new PullRequestHoldService(this);
+ this.pullRequestHoldService = messageStoreConfig.isEnableLmq() ? new LmqPullRequestHoldService(this) : new PullRequestHoldService(this);
- this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService);
+ this.popMessageProcessor = new PopMessageProcessor(this);
+ this.ackMessageProcessor = new AckMessageProcessor(this);
+ this.changeInvisibleTimeProcessor = new ChangeInvisibleTimeProcessor(this);
+ this.sendMessageProcessor = new SendMessageProcessor(this);
+ this.messageArrivingListener = new NotifyMessageArrivingListener(this.pullRequestHoldService,
+ this.popMessageProcessor);
this.consumerIdsChangeListener = new DefaultConsumerIdsChangeListener(this);
this.consumerManager = new ConsumerManager(this.consumerIdsChangeListener);
this.consumerFilterManager = new ConsumerFilterManager(this);
@@@ -231,18 -196,15 +237,19 @@@
this.producerManager = new ProducerManager();
this.clientHousekeepingService = new ClientHousekeepingService(this);
this.broker2Client = new Broker2Client(this);
- this.subscriptionGroupManager = new SubscriptionGroupManager(this);
+ this.subscriptionGroupManager = messageStoreConfig.isEnableLmq() ? new LmqSubscriptionGroupManager(this) : new SubscriptionGroupManager(this);
- this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
+ this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig, this);
this.filterServerManager = new FilterServerManager(this);
+ this.assignmentManager = new AssignmentManager(this);
+ this.queryAssignmentProcessor = new QueryAssignmentProcessor(this);
+ this.clientManageProcessor = new ClientManageProcessor(this);
this.slaveSynchronize = new SlaveSynchronize(this);
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity());
+ this.putThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPutThreadPoolQueueCapacity());
this.pullThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getPullThreadPoolQueueCapacity());
+ this.ackThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getAckThreadPoolQueueCapacity());
this.replyThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getReplyThreadPoolQueueCapacity());
this.queryThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getQueryThreadPoolQueueCapacity());
this.clientManagerThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getClientManagerThreadPoolQueueCapacity());
@@@ -1410,27 -1261,7 +1430,27 @@@
}
}
- public ExecutorService getSendMessageExecutor() {
- return sendMessageExecutor;
+ public ExecutorService getPutMessageFutureExecutor() {
+ return putMessageFutureExecutor;
}
+
+ public long getShouldStartTime() {
+ return shouldStartTime;
+ }
+
+ public AssignmentManager getAssignmentManager() {
+ return assignmentManager;
+ }
+
+ public SendMessageProcessor getSendMessageProcessor() {
+ return sendMessageProcessor;
+ }
+
+ public QueryAssignmentProcessor getQueryAssignmentProcessor() {
+ return queryAssignmentProcessor;
+ }
+
+ public TopicQueueMappingCleanService getTopicQueueMappingCleanService() {
+ return topicQueueMappingCleanService;
+ }
}
diff --cc broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
index 6360d54,321c800..72739d8
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerPathConfigHelper.java
@@@ -35,18 -35,14 +35,14 @@@ public class BrokerPathConfigHelper
return rootDir + File.separator + "config" + File.separator + "topics.json";
}
- public static String getConsumerOffsetPath(final String rootDir) {
- return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
+ public static String getTopicQueueMappingPath(final String rootDir) {
+ return rootDir + File.separator + "config" + File.separator + "topicQueueMapping.json";
}
- public static String getLmqConsumerOffsetPath(final String rootDir) {
- return rootDir + File.separator + "config" + File.separator + "lmqConsumerOffset.json";
+ public static String getConsumerOffsetPath(final String rootDir) {
+ return rootDir + File.separator + "config" + File.separator + "consumerOffset.json";
}
- public static String getConsumerOrderInfoPath(final String rootDir) {
- return rootDir + File.separator + "config" + File.separator + "consumerOrderInfo.json";
- }
-
public static String getSubscriptionGroupPath(final String rootDir) {
return rootDir + File.separator + "config" + File.separator + "subscriptionGroup.json";
}
diff --cc broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
index 44edfe0,62fd1c5..699e43c
--- a/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/plugin/AbstractPluginMessageStore.java
@@@ -1,272 -1,272 +1,277 @@@
- /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
- package org.apache.rocketmq.broker.plugin;
-
- import java.util.HashMap;
- import java.util.LinkedList;
- import java.util.Set;
- import java.util.concurrent.CompletableFuture;
- import org.apache.rocketmq.common.message.MessageExt;
- import org.apache.rocketmq.store.CommitLogDispatcher;
- import org.apache.rocketmq.store.GetMessageResult;
- import org.apache.rocketmq.store.MessageExtBatch;
- import org.apache.rocketmq.store.MessageExtBrokerInner;
- import org.apache.rocketmq.store.MessageFilter;
- import org.apache.rocketmq.store.MessageStore;
- import org.apache.rocketmq.store.PutMessageResult;
- import org.apache.rocketmq.store.QueryMessageResult;
- import org.apache.rocketmq.store.SelectMappedBufferResult;
- import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
- import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
- public abstract class AbstractPluginMessageStore implements MessageStore {
- protected MessageStore next = null;
- protected MessageStorePluginContext context;
-
- public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) {
- this.next = next;
- this.context = context;
- }
-
- @Override
- public long getEarliestMessageTime() {
- return next.getEarliestMessageTime();
- }
-
- @Override
- public long lockTimeMills() {
- return next.lockTimeMills();
- }
-
- @Override
- public boolean isOSPageCacheBusy() {
- return next.isOSPageCacheBusy();
- }
-
- @Override
- public boolean isTransientStorePoolDeficient() {
- return next.isTransientStorePoolDeficient();
- }
-
- @Override
- public boolean load() {
- return next.load();
- }
-
- @Override
- public void start() throws Exception {
- next.start();
- }
-
- @Override
- public void shutdown() {
- next.shutdown();
- }
-
- @Override
- public void destroy() {
- next.destroy();
- }
-
- @Override
- public PutMessageResult putMessage(MessageExtBrokerInner msg) {
- return next.putMessage(msg);
- }
-
- @Override
- public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
- return next.asyncPutMessage(msg);
- }
-
- @Override
- public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
- return next.asyncPutMessages(messageExtBatch);
- }
-
- @Override
- public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
- int maxMsgNums, final MessageFilter messageFilter) {
- return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
- }
-
- @Override
- public long getMaxOffsetInQueue(String topic, int queueId) {
- return next.getMaxOffsetInQueue(topic, queueId);
- }
-
- @Override
- public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
- return next.getMaxOffsetInQueue(topic, queueId, committed);
- }
-
- @Override
- public long getMinOffsetInQueue(String topic, int queueId) {
- return next.getMinOffsetInQueue(topic, queueId);
- }
-
- @Override
- public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
- return next.getCommitLogOffsetInQueue(topic, queueId, consumeQueueOffset);
- }
-
- @Override
- public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
- return next.getOffsetInQueueByTime(topic, queueId, timestamp);
- }
-
- @Override
- public MessageExt lookMessageByOffset(long commitLogOffset) {
- return next.lookMessageByOffset(commitLogOffset);
- }
-
- @Override
- public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
- return next.selectOneMessageByOffset(commitLogOffset);
- }
-
- @Override
- public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
- return next.selectOneMessageByOffset(commitLogOffset, msgSize);
- }
-
- @Override
- public String getRunningDataInfo() {
- return next.getRunningDataInfo();
- }
-
- @Override
- public HashMap<String, String> getRuntimeInfo() {
- return next.getRuntimeInfo();
- }
-
- @Override
- public long getMaxPhyOffset() {
- return next.getMaxPhyOffset();
- }
-
- @Override
- public long getMinPhyOffset() {
- return next.getMinPhyOffset();
- }
-
- @Override
- public long getEarliestMessageTime(String topic, int queueId) {
- return next.getEarliestMessageTime(topic, queueId);
- }
-
- @Override
- public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
- return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset);
- }
-
- @Override
- public long getMessageTotalInQueue(String topic, int queueId) {
- return next.getMessageTotalInQueue(topic, queueId);
- }
-
- @Override
- public SelectMappedBufferResult getCommitLogData(long offset) {
- return next.getCommitLogData(offset);
- }
-
- @Override
- public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
- return next.appendToCommitLog(startOffset, data, dataStart, dataLength);
- }
-
- @Override
- public void executeDeleteFilesManually() {
- next.executeDeleteFilesManually();
- }
-
- @Override
- public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin,
- long end) {
- return next.queryMessage(topic, key, maxNum, begin, end);
- }
-
- @Override
- public void updateHaMasterAddress(String newAddr) {
- next.updateHaMasterAddress(newAddr);
- }
-
- @Override
- public long slaveFallBehindMuch() {
- return next.slaveFallBehindMuch();
- }
-
- @Override
- public long now() {
- return next.now();
- }
-
- @Override
- public int cleanUnusedTopic(Set<String> topics) {
- return next.cleanUnusedTopic(topics);
- }
-
- @Override
- public void cleanExpiredConsumerQueue() {
- next.cleanExpiredConsumerQueue();
- }
-
- @Override
- public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) {
- return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
- }
-
- @Override
- public long dispatchBehindBytes() {
- return next.dispatchBehindBytes();
- }
-
- @Override
- public long flush() {
- return next.flush();
- }
-
- @Override
- public boolean resetWriteOffset(long phyOffset) {
- return next.resetWriteOffset(phyOffset);
- }
-
- @Override
- public long getConfirmOffset() {
- return next.getConfirmOffset();
- }
-
- @Override
- public void setConfirmOffset(long phyOffset) {
- next.setConfirmOffset(phyOffset);
- }
-
- @Override
- public LinkedList<CommitLogDispatcher> getDispatcherList() {
- return next.getDispatcherList();
- }
-
- @Override
- public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) {
- return next.getConsumeQueue(topic, queueId);
- }
-
- @Override
- public BrokerStatsManager getBrokerStatsManager() {
- return next.getBrokerStatsManager();
- };
- }
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.rocketmq.broker.plugin;
+
+ import java.util.HashMap;
+ import java.util.LinkedList;
+ import java.util.Set;
+ import java.util.concurrent.CompletableFuture;
+ import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.message.MessageExtBatch;
+ import org.apache.rocketmq.store.CommitLogDispatcher;
-import org.apache.rocketmq.store.ConsumeQueue;
+ import org.apache.rocketmq.store.GetMessageResult;
++import org.apache.rocketmq.store.MessageExtBatch;
+ import org.apache.rocketmq.store.MessageExtBrokerInner;
+ import org.apache.rocketmq.store.MessageFilter;
+ import org.apache.rocketmq.store.MessageStore;
+ import org.apache.rocketmq.store.PutMessageResult;
+ import org.apache.rocketmq.store.QueryMessageResult;
+ import org.apache.rocketmq.store.SelectMappedBufferResult;
++import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+ import org.apache.rocketmq.store.stats.BrokerStatsManager;
+
+ public abstract class AbstractPluginMessageStore implements MessageStore {
+ protected MessageStore next = null;
+ protected MessageStorePluginContext context;
+
+ public AbstractPluginMessageStore(MessageStorePluginContext context, MessageStore next) {
+ this.next = next;
+ this.context = context;
+ }
+
+ @Override
+ public long getEarliestMessageTime() {
+ return next.getEarliestMessageTime();
+ }
+
+ @Override
+ public long lockTimeMills() {
+ return next.lockTimeMills();
+ }
+
+ @Override
+ public boolean isOSPageCacheBusy() {
+ return next.isOSPageCacheBusy();
+ }
+
+ @Override
+ public boolean isTransientStorePoolDeficient() {
+ return next.isTransientStorePoolDeficient();
+ }
+
+ @Override
+ public boolean load() {
+ return next.load();
+ }
+
+ @Override
+ public void start() throws Exception {
+ next.start();
+ }
+
+ @Override
+ public void shutdown() {
+ next.shutdown();
+ }
+
+ @Override
+ public void destroy() {
+ next.destroy();
+ }
+
+ @Override
+ public PutMessageResult putMessage(MessageExtBrokerInner msg) {
+ return next.putMessage(msg);
+ }
+
+ @Override
+ public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {
+ return next.asyncPutMessage(msg);
+ }
+
+ @Override
+ public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
+ return next.asyncPutMessages(messageExtBatch);
+ }
+
+ @Override
+ public GetMessageResult getMessage(String group, String topic, int queueId, long offset,
+ int maxMsgNums, final MessageFilter messageFilter) {
+ return next.getMessage(group, topic, queueId, offset, maxMsgNums, messageFilter);
+ }
+
+ @Override
+ public long getMaxOffsetInQueue(String topic, int queueId) {
+ return next.getMaxOffsetInQueue(topic, queueId);
+ }
+
+ @Override
++ public long getMaxOffsetInQueue(String topic, int queueId, boolean committed) {
++ return next.getMaxOffsetInQueue(topic, queueId, committed);
++ }
++
++ @Override
+ public long getMinOffsetInQueue(String topic, int queueId) {
+ return next.getMinOffsetInQueue(topic, queueId);
+ }
+
+ @Override
+ public long getCommitLogOffsetInQueue(String topic, int queueId, long consumeQueueOffset) {
+ return next.getCommitLogOffsetInQueue(topic, queueId, consumeQueueOffset);
+ }
+
+ @Override
+ public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
+ return next.getOffsetInQueueByTime(topic, queueId, timestamp);
+ }
+
+ @Override
+ public MessageExt lookMessageByOffset(long commitLogOffset) {
+ return next.lookMessageByOffset(commitLogOffset);
+ }
+
+ @Override
+ public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {
+ return next.selectOneMessageByOffset(commitLogOffset);
+ }
+
+ @Override
+ public SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset, int msgSize) {
+ return next.selectOneMessageByOffset(commitLogOffset, msgSize);
+ }
+
+ @Override
+ public String getRunningDataInfo() {
+ return next.getRunningDataInfo();
+ }
+
+ @Override
+ public HashMap<String, String> getRuntimeInfo() {
+ return next.getRuntimeInfo();
+ }
+
+ @Override
+ public long getMaxPhyOffset() {
+ return next.getMaxPhyOffset();
+ }
+
+ @Override
+ public long getMinPhyOffset() {
+ return next.getMinPhyOffset();
+ }
+
+ @Override
+ public long getEarliestMessageTime(String topic, int queueId) {
+ return next.getEarliestMessageTime(topic, queueId);
+ }
+
+ @Override
+ public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
+ return next.getMessageStoreTimeStamp(topic, queueId, consumeQueueOffset);
+ }
+
+ @Override
+ public long getMessageTotalInQueue(String topic, int queueId) {
+ return next.getMessageTotalInQueue(topic, queueId);
+ }
+
+ @Override
+ public SelectMappedBufferResult getCommitLogData(long offset) {
+ return next.getCommitLogData(offset);
+ }
+
+ @Override
+ public boolean appendToCommitLog(long startOffset, byte[] data, int dataStart, int dataLength) {
+ return next.appendToCommitLog(startOffset, data, dataStart, dataLength);
+ }
+
+ @Override
+ public void executeDeleteFilesManually() {
+ next.executeDeleteFilesManually();
+ }
+
+ @Override
+ public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin,
+ long end) {
+ return next.queryMessage(topic, key, maxNum, begin, end);
+ }
+
+ @Override
+ public void updateHaMasterAddress(String newAddr) {
+ next.updateHaMasterAddress(newAddr);
+ }
+
+ @Override
+ public long slaveFallBehindMuch() {
+ return next.slaveFallBehindMuch();
+ }
+
+ @Override
+ public long now() {
+ return next.now();
+ }
+
+ @Override
+ public int cleanUnusedTopic(Set<String> topics) {
+ return next.cleanUnusedTopic(topics);
+ }
+
+ @Override
+ public void cleanExpiredConsumerQueue() {
+ next.cleanExpiredConsumerQueue();
+ }
+
+ @Override
+ public boolean checkInDiskByConsumeOffset(String topic, int queueId, long consumeOffset) {
+ return next.checkInDiskByConsumeOffset(topic, queueId, consumeOffset);
+ }
+
+ @Override
+ public long dispatchBehindBytes() {
+ return next.dispatchBehindBytes();
+ }
+
+ @Override
+ public long flush() {
+ return next.flush();
+ }
+
+ @Override
+ public boolean resetWriteOffset(long phyOffset) {
+ return next.resetWriteOffset(phyOffset);
+ }
+
+ @Override
+ public long getConfirmOffset() {
+ return next.getConfirmOffset();
+ }
+
+ @Override
+ public void setConfirmOffset(long phyOffset) {
+ next.setConfirmOffset(phyOffset);
+ }
+
+ @Override
+ public LinkedList<CommitLogDispatcher> getDispatcherList() {
+ return next.getDispatcherList();
+ }
+
+ @Override
- public ConsumeQueue getConsumeQueue(String topic, int queueId) {
++ public ConsumeQueueInterface getConsumeQueue(String topic, int queueId) {
+ return next.getConsumeQueue(topic, queueId);
+ }
+
+ @Override
+ public BrokerStatsManager getBrokerStatsManager() {
+ return next.getBrokerStatsManager();
+ }
+
+ @Override
+ public void cleanUnusedLmqTopic(String topic) {
+ next.cleanUnusedLmqTopic(topic);
+ }
+ }
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index c1819a5,9d188ab..d79ae4a
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@@ -371,9 -292,14 +371,16 @@@ public class AdminBrokerProcessor exten
return response;
}
+ if (MixAll.isLmq(topic)) {
+ this.brokerController.getMessageStore().cleanUnusedLmqTopic(topic);
+ response.setCode(ResponseCode.SUCCESS);
+ response.setRemark(null);
+ return response;
+ }
+
this.brokerController.getTopicConfigManager().deleteTopicConfig(topic);
+ this.brokerController.getTopicQueueMappingManager().delete(topic);
+
this.brokerController.getMessageStore()
.cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
index 0f1ca23,20665a3..806dcd3
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PullMessageProcessor.java
@@@ -100,202 -89,9 +100,203 @@@ public class PullMessageProcessor exten
return false;
}
+
+
+ private RemotingCommand rewriteRequestForStaticTopic(PullMessageRequestHeader requestHeader, TopicQueueMappingContext mappingContext) {
+ try {
+ if (mappingContext.getMappingDetail() == null) {
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+ String topic = mappingContext.getTopic();
+ Integer globalId = mappingContext.getGlobalId();
+ // if the leader? consider the order consumer, which will lock the mq
+ if (!mappingContext.isLeader()) {
+ return buildErrorResponse(ResponseCode.NOT_LEADER_FOR_QUEUE, String.format("%s-%d cannot find mapping item in request process of current broker %s", topic, globalId, mappingDetail.getBname()));
+ }
+ Long globalOffset = requestHeader.getQueueOffset();
+
+ LogicQueueMappingItem mappingItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), globalOffset, true);
+ mappingContext.setCurrentItem(mappingItem);
+
+ if (globalOffset < mappingItem.getLogicOffset()) {
+ //handleOffsetMoved
+ //If the physical queue is reused, we should handle the PULL_OFFSET_MOVED independently
+ //Otherwise, we could just transfer it to the physical process
+ }
+ //below are physical info
+ String bname = mappingItem.getBname();
+ Integer phyQueueId = mappingItem.getQueueId();
+ Long phyQueueOffset = mappingItem.computePhysicalQueueOffset(globalOffset);
+ requestHeader.setQueueId(phyQueueId);
+ requestHeader.setQueueOffset(phyQueueOffset);
+ if (mappingItem.checkIfEndOffsetDecided()
+ && requestHeader.getMaxMsgNums() != null) {
+ requestHeader.setMaxMsgNums((int) Math.min(mappingItem.getEndOffset() - mappingItem.getStartOffset(), requestHeader.getMaxMsgNums()));
+ }
+
+ if (mappingDetail.getBname().equals(bname)) {
+ //just let it go, do the local pull process
+ return null;
+ }
+
+ int sysFlag = requestHeader.getSysFlag();
+ requestHeader.setLo(false);
+ requestHeader.setBname(bname);
+ sysFlag = PullSysFlag.clearSuspendFlag(sysFlag);
+ sysFlag = PullSysFlag.clearCommitOffsetFlag(sysFlag);
+ requestHeader.setSysFlag(sysFlag);
+ RpcRequest rpcRequest = new RpcRequest(RequestCode.PULL_MESSAGE, requestHeader, null);
+ RpcResponse rpcResponse = this.brokerController.getBrokerOuterAPI().getRpcClient().invoke(rpcRequest, this.brokerController.getBrokerConfig().getForwardTimeout()).get();
+ if (rpcResponse.getException() != null) {
+ throw rpcResponse.getException();
+ }
+
+ PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) rpcResponse.getHeader();
+ {
+ RemotingCommand rewriteResult = rewriteResponseForStaticTopic(requestHeader, responseHeader, mappingContext, rpcResponse.getCode());
+ if (rewriteResult != null) {
+ return rewriteResult;
+ }
+ }
+ return RpcClientUtils.createCommandForRpcResponse(rpcResponse);
+ } catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ }
+ }
+
+ private RemotingCommand rewriteResponseForStaticTopic(PullMessageRequestHeader requestHeader, PullMessageResponseHeader responseHeader,
+ TopicQueueMappingContext mappingContext, final int code) {
+ try {
+ if (mappingContext.getMappingDetail() == null) {
+ return null;
+ }
+ TopicQueueMappingDetail mappingDetail = mappingContext.getMappingDetail();
+ LogicQueueMappingItem leaderItem = mappingContext.getLeaderItem();
+
+ LogicQueueMappingItem currentItem = mappingContext.getCurrentItem();
+
+ LogicQueueMappingItem earlistItem = TopicQueueMappingUtils.findLogicQueueMappingItem(mappingContext.getMappingItemList(), 0L, true);
+
+ assert currentItem.getLogicOffset() >= 0;
+
+ long requestOffset = requestHeader.getQueueOffset();
+ long nextBeginOffset = responseHeader.getNextBeginOffset();
+ long minOffset = responseHeader.getMinOffset();
+ long maxOffset = responseHeader.getMaxOffset();
+ int responseCode = code;
+
+ //consider the following situations
+ // 1. read from slave, currently not supported
+ // 2. the middle queue is truncated because of deleting commitlog
+ if (code != ResponseCode.SUCCESS) {
+ //note the currentItem maybe both the leader and the earliest
+ boolean isRevised = false;
+ if (leaderItem.getGen() == currentItem.getGen()) {
+ //read the leader
+ if (requestOffset > maxOffset) {
+ //actually, we need do nothing, but keep the code structure here
+ if (code == ResponseCode.PULL_OFFSET_MOVED) {
+ responseCode = ResponseCode.PULL_OFFSET_MOVED;
+ nextBeginOffset = maxOffset;
+ } else {
+ //maybe current broker is the slave
+ responseCode = code;
+ }
+ } else if (requestOffset < minOffset) {
+ nextBeginOffset = minOffset;
+ responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
+ } else {
+ responseCode = code;
+ }
+ }
+ //note the currentItem maybe both the leader and the earliest
+ if (earlistItem.getGen() == currentItem.getGen()) {
+ //read the earliest one
+ if (requestOffset < minOffset) {
+ if (code == ResponseCode.PULL_OFFSET_MOVED) {
+ responseCode = ResponseCode.PULL_OFFSET_MOVED;
+ nextBeginOffset = minOffset;
+ } else {
+ //maybe read from slave, but we still set it to moved
+ responseCode = ResponseCode.PULL_OFFSET_MOVED;
+ nextBeginOffset = minOffset;
+ }
+ } else if (requestOffset >= maxOffset) {
+ //just move to another item
+ LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
+ if (nextItem != null) {
+ isRevised = true;
+ currentItem = nextItem;
+ nextBeginOffset = currentItem.getStartOffset();
+ minOffset = currentItem.getStartOffset();
+ maxOffset = minOffset;
+ responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
+ } else {
+ //maybe the next one's logic offset is -1
+ responseCode = ResponseCode.PULL_NOT_FOUND;
+ }
+ } else {
+ //let it go
+ responseCode = code;
+ }
+ }
+
+ //read from the middle item, ignore the PULL_OFFSET_MOVED
+ if (!isRevised
+ && leaderItem.getGen() != currentItem.getGen()
+ && earlistItem.getGen() != currentItem.getGen()) {
+ if (requestOffset < minOffset) {
+ nextBeginOffset = minOffset;
+ responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
+ } else if (requestOffset >= maxOffset) {
+ //just move to another item
+ LogicQueueMappingItem nextItem = TopicQueueMappingUtils.findNext(mappingContext.getMappingItemList(), currentItem, true);
+ if (nextItem != null) {
+ currentItem = nextItem;
+ nextBeginOffset = currentItem.getStartOffset();
+ minOffset = currentItem.getStartOffset();
+ maxOffset = minOffset;
+ responseCode = ResponseCode.PULL_RETRY_IMMEDIATELY;
+ } else {
+ //maybe the next one's logic offset is -1
+ responseCode = ResponseCode.PULL_NOT_FOUND;
+ }
+ } else {
+ responseCode = code;
+ }
+ }
+ }
+
+ //handle nextBeginOffset
+ //the next begin offset should no more than the end offset
+ if (currentItem.checkIfEndOffsetDecided()
+ && nextBeginOffset >= currentItem.getEndOffset()) {
+ nextBeginOffset = currentItem.getEndOffset();
+ }
+ responseHeader.setNextBeginOffset(currentItem.computeStaticQueueOffsetStrictly(nextBeginOffset));
+ //handle min offset
+ responseHeader.setMinOffset(currentItem.computeStaticQueueOffsetStrictly(Math.max(currentItem.getStartOffset(), minOffset)));
+ //handle max offset
+ responseHeader.setMaxOffset(Math.max(currentItem.computeStaticQueueOffsetStrictly(maxOffset),
+ TopicQueueMappingDetail.computeMaxOffsetFromMapping(mappingDetail, mappingContext.getGlobalId())));
+ //set the offsetDelta
+ responseHeader.setOffsetDelta(currentItem.computeOffsetDelta());
+
+ if (code != ResponseCode.SUCCESS) {
+ return RemotingCommand.createResponseCommandWithHeader(responseCode, responseHeader);
+ } else {
+ return null;
+ }
+ } catch (Throwable t) {
+ return buildErrorResponse(ResponseCode.SYSTEM_ERROR, t.getMessage());
+ }
+ }
+
+
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)
throws RemotingCommandException {
+ final long beginTimeMills = this.brokerController.getMessageStore().now();
RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);
final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();
final PullMessageRequestHeader requestHeader =
diff --cc broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index a3f5c4e,c8ea4d3..f02358f
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@@ -60,8 -54,9 +60,10 @@@ import org.apache.rocketmq.remoting.exc
import org.apache.rocketmq.remoting.netty.NettyRequestProcessor;
import org.apache.rocketmq.remoting.netty.RemotingResponseCallback;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+ import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageExtBatch;
import org.apache.rocketmq.store.MessageExtBrokerInner;
+ import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
diff --cc broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
index acf05fb,a2abdc0..e4814f0
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java
@@@ -19,23 -20,35 +20,38 @@@ import com.alibaba.fastjson.JSON
import com.google.common.collect.Sets;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
++import java.net.InetSocketAddress;
import org.apache.rocketmq.broker.BrokerController;
+ import org.apache.rocketmq.broker.client.ConsumerGroupInfo;
+ import org.apache.rocketmq.broker.client.ConsumerManager;
+ import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
import org.apache.rocketmq.broker.topic.TopicConfigManager;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.TopicQueueId;
import org.apache.rocketmq.common.constant.PermName;
+ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.RequestCode;
import org.apache.rocketmq.common.protocol.ResponseCode;
+ import org.apache.rocketmq.common.protocol.body.LockBatchRequestBody;
+ import org.apache.rocketmq.common.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.common.protocol.header.CreateTopicRequestHeader;
import org.apache.rocketmq.common.protocol.header.DeleteTopicRequestHeader;
+ import org.apache.rocketmq.common.protocol.header.GetAllTopicConfigResponseHeader;
+ import org.apache.rocketmq.common.protocol.header.GetEarliestMsgStoretimeRequestHeader;
+ import org.apache.rocketmq.common.protocol.header.GetMaxOffsetRequestHeader;
+ import org.apache.rocketmq.common.protocol.header.GetMinOffsetRequestHeader;
+import org.apache.rocketmq.common.protocol.header.GetTopicConfigRequestHeader;
import org.apache.rocketmq.common.protocol.header.ResumeCheckHalfMessageRequestHeader;
+ import org.apache.rocketmq.common.protocol.header.SearchOffsetRequestHeader;
+ import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
+ import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+ import org.apache.rocketmq.common.subscription.SubscriptionGroupConfig;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.netty.NettyClientConfig;
@@@ -43,13 -56,16 +59,16 @@@ import org.apache.rocketmq.remoting.net
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
+ import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.MappedFile;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
import org.apache.rocketmq.store.PutMessageStatus;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+ import org.apache.rocketmq.store.schedule.ScheduleMessageService;
+ import org.apache.rocketmq.store.stats.BrokerStats;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@@ -57,12 -73,12 +76,14 @@@ import org.mockito.Mock
import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
- import java.net.InetSocketAddress;
+ import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
+ import java.util.HashMap;
+ import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.LongAdder;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
@@@ -88,21 -104,28 +112,36 @@@ public class AdminBrokerProcessorTest
@Mock
private MessageStore messageStore;
- private Set<String> systemTopicSet;
+ @Mock
+ private SendMessageProcessor sendMessageProcessor;
@Mock
- private Channel channel;
+ private ConcurrentMap<TopicQueueId, LongAdder> inFlyWritingCouterMap;
+
+ private Set<String> systemTopicSet;
+ private String topic;
+
+ @Mock
+ private SocketAddress socketAddress;
+ @Mock
+ private BrokerStats brokerStats;
+ @Mock
+ private TopicConfigManager topicConfigManager;
+ @Mock
+ private ConsumerManager consumerManager;
+ @Mock
+ private ConsumerOffsetManager consumerOffsetManager;
+ @Mock
+ private DefaultMessageStore defaultMessageStore;
+ @Mock
+ private ScheduleMessageService scheduleMessageService;
+
@Before
- public void init() {
+ public void init() throws Exception {
brokerController.setMessageStore(messageStore);
+
+ //doReturn(sendMessageProcessor).when(brokerController).getSendMessageProcessor();
+
adminBrokerProcessor = new AdminBrokerProcessor(brokerController);
systemTopicSet = Sets.newHashSet(
@@@ -184,33 -201,224 +223,252 @@@
}
@Test
+ public void testGetAllTopicConfig() throws Exception {
+ GetAllTopicConfigResponseHeader getAllTopicConfigResponseHeader = new GetAllTopicConfigResponseHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, getAllTopicConfigResponseHeader);
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testUpdateBrokerConfig() throws Exception {
+ handlerContext = mock(ChannelHandlerContext.class);
+ channel = mock(Channel.class);
+ when(handlerContext.channel()).thenReturn(channel);
+ socketAddress = mock(SocketAddress.class);
+ when(channel.remoteAddress()).thenReturn(socketAddress);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_BROKER_CONFIG, null);
+ Map<String, String> bodyMap = new HashMap<>();
+ bodyMap.put("key", "value");
+ request.setBody(bodyMap.toString().getBytes());
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetBrokerConfig() throws Exception {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_CONFIG, null);
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testSearchOffsetByTimestamp() throws Exception {
+ messageStore = mock(MessageStore.class);
+ when(messageStore.getOffsetInQueueByTime(anyString(), anyInt(), anyLong())).thenReturn(Long.MIN_VALUE);
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+ SearchOffsetRequestHeader searchOffsetRequestHeader = new SearchOffsetRequestHeader();
+ searchOffsetRequestHeader.setTopic("topic");
+ searchOffsetRequestHeader.setQueueId(0);
+ searchOffsetRequestHeader.setTimestamp(System.currentTimeMillis());
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.SEARCH_OFFSET_BY_TIMESTAMP, searchOffsetRequestHeader);
+ request.addExtField("topic", "topic");
+ request.addExtField("queueId", "0");
+ request.addExtField("timestamp", System.currentTimeMillis() + "");
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetMaxOffset() throws Exception {
+ messageStore = mock(MessageStore.class);
+ when(messageStore.getMaxOffsetInQueue(anyString(), anyInt())).thenReturn(Long.MIN_VALUE);
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+ GetMaxOffsetRequestHeader getMaxOffsetRequestHeader = new GetMaxOffsetRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MAX_OFFSET, getMaxOffsetRequestHeader);
+ request.addExtField("topic", "topic");
+ request.addExtField("queueId", "0");
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetMinOffset() throws Exception {
+ messageStore = mock(MessageStore.class);
+ when(messageStore.getMinOffsetInQueue(anyString(), anyInt())).thenReturn(Long.MIN_VALUE);
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+ GetMinOffsetRequestHeader getMinOffsetRequestHeader = new GetMinOffsetRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_MIN_OFFSET, getMinOffsetRequestHeader);
+ request.addExtField("topic", "topic");
+ request.addExtField("queueId", "0");
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetEarliestMsgStoretime() throws Exception {
+ messageStore = mock(MessageStore.class);
+ when(brokerController.getMessageStore()).thenReturn(messageStore);
+ GetEarliestMsgStoretimeRequestHeader getEarliestMsgStoretimeRequestHeader = new GetEarliestMsgStoretimeRequestHeader();
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_EARLIEST_MSG_STORETIME, getEarliestMsgStoretimeRequestHeader);
+ request.addExtField("topic", "topic");
+ request.addExtField("queueId", "0");
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetBrokerRuntimeInfo() throws Exception {
+ brokerStats = mock(BrokerStats.class);
+ when(brokerController.getBrokerStats()).thenReturn(brokerStats);
+ when(brokerStats.getMsgPutTotalYesterdayMorning()).thenReturn(Long.MIN_VALUE);
+ when(brokerStats.getMsgPutTotalTodayMorning()).thenReturn(Long.MIN_VALUE);
+ when(brokerStats.getMsgPutTotalTodayNow()).thenReturn(Long.MIN_VALUE);
+ when(brokerStats.getMsgGetTotalTodayMorning()).thenReturn(Long.MIN_VALUE);
+ when(brokerStats.getMsgGetTotalTodayNow()).thenReturn(Long.MIN_VALUE);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_BROKER_RUNTIME_INFO, null);
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testLockBatchMQ() throws Exception {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);
+ LockBatchRequestBody lockBatchRequestBody = new LockBatchRequestBody();
+ lockBatchRequestBody.setClientId("1111");
+ lockBatchRequestBody.setConsumerGroup("group");
+ request.setBody(JSON.toJSON(lockBatchRequestBody).toString().getBytes());
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testUnlockBatchMQ() throws Exception {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNLOCK_BATCH_MQ, null);
+ UnlockBatchRequestBody unlockBatchRequestBody = new UnlockBatchRequestBody();
+ unlockBatchRequestBody.setClientId("11111");
+ unlockBatchRequestBody.setConsumerGroup("group");
+ request.setBody(JSON.toJSON(unlockBatchRequestBody).toString().getBytes());
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testUpdateAndCreateSubscriptionGroup() throws RemotingCommandException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_SUBSCRIPTIONGROUP, null);
+ SubscriptionGroupConfig subscriptionGroupConfig = new SubscriptionGroupConfig();
+ subscriptionGroupConfig.setBrokerId(1);
+ subscriptionGroupConfig.setGroupName("groupId");
+ subscriptionGroupConfig.setConsumeEnable(Boolean.TRUE);
+ subscriptionGroupConfig.setConsumeBroadcastEnable(Boolean.TRUE);
+ subscriptionGroupConfig.setRetryMaxTimes(111);
+ subscriptionGroupConfig.setConsumeFromMinEnable(Boolean.TRUE);
+ request.setBody(JSON.toJSON(subscriptionGroupConfig).toString().getBytes());
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetAllSubscriptionGroup() throws RemotingCommandException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null);
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testDeleteSubscriptionGroup() throws RemotingCommandException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, null);
+ request.addExtField("groupName", "GID-Group-Name");
+ request.addExtField("removeOffset", "true");
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetTopicStatsInfo() throws RemotingCommandException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_STATS_INFO, null);
+ request.addExtField("topic", "topicTest");
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
+ topicConfigManager = mock(TopicConfigManager.class);
+ when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
+ TopicConfig topicConfig = new TopicConfig();
+ topicConfig.setTopicName("topicTest");
+ when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(topicConfig);
+ RemotingCommand responseSuccess = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(responseSuccess.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetConsumerConnectionList() throws RemotingCommandException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_CONNECTION_LIST, null);
+ request.addExtField("consumerGroup", "GID-group-test");
+ consumerManager = mock(ConsumerManager.class);
+ when(brokerController.getConsumerManager()).thenReturn(consumerManager);
+ ConsumerGroupInfo consumerGroupInfo = new ConsumerGroupInfo("GID-group-test", ConsumeType.CONSUME_ACTIVELY, MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+ when(consumerManager.getConsumerGroupInfo(anyString())).thenReturn(consumerGroupInfo);
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetProducerConnectionList() throws RemotingCommandException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_PRODUCER_CONNECTION_LIST, null);
+ request.addExtField("producerGroup", "ProducerGroupId");
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SYSTEM_ERROR);
+ }
+
+ @Test
+ public void testGetConsumeStats() throws RemotingCommandException {
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUME_STATS, null);
+ request.addExtField("topic", "topicTest");
+ request.addExtField("consumerGroup", "GID-test");
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetAllConsumerOffset() throws RemotingCommandException {
+ consumerOffsetManager = mock(ConsumerOffsetManager.class);
+ when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
+ ConsumerOffsetManager consumerOffset = new ConsumerOffsetManager();
+ when(consumerOffsetManager.encode()).thenReturn(JSON.toJSONString(consumerOffset, false));
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null);
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
+ @Test
+ public void testGetAllDelayOffset() throws Exception {
+ defaultMessageStore = mock(DefaultMessageStore.class);
+ scheduleMessageService = mock(ScheduleMessageService.class);
+ when(brokerController.getMessageStore()).thenReturn(defaultMessageStore);
+ when(defaultMessageStore.getScheduleMessageService()).thenReturn(scheduleMessageService);
+ when(scheduleMessageService.encode()).thenReturn("content");
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null);
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ }
+
++ @Test
+ public void testGetTopicConfig() throws Exception {
+ String topic = "foobar";
+ brokerController.getTopicConfigManager().updateTopicConfig(new TopicConfig(topic));
+
+ {
+ GetTopicConfigRequestHeader requestHeader = new GetTopicConfigRequestHeader();
+ requestHeader.setTopic(topic);
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+ assertThat(response.getBody()).isNotEmpty();
+ }
+ {
+ GetTopicConfigRequestHeader requestHeader = new GetTopicConfigRequestHeader();
+ requestHeader.setTopic("aaaaaaa");
+ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_TOPIC_CONFIG, requestHeader);
+ request.makeCustomHeaderToNet();
+ RemotingCommand response = adminBrokerProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.TOPIC_NOT_EXIST);
+ assertThat(response.getRemark()).contains("No topic in this broker.");
+ }
+ }
+
+
+
+
private RemotingCommand buildCreateTopicRequest(String topic) {
CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
requestHeader.setTopic(topic);
diff --cc broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
index b4ae34d,1f81bdb..0a35851
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/SendMessageProcessorTest.java
@@@ -90,10 -88,10 +90,12 @@@ public class SendMessageProcessorTest
Channel mockChannel = mock(Channel.class);
when(mockChannel.remoteAddress()).thenReturn(new InetSocketAddress(1024));
when(handlerContext.channel()).thenReturn(mockChannel);
- when(messageStore.lookMessageByOffset(anyLong())).thenReturn(new MessageExt());
+ MessageExt messageExt = new MessageExt();
+ messageExt.setTopic(topic);
+ when(messageStore.lookMessageByOffset(anyLong())).thenReturn(messageExt);
sendMessageProcessor = new SendMessageProcessor(brokerController);
+
+ brokerController.getTopicConfigManager().updateTopicConfig(new TopicConfig(topic, 8, 8, PermName.PERM_WRITE|PermName.PERM_READ));
}
@Test
diff --cc client/pom.xml
index 80a58e6,51e5319..ca071ce
--- a/client/pom.xml
+++ b/client/pom.xml
@@@ -59,19 -59,5 +54,9 @@@
<version>0.33.0</version>
<scope>test</scope>
</dependency>
+ <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
</dependencies>
</project>
diff --cc client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
index fc0b29c,15b5bec..3740f9e
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStore.java
@@@ -228,11 -226,11 +228,11 @@@ public class RemoteBrokerOffsetStore im
private long fetchConsumeOffsetFromBroker(MessageQueue mq) throws RemotingException, MQBrokerException,
InterruptedException, MQClientException {
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
- FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, true);
++ FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
if (null == findBrokerResult) {
this.mQClientFactory.updateTopicRouteInfoFromNameServer(mq.getTopic());
- findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(this.mQClientFactory.getBrokerNameFromMessageQueue(mq));
- findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll.MASTER_ID, false);
++ findBrokerResult = this.mQClientFactory.findBrokerAddressInAdmin(mq.getBrokerName());
}
if (findBrokerResult != null) {
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
index 73e35c5,ba4eafa..9dfd10e
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQAdminImpl.java
@@@ -21,8 -21,8 +21,9 @@@ import java.util.ArrayList
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+ import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
diff --cc client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index 0181951,0cc5e3e..376e7dc
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@@ -44,10 -64,11 +44,13 @@@ import org.apache.rocketmq.common.Servi
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.filter.ExpressionType;
+ import org.apache.rocketmq.common.protocol.NamespaceUtil;
+ import org.apache.rocketmq.common.topic.TopicValidator;
+ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.message.MessageQueueAssignment;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
import org.apache.rocketmq.common.protocol.body.ConsumeMessageDirectlyResult;
import org.apache.rocketmq.common.protocol.body.ConsumerRunningInfo;
import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
@@@ -662,16 -638,8 +665,16 @@@ public class MQClientInstance
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
}
+ // Update endpoint map
+ {
+ ConcurrentMap<MessageQueue, String> mqEndPoints = topicRouteData2EndpointsForStaticTopic(topic, topicRouteData);
+ if (mqEndPoints != null && !mqEndPoints.isEmpty()) {
+ topicEndPointsTable.put(topic, mqEndPoints);
+ }
+ }
+
// Update Pub info
- {
+ if (!producerTable.isEmpty()) {
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
diff --cc client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
index 9b9b7e4,227ea44..af2e16e
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultLitePullConsumerTest.java
@@@ -95,9 -96,9 +95,7 @@@ public class DefaultLitePullConsumerTes
@Before
public void init() throws Exception {
ConcurrentMap<String, MQClientInstance> factoryTable = (ConcurrentMap<String, MQClientInstance>) FieldUtils.readDeclaredField(MQClientManager.getInstance(), "factoryTable", true);
- for (MQClientInstance instance : factoryTable.values()) {
- instance.shutdown();
- for (Map.Entry<String, MQClientInstance> entry : factoryTable.entrySet()) {
- entry.getValue().shutdown();
-- }
++ factoryTable.forEach((s, instance) -> instance.shutdown());
factoryTable.clear();
Field field = MQClientInstance.class.getDeclaredField("rebalanceService");
diff --cc client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
index abdd6bf,bfc87f1..2482ac1
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumerTest.java
@@@ -57,10 -56,9 +58,10 @@@ import org.apache.rocketmq.common.messa
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.header.PullMessageRequestHeader;
+import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.remoting.exception.RemotingException;
- import org.junit.After;
+ import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@@ -92,10 -88,9 +93,10 @@@ public class DefaultMQPushConsumerTest
@Mock
private MQClientAPIImpl mQClientAPIImpl;
+ private PullAPIWrapper pullAPIWrapper;
private RebalanceImpl rebalanceImpl;
- private DefaultMQPushConsumer pushConsumer;
- private RebalancePushImpl rebalancePushImpl;
+ private static DefaultMQPushConsumer pushConsumer;
+ private AtomicLong queueOffset = new AtomicLong(1024);;
@Before
public void init() throws Exception {
@@@ -105,6 -100,26 +106,27 @@@
}
factoryTable.clear();
+ when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
+ anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+ .thenAnswer(new Answer<PullResult>() {
+ @Override
+ public PullResult answer(InvocationOnMock mock) throws Throwable {
+ PullMessageRequestHeader requestHeader = mock.getArgument(1);
+ MessageClientExt messageClientExt = new MessageClientExt();
+ messageClientExt.setTopic(topic);
+ messageClientExt.setQueueId(0);
+ messageClientExt.setMsgId("123");
+ messageClientExt.setBody(new byte[] {'a'});
+ messageClientExt.setOffsetMsgId("234");
+ messageClientExt.setBornHost(new InetSocketAddress(8080));
+ messageClientExt.setStoreHost(new InetSocketAddress(8080));
+ PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+ ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
+ return pullResult;
+ }
+ });
+
++
consumerGroup = "FooBarGroup" + System.currentTimeMillis();
pushConsumer = new DefaultMQPushConsumer(consumerGroup);
pushConsumer.setNamesrvAddr("127.0.0.1:9876");
@@@ -145,40 -154,12 +167,40 @@@
messageQueueSet.add(createPullRequest().getMessageQueue());
pushConsumerImpl.updateTopicSubscribeInfo(topic, messageQueueSet);
+ pushConsumerImpl.setmQClientFactory(mQClientFactory);
+
+ pullAPIWrapper = spy(new PullAPIWrapper(mQClientFactory, consumerGroup, false));
+ FieldUtils.writeDeclaredField(pushConsumerImpl, "pullAPIWrapper", pullAPIWrapper, true);
+
+ when(mQClientAPIImpl.pullMessage(anyString(), any(PullMessageRequestHeader.class),
+ anyLong(), any(CommunicationMode.class), nullable(PullCallback.class)))
+ .thenAnswer(new Answer<PullResult>() {
+ @Override
+ public PullResult answer(InvocationOnMock mock) throws Throwable {
+ PullMessageRequestHeader requestHeader = mock.getArgument(1);
+ MessageClientExt messageClientExt = new MessageClientExt();
+ messageClientExt.setTopic(topic);
+ messageClientExt.setQueueId(0);
+ messageClientExt.setQueueOffset(queueOffset.getAndIncrement());
+ messageClientExt.setMsgId("1024");
+ messageClientExt.setBody(msgBody);
+ messageClientExt.setOffsetMsgId("234");
+ messageClientExt.setBornHost(new InetSocketAddress(8080));
+ messageClientExt.setStoreHost(new InetSocketAddress(8080));
+ PullResult pullResult = createPullResult(requestHeader, PullStatus.FOUND, Collections.<MessageExt>singletonList(messageClientExt));
+ ((PullCallback) mock.getArgument(4)).onSuccess(pullResult);
+ return pullResult;
+ }
+ });
+
pushConsumer.subscribe(topic, "*");
pushConsumer.start();
+
+ mQClientFactory.registerConsumer(consumerGroup, pushConsumerImpl);
}
- @After
- public void terminate() {
+ @AfterClass
+ public static void terminate() {
pushConsumer.shutdown();
}
diff --cc client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
index 73cfefb,ec7a4cf..f4aead1
--- a/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/consumer/store/RemoteBrokerOffsetStoreTest.java
@@@ -24,8 -23,8 +24,9 @@@ import org.apache.rocketmq.client.excep
import org.apache.rocketmq.client.impl.FindBrokerResult;
import org.apache.rocketmq.client.impl.MQClientAPIImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+ import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.common.protocol.ResponseCode;
import org.apache.rocketmq.common.protocol.header.QueryConsumerOffsetRequestHeader;
import org.apache.rocketmq.common.protocol.header.UpdateConsumerOffsetRequestHeader;
import org.apache.rocketmq.remoting.exception.RemotingException;
@@@ -60,9 -59,8 +61,9 @@@ public class RemoteBrokerOffsetStoreTes
System.setProperty("rocketmq.client.localOffsetStoreDir", System.getProperty("java.io.tmpdir") + ".rocketmq_offsets");
String clientId = new ClientConfig().buildMQClientId() + "#TestNamespace" + System.currentTimeMillis();
when(mQClientFactory.getClientId()).thenReturn(clientId);
- when(mQClientFactory.findBrokerAddressInAdmin(brokerName)).thenReturn(new FindBrokerResult("127.0.0.1", false));
+ when(mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, false)).thenReturn(new FindBrokerResult("127.0.0.1", false));
when(mQClientFactory.getMQClientAPIImpl()).thenReturn(mqClientAPI);
+ when(mQClientFactory.getBrokerNameFromMessageQueue(any())).thenReturn(brokerName);
}
@Test
diff --cc client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
index 17b48eb,a60d88e..f0d6b42
--- a/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
+++ b/client/src/test/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImplTest.java
@@@ -22,29 -23,30 +23,31 @@@ import java.util.Set
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.consumer.store.OffsetStore;
+ import org.apache.rocketmq.client.consumer.store.ReadOffsetType;
+import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
+ import org.apache.rocketmq.client.impl.MQAdminImpl;
import org.apache.rocketmq.client.impl.factory.MQClientInstance;
+ import org.apache.rocketmq.common.MixAll;
+ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageQueue;
- import org.apache.rocketmq.common.message.MessageQueueAssignment;
- import org.apache.rocketmq.common.message.MessageRequestMode;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
+import org.apache.rocketmq.remoting.exception.RemotingException;
+import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
- import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Spy;
-import org.mockito.invocation.InvocationOnMock;
import org.mockito.junit.MockitoJUnitRunner;
-import org.mockito.stubbing.Answer;
import static org.assertj.core.api.Assertions.assertThat;
+ import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
- import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.doAnswer;
+ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
diff --cc common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 1b74eaa,401b457..7ce0456
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@@ -61,8 -60,8 +61,9 @@@ public class BrokerConfig
* thread numbers for send message thread pool.
*/
private int sendMessageThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4);
+ private int putMessageFutureThreadPoolNums = Math.min(Runtime.getRuntime().availableProcessors(), 4);
private int pullMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
+ private int ackMessageThreadPoolNums = 3;
private int processReplyMessageThreadPoolNums = 16 + Runtime.getRuntime().availableProcessors() * 2;
private int queryMessageThreadPoolNums = 8 + Runtime.getRuntime().availableProcessors();
@@@ -87,8 -85,8 +88,9 @@@
@ImportantField
private boolean fetchNamesrvAddrByAddressServer = false;
private int sendThreadPoolQueueCapacity = 10000;
+ private int putThreadPoolQueueCapacity = 10000;
private int pullThreadPoolQueueCapacity = 100000;
+ private int ackThreadPoolQueueCapacity = 100000;
private int replyThreadPoolQueueCapacity = 10000;
private int queryThreadPoolQueueCapacity = 20000;
private int clientManagerThreadPoolQueueCapacity = 1000000;
@@@ -962,59 -838,11 +993,67 @@@
this.autoDeleteUnusedStats = autoDeleteUnusedStats;
}
+ public long getLoadBalancePollNameServerInterval() {
+ return loadBalancePollNameServerInterval;
+ }
+
+ public void setLoadBalancePollNameServerInterval(long loadBalancePollNameServerInterval) {
+ this.loadBalancePollNameServerInterval = loadBalancePollNameServerInterval;
+ }
+
+ public int getCleanOfflineBrokerInterval() {
+ return cleanOfflineBrokerInterval;
+ }
+
+ public void setCleanOfflineBrokerInterval(int cleanOfflineBrokerInterval) {
+ this.cleanOfflineBrokerInterval = cleanOfflineBrokerInterval;
+ }
+
+ public int getLoadBalanceProcessorThreadPoolNums() {
+ return loadBalanceProcessorThreadPoolNums;
+ }
+
+ public void setLoadBalanceProcessorThreadPoolNums(int loadBalanceProcessorThreadPoolNums) {
+ this.loadBalanceProcessorThreadPoolNums = loadBalanceProcessorThreadPoolNums;
+ }
+
+ public boolean isServerLoadBalancerEnabled() {
+ return serverLoadBalancerEnabled;
+ }
+
+ public void setServerLoadBalancerEnabled(boolean serverLoadBalancerEnabled) {
+ this.serverLoadBalancerEnabled = serverLoadBalancerEnabled;
+ }
+
+ public MessageRequestMode getDefaultMessageRequestMode() {
+ return defaultMessageRequestMode;
+ }
+
+ public void setDefaultMessageRequestMode(String defaultMessageRequestMode) {
+ this.defaultMessageRequestMode = MessageRequestMode.valueOf(defaultMessageRequestMode);
+ }
+
+ public int getDefaultPopShareQueueNum() {
+ return defaultPopShareQueueNum;
+ }
+
+ public void setDefaultPopShareQueueNum(int defaultPopShareQueueNum) {
+ this.defaultPopShareQueueNum = defaultPopShareQueueNum;
+ }
+
+ public long getForwardTimeout() {
+ return forwardTimeout;
+ }
+
+ public void setForwardTimeout(long timeout) {
+ this.forwardTimeout = timeout;
+ }
++
+ public boolean isIsolateLogEnable() {
+ return isolateLogEnable;
+ }
+
+ public void setIsolateLogEnable(boolean isolateLogEnable) {
+ this.isolateLogEnable = isolateLogEnable;
+ }
}
diff --cc common/src/main/java/org/apache/rocketmq/common/MixAll.java
index cc12077,c2300d3..a15294f
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@@ -84,10 -83,9 +84,12 @@@ public class MixAll
public static final String CID_SYS_RMQ_TRANS = "CID_RMQ_SYS_TRANS";
public static final String ACL_CONF_TOOLS_FILE = "/conf/tools.yml";
public static final String REPLY_MESSAGE_FLAG = "reply";
+ public static final String LMQ_PREFIX = "%LMQ%";
+ public static final String MULTI_DISPATCH_QUEUE_SPLITTER = ",";
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+ public static final String LOGICAL_QUEUE_MOCK_BROKER_PREFIX = "__syslo__";
+ public static final String METADATA_SCOPE_GLOBAL = "__global__";
+ public static final String LOGICAL_QUEUE_MOCK_BROKER_NAME_NOT_EXIST = "__syslo__none__";
public static String getWSAddr() {
String wsDomainName = System.getProperty("rocketmq.namesrv.domain", DEFAULT_NAMESRV_ADDR_LOOKUP);
@@@ -448,11 -446,7 +450,14 @@@
return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
}
+ public static int compareInteger(int x, int y) {
+ return (x < y) ? -1 : ((x == y) ? 0 : 1);
+ }
+
+ public static int compareLong(long x, long y) {
+ return (x < y) ? -1 : ((x == y) ? 0 : 1);
+ }
+ public static boolean isLmq(String lmqMetaData) {
+ return lmqMetaData != null && lmqMetaData.startsWith(LMQ_PREFIX);
+ }
}
diff --cc common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
index b5c14e9,81b7823..105ef5b
--- a/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
+++ b/common/src/main/java/org/apache/rocketmq/common/message/MessageConst.java
@@@ -54,10 -52,8 +54,12 @@@ public class MessageConst
public static final String PROPERTY_PUSH_REPLY_TIME = "PUSH_REPLY_TIME";
public static final String PROPERTY_CLUSTER = "CLUSTER";
public static final String PROPERTY_MESSAGE_TYPE = "MSG_TYPE";
+ public static final String PROPERTY_POP_CK = "POP_CK";
+ public static final String PROPERTY_FIRST_POP_TIME = "1ST_POP_TIME";
+ public static final String PROPERTY_FORWARD_QUEUE_ID = "PROPERTY_FORWARD_QUEUE_ID";
+ public static final String PROPERTY_REDIRECT = "REDIRECT";
+ public static final String PROPERTY_INNER_MULTI_DISPATCH = "INNER_MULTI_DISPATCH";
+ public static final String PROPERTY_INNER_MULTI_QUEUE_OFFSET = "INNER_MULTI_QUEUE_OFFSET";
public static final String KEY_SEPARATOR = " ";
diff --cc common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
index 10d6f4d,0c3df0d..0543add
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/body/ConsumerRunningInfo.java
@@@ -268,30 -266,8 +268,30 @@@ public class ConsumerRunningInfo extend
}
{
+ sb.append("\n\n#Consumer Pop Detail#\n");
+ sb.append(String.format("%-32s %-32s %-4s %-20s%n",
+ "#Topic",
+ "#Broker Name",
+ "#QID",
+ "#ProcessQueueInfo"
+ ));
+
+ Iterator<Entry<MessageQueue, PopProcessQueueInfo>> it = this.mqPopTable.entrySet().iterator();
+ while (it.hasNext()) {
+ Entry<MessageQueue, PopProcessQueueInfo> next = it.next();
+ String item = String.format("%-32s %-32s %-4d %s%n",
+ next.getKey().getTopic(),
+ next.getKey().getBrokerName(),
+ next.getKey().getQueueId(),
+ next.getValue().toString());
+
+ sb.append(item);
+ }
+ }
+
+ {
sb.append("\n\n#Consumer RT&TPS#\n");
- sb.append(String.format("%-32s %14s %14s %14s %14s %18s %25s%n",
+ sb.append(String.format("%-64s %14s %14s %14s %14s %18s %25s%n",
"#Topic",
"#Pull RT",
"#Pull TPS",
diff --cc namesrv/pom.xml
index b1d3936,7ecbcd3..1da3fa2
--- a/namesrv/pom.xml
+++ b/namesrv/pom.xml
@@@ -49,9 -49,8 +49,13 @@@
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>test</scope>
+ </dependency>
++ <dependency>
+ <groupId>org.bouncycastle</groupId>
+ <artifactId>bcpkix-jdk15on</artifactId>
+ </dependency>
</dependencies>
</project>
diff --cc pom.xml
index 6541d8c,ec05130..a5ae2ae
--- a/pom.xml
+++ b/pom.xml
@@@ -562,14 -568,9 +568,9 @@@
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <version>31.0.1-jre</version>
+ <version>19.0</version>
</dependency>
<dependency>
- <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
- <artifactId>concurrentlinkedhashmap-lru</artifactId>
- <version>1.4.2</version>
- </dependency>
- <dependency>
<groupId>io.openmessaging</groupId>
<artifactId>openmessaging-api</artifactId>
<version>0.3.1-alpha</version>
diff --cc remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index c8a0e9e,39bbb0d..0894ea6
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@@ -172,18 -191,9 +190,10 @@@ public class RemotingHelper
public static String parseSocketAddressAddr(SocketAddress socketAddress) {
if (socketAddress != null) {
+ // Default toString of InetSocketAddress is "hostName/IP:port"
final String addr = socketAddress.toString();
- if (addr.length() > 0) {
- if (addr.contains("/")) {
- String[] segments = addr.split("/");
- if (segments.length > 1) {
- return segments[1];
- }
- }
- return addr.substring(1);
- }
- return addr;
+ int index = addr.lastIndexOf("/");
+ return (index != -1) ? addr.substring(index + 1) : addr;
}
return "";
}
diff --cc remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
index 03b1640,d43fe8d..5a1ea3f
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingCommand.java
@@@ -28,12 -21,15 +28,18 @@@ import java.lang.annotation.Annotation
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+ import org.apache.rocketmq.logging.InternalLogger;
+ import org.apache.rocketmq.logging.InternalLoggerFactory;
+ import org.apache.rocketmq.remoting.CommandCustomHeader;
+ import org.apache.rocketmq.remoting.annotation.CFNotNull;
+ import org.apache.rocketmq.remoting.common.RemotingHelper;
+ import org.apache.rocketmq.remoting.exception.RemotingCommandException;
public class RemotingCommand {
public static final String SERIALIZE_TYPE_PROPERTY = "rocketmq.serialize.type";
diff --cc store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 2ae84eb,bf66af5..01857cc
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@@ -16,27 -16,7 +16,27 @@@
*/
package org.apache.rocketmq.store;
+import java.net.Inet4Address;
+import java.net.Inet6Address;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
- import java.util.ArrayList;
+import java.util.Collections;
++import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
++import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
- import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.TopicConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageAccessor;
@@@ -79,40 -76,45 +80,47 @@@ public class CommitLog implements Swapp
protected final PutMessageLock putMessageLock;
+ protected final TopicQueueLock topicQueueLock;
+
private volatile Set<String> fullStorePaths = Collections.emptySet();
+ private final MultiDispatch multiDispatch;
+ private final FlushDiskWatcher flushDiskWatcher;
+
+ protected int commitLogSize;
+
- public CommitLog(final MessageStore messageStore) {
- String storePath = messageStore.getMessageStoreConfig().getStorePathCommitLog();
+ public CommitLog(final DefaultMessageStore defaultMessageStore) {
+ String storePath = defaultMessageStore.getMessageStoreConfig().getStorePathCommitLog();
if (storePath.contains(MessageStoreConfig.MULTI_PATH_SPLITTER)) {
- this.mappedFileQueue = new MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(),
- messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
- messageStore.getAllocateMappedFileService(), this::getFullStorePaths);
+ this.mappedFileQueue = new MultiPathMappedFileQueue(defaultMessageStore.getMessageStoreConfig(),
- defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
- defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
++ defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
++ defaultMessageStore.getAllocateMappedFileService(), this::getFullStorePaths);
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
- messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
- messageStore.getAllocateMappedFileService());
- defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
- defaultMessageStore.getAllocateMappedFileService());
++ defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
++ defaultMessageStore.getAllocateMappedFileService());
}
- this.defaultMessageStore = messageStore;
+ this.defaultMessageStore = defaultMessageStore;
- if (FlushDiskType.SYNC_FLUSH == defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
- this.flushCommitLogService = new GroupCommitService();
- } else {
- this.flushCommitLogService = new FlushRealTimeService();
- }
-
- this.commitLogService = new CommitRealTimeService();
+ this.flushManager = new DefaultFlushManager();
- this.appendMessageCallback = new DefaultAppendMessageCallback(messageStore.getMessageStoreConfig().getMaxMessageSize());
+ this.appendMessageCallback = new DefaultAppendMessageCallback(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
putMessageThreadLocal = new ThreadLocal<PutMessageThreadLocal>() {
@Override
protected PutMessageThreadLocal initialValue() {
return new PutMessageThreadLocal(defaultMessageStore.getMessageStoreConfig().getMaxMessageSize());
}
};
- this.putMessageLock = messageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
+ this.putMessageLock = defaultMessageStore.getMessageStoreConfig().isUseReentrantLockWhenPutMessage() ? new PutMessageReentrantLock() : new PutMessageSpinLock();
+
+ this.multiDispatch = new MultiDispatch(defaultMessageStore, this);
+
+ flushDiskWatcher = new FlushDiskWatcher();
+
+ this.topicQueueLock = new TopicQueueLock();
+
- this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
++ this.commitLogSize = defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
}
public void setFullStorePaths(Set<String> fullStorePaths) {
@@@ -123,21 -125,36 +131,28 @@@
return fullStorePaths;
}
+ public ThreadLocal<PutMessageThreadLocal> getPutMessageThreadLocal() {
+ return putMessageThreadLocal;
+ }
+
public boolean load() {
boolean result = this.mappedFileQueue.load();
+ this.mappedFileQueue.checkSelf();
log.info("load commit log " + (result ? "OK" : "Failed"));
return result;
}
public void start() {
- this.flushCommitLogService.start();
-
+ this.flushManager.start();
+ log.info("start commitLog successfully. storeRoot: {}", this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
+ flushDiskWatcher.setDaemon(true);
+ flushDiskWatcher.start();
-
-
- if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
- this.commitLogService.start();
- }
}
public void shutdown() {
- if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
- this.commitLogService.shutdown();
- }
-
- this.flushCommitLogService.shutdown();
-
+ this.flushManager.shutdown();
+ log.info("shutdown commitLog successfully. storeRoot: {}", this.defaultMessageStore.getMessageStoreConfig().getStorePathRootDir());
+ flushDiskWatcher.shutdown(true);
}
public long flush() {
@@@ -407,26 -410,21 +422,26 @@@
return new DispatchRequest(totalSize, false/* success */);
}
- return new DispatchRequest(
+ DispatchRequest dispatchRequest = new DispatchRequest(
- topic,
- queueId,
- physicOffset,
- totalSize,
- tagsCode,
- storeTimestamp,
- queueOffset,
- keys,
- uniqKey,
- sysFlag,
- preparedTransactionOffset,
- propertiesMap
+ topic,
+ queueId,
+ physicOffset,
+ totalSize,
+ tagsCode,
+ storeTimestamp,
+ queueOffset,
+ keys,
+ uniqKey,
+ sysFlag,
+ preparedTransactionOffset,
+ propertiesMap
);
+
+ setBatchSizeIfNeeded(propertiesMap, dispatchRequest);
+
+ return dispatchRequest;
} catch (Exception e) {
+ log.error("CheckMessageAndReturnSizeOld", e);
}
return new DispatchRequest(-1, false /* success */);
@@@ -632,11 -616,10 +647,10 @@@
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
- int queueId = msg.getQueueId();
-
+ // int queueId msg.getQueueId();
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
-- || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
++ || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
// Delay Delivery
if (msg.getDelayTimeLevel() > 0) {
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
@@@ -671,76 -660,52 +685,77 @@@
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
- putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+ topicQueueLock.lock(topicQueueKey);
try {
- MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
- long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
- this.beginTimeInLock = beginLockTimestamp;
+ defaultMessageStore.assignOffset(topicQueueKey, msg, getMessageNum(msg));
- // Here settings are stored timestamp, in order to ensure an orderly
- // global
- msg.setStoreTimestamp(beginLockTimestamp);
-
- if (null == mappedFile || mappedFile.isFull()) {
- mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
- }
- if (null == mappedFile) {
- log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
+ PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
+ if (encodeResult != null) {
+ return CompletableFuture.completedFuture(encodeResult);
}
+ msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
+ PutMessageContext putMessageContext = new PutMessageContext(topicQueueKey);
- result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
- switch (result.getStatus()) {
- case PUT_OK:
- break;
- case END_OF_FILE:
- unlockMappedFile = mappedFile;
- // Create a new file, re-write the message
- mappedFile = this.mappedFileQueue.getLastMappedFile(0);
- if (null == mappedFile) {
- // XXX: warn and notify me
- log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
- }
- result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
- break;
- case MESSAGE_SIZE_EXCEEDED:
- case PROPERTIES_SIZE_EXCEEDED:
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
- case UNKNOWN_ERROR:
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
- default:
- return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
- }
+ putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
+ try {
+ MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
+ long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
+ this.beginTimeInLock = beginLockTimestamp;
+
+ // Here settings are stored timestamp, in order to ensure an orderly
+ // global
+ msg.setStoreTimestamp(beginLockTimestamp);
+
+ if (null == mappedFile || mappedFile.isFull()) {
+ mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
+ }
+ if (null == mappedFile) {
+ log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
+ beginTimeInLock = 0;
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
+ }
+
+ result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
+ switch (result.getStatus()) {
+ case PUT_OK:
+ onCommitLogAppend(msg, result, mappedFile);
+ break;
+ case END_OF_FILE:
+ onCommitLogAppend(msg, result, mappedFile);
+ unlockMappedFile = mappedFile;
+ // Create a new file, re-write the message
+ mappedFile = this.mappedFileQueue.getLastMappedFile(0);
+ if (null == mappedFile) {
+ // XXX: warn and notify me
+ log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
+ beginTimeInLock = 0;
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
+ }
+ result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
+ if (AppendMessageStatus.PUT_OK.equals(result.getStatus())) {
+ onCommitLogAppend(msg, result, mappedFile);
+ }
+ break;
+ case MESSAGE_SIZE_EXCEEDED:
+ case PROPERTIES_SIZE_EXCEEDED:
+ beginTimeInLock = 0;
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
+ case UNKNOWN_ERROR:
+ beginTimeInLock = 0;
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
+ default:
+ beginTimeInLock = 0;
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
+ }
- elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
+ elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
+ beginTimeInLock = 0;
+ } finally {
++ beginTimeInLock = 0;
+ putMessageLock.unlock();
+ }
} finally {
- beginTimeInLock = 0;
- putMessageLock.unlock();
+ topicQueueLock.unlock(topicQueueKey);
}
if (elapsedTimeInLock > 500) {
@@@ -902,58 -846,44 +909,58 @@@
}
public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, MessageExt messageExt) {
- // Synchronization flush
- if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
- final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
+ return this.flushManager.handleDiskFlush(result, messageExt);
+ }
+
+ public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
+ if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
+ HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
- GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
+ if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
+ GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
- this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+ this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
- flushDiskWatcher.add(request);
- service.putRequest(request);
- return request.future();
- } else {
- service.wakeup();
- return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
- }
- }
- // Asynchronous flush
- else {
- if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
- flushCommitLogService.wakeup();
- } else {
- commitLogService.wakeup();
++ flushDiskWatcher.add(request);
+ service.putRequest(request);
+ service.getWaitNotifyObject().wakeupAll();
+ return request.future();
- }
- else {
++ } else {
+ return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
+ }
}
- return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
+ return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
}
-
- public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, MessageExt messageExt) {
+ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
+ this.flushManager.handleDiskFlush(result, putMessageResult, messageExt);
+ }
+
+ public void handleHA(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
HAService service = this.defaultMessageStore.getHaService();
if (messageExt.isWaitStoreMsgOK()) {
- if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
+ // Determine whether to wait
+ if (service.isSlaveOK(result.getWroteOffset() + result.getWroteBytes())) {
- GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
+ GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
- this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
++ this.defaultMessageStore.getMessageStoreConfig().getSlaveTimeout());
service.putRequest(request);
service.getWaitNotifyObject().wakeupAll();
- return request.future();
+ PutMessageStatus replicaStatus = null;
+ try {
+ replicaStatus = request.future().get(this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
- TimeUnit.MILLISECONDS);
++ TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ }
+ if (replicaStatus != PutMessageStatus.PUT_OK) {
+ log.error("do sync transfer other node, wait return, but failed, topic: " + messageExt.getTopic() + " tags: "
+ + messageExt.getTags() + " client address: " + messageExt.getBornHostNameString());
+ putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_SLAVE_TIMEOUT);
+ }
}
+ // Slave problem
else {
- return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
+ // Tell the producer, slave not available
+ putMessageResult.setPutMessageStatus(PutMessageStatus.SLAVE_NOT_AVAILABLE);
}
}
}
@@@ -1050,26 -998,10 +1057,30 @@@
return diff;
}
+ protected short getMessageNum(MessageExtBrokerInner msgInner) {
+ short messageNum = 1;
+ // IF inner batch, build batchQueueOffset and batchNum property.
+ CQType cqType = getCqType(msgInner);
+
+ if (MessageSysFlag.check(msgInner.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG) || CQType.BatchCQ.equals(cqType)) {
+ if (msgInner.getProperty(MessageConst.PROPERTY_INNER_NUM) != null) {
+ messageNum = Short.parseShort(msgInner.getProperty(MessageConst.PROPERTY_INNER_NUM));
+ messageNum = messageNum >= 1 ? messageNum : 1;
+ }
+ }
+
+ return messageNum;
+ }
+
+ private CQType getCqType(MessageExtBrokerInner msgInner) {
+ Optional<TopicConfig> topicConfig = this.defaultMessageStore.getTopicConfig(msgInner.getTopic());
+ return QueueTypeUtils.getCQType(topicConfig);
+ }
+
+ public Map<String, Long> getLmqTopicQueueTable() {
+ return this.lmqTopicQueueTable;
+ }
+
abstract class FlushCommitLogService extends ServiceThread {
protected static final int RETRY_TIMES_OVER = 10;
}
@@@ -1105,9 -1037,10 +1116,9 @@@
long end = System.currentTimeMillis();
if (!result) {
this.lastCommitTimestamp = end; // result = false means some data committed.
- //now wake up flush thread.
- flushCommitLogService.wakeup();
+ CommitLog.this.flushManager.wakeUpFlush();
}
- CommitLog.this.getMessageStore().getPerfCounter().flowOnce("COMMIT_DATA_TIME_MS", (int)(end - begin));
-
++ CommitLog.this.getMessageStore().getPerfCounter().flowOnce("COMMIT_DATA_TIME_MS", (int) (end - begin));
if (end - begin > 500) {
log.info("Commit data to file costs {} ms", end - begin);
}
@@@ -1170,7 -1103,6 +1181,7 @@@
CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
}
long past = System.currentTimeMillis() - begin;
- CommitLog.this.getMessageStore().getPerfCounter().flowOnce("FLUSH_DATA_TIME_MS", (int)past);
++ CommitLog.this.getMessageStore().getPerfCounter().flowOnce("FLUSH_DATA_TIME_MS", (int) past);
if (past > 500) {
log.info("Flush data to disk costs {} ms", past);
}
@@@ -1216,14 -1147,13 +1226,12 @@@
public GroupCommitRequest(long nextOffset, long timeoutMillis) {
this.nextOffset = nextOffset;
- this.timeoutMillis = timeoutMillis;
+ this.deadLine = System.nanoTime() + (timeoutMillis * 1_000_000);
}
- public GroupCommitRequest(long nextOffset) {
- this.nextOffset = nextOffset;
+ public long getDeadLine() {
+ return deadLine;
}
-
--
public long getNextOffset() {
return nextOffset;
}
@@@ -1339,114 -1269,6 +1347,114 @@@
}
}
+ class GroupCheckService extends FlushCommitLogService {
+ private volatile List<GroupCommitRequest> requestsWrite = new ArrayList<GroupCommitRequest>();
+ private volatile List<GroupCommitRequest> requestsRead = new ArrayList<GroupCommitRequest>();
+
+ public boolean isAynscRequestsFull() {
+ return requestsWrite.size() > CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests() * 2;
+ }
+
+ public synchronized boolean putRequest(final GroupCommitRequest request) {
+ synchronized (this.requestsWrite) {
+ this.requestsWrite.add(request);
+ }
+ if (hasNotified.compareAndSet(false, true)) {
+ waitPoint.countDown(); // notify
+ }
+ boolean flag = this.requestsWrite.size() >
- CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests();
++ CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests();
+ if (flag) {
+ log.info("Async requests {} exceeded the threshold {}", requestsWrite.size(),
- CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests());
++ CommitLog.this.defaultMessageStore.getMessageStoreConfig().getMaxAsyncPutMessageRequests());
+ }
+
+ return flag;
+ }
+
+ private void swapRequests() {
+ List<GroupCommitRequest> tmp = this.requestsWrite;
+ this.requestsWrite = this.requestsRead;
+ this.requestsRead = tmp;
+ }
+
+ private void doCommit() {
+ synchronized (this.requestsRead) {
+ if (!this.requestsRead.isEmpty()) {
+ for (GroupCommitRequest req : this.requestsRead) {
+ // There may be a message in the next file, so a maximum of
+ // two times the flush
+ boolean flushOK = false;
+ for (int i = 0; i < 1000; i++) {
+ flushOK = CommitLog.this.mappedFileQueue.getFlushedWhere() >= req.getNextOffset();
+ if (flushOK) {
+ break;
+ } else {
+ try {
+ Thread.sleep(1);
+ } catch (Throwable ignored) {
+
+ }
+ }
+ }
+ req.wakeupCustomer(flushOK ? PutMessageStatus.PUT_OK : PutMessageStatus.FLUSH_DISK_TIMEOUT);
+ }
+
+ long storeTimestamp = CommitLog.this.mappedFileQueue.getStoreTimestamp();
+ if (storeTimestamp > 0) {
+ CommitLog.this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimestamp);
+ }
+
+ this.requestsRead.clear();
+ }
+ }
+ }
+
+ public void run() {
+ CommitLog.log.info(this.getServiceName() + " service started");
+
+ while (!this.isStopped()) {
+ try {
+ this.waitForRunning(1);
+ this.doCommit();
+ } catch (Exception e) {
+ CommitLog.log.warn(this.getServiceName() + " service has exception. ", e);
+ }
+ }
+
+ // Under normal circumstances shutdown, wait for the arrival of the
+ // request, and then flush
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ CommitLog.log.warn("GroupCommitService Exception, ", e);
+ }
+
+ synchronized (this) {
+ this.swapRequests();
+ }
+
+ this.doCommit();
+
+ CommitLog.log.info(this.getServiceName() + " service end");
+ }
+
+ @Override
+ protected void onWaitEnd() {
+ this.swapRequests();
+ }
+
+ @Override
+ public String getServiceName() {
+ return CommitLog.GroupCheckService.class.getSimpleName();
+ }
+
+ @Override
+ public long getJointime() {
+ return 1000 * 60 * 5;
+ }
+ }
+
class DefaultAppendMessageCallback implements AppendMessageCallback {
// File at the end of the minimum fixed length empty
private static final int END_FILE_MIN_BLANK_LENGTH = 4 + 4;
@@@ -1482,11 -1304,18 +1490,16 @@@
};
// Record ConsumeQueue information
- String key = putMessageContext.getTopicQueueTableKey();
- Long queueOffset = CommitLog.this.topicQueueTable.get(key);
- if (null == queueOffset) {
- queueOffset = 0L;
- CommitLog.this.topicQueueTable.put(key, queueOffset);
- }
+ Long queueOffset = msgInner.getQueueOffset();
+
+ // this msg maybe a inner-batch msg.
+ short messageNum = getMessageNum(msgInner);
+ boolean multiDispatchWrapResult = CommitLog.this.multiDispatch.wrapMultiDispatch(msgInner);
+ if (!multiDispatchWrapResult) {
+ return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+ }
+
// Transaction messages that require special handling
final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
switch (tranType) {
@@@ -1517,9 -1346,9 +1530,9 @@@
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, 8);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset,
-- maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
-- msgIdSupplier, msgInner.getStoreTimestamp(),
-- queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
++ maxBlank, /* only wrote 8 bytes, but declare wrote maxBlank for compute write position */
++ msgIdSupplier, msgInner.getStoreTimestamp(),
++ queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
int pos = 4 + 4 + 4 + 4 + 4;
@@@ -1534,15 -1363,28 +1547,29 @@@
// refresh store time stamp in lock
preEncodeBuffer.putLong(pos, msgInner.getStoreTimestamp());
--
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
+ CommitLog.this.getMessageStore().getPerfCounter().startTick("WRITE_MEMORY_TIME_MS");
// Write messages to the queue buffer
byteBuffer.put(preEncodeBuffer);
+ CommitLog.this.getMessageStore().getPerfCounter().endTick("WRITE_MEMORY_TIME_MS");
msgInner.setEncodedBuff(null);
- return new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
- msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills, messageNum);
+ AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgIdSupplier,
+ msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
+
+ switch (tranType) {
+ case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
+ case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
+ break;
+ case MessageSysFlag.TRANSACTION_NOT_TYPE:
+ case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
+ // The next update ConsumeQueue information
+ CommitLog.this.topicQueueTable.put(key, ++queueOffset);
+ CommitLog.this.multiDispatch.updateMultiQueueOffset(msgInner);
+ break;
+ default:
+ break;
+ }
+ return result;
}
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
@@@ -1673,7 -1521,7 +1700,7 @@@
* Serialize message
*/
final byte[] propertiesData =
-- msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
++ msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
@@@ -1694,7 -1540,7 +1721,7 @@@
// Exceeds the maximum message
if (msgLen > this.maxMessageSize) {
CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
-- + ", maxMessageSize: " + this.maxMessageSize);
++ + ", maxMessageSize: " + this.maxMessageSize);
return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null);
}
@@@ -1719,11 -1565,11 +1746,11 @@@
// 9 BORNTIMESTAMP
this.encoderBuffer.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
-- socketAddress2ByteBuffer(msgInner.getBornHost() ,this.encoderBuffer);
++ socketAddress2ByteBuffer(msgInner.getBornHost(), this.encoderBuffer);
// 11 STORETIMESTAMP
this.encoderBuffer.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
-- socketAddress2ByteBuffer(msgInner.getStoreHost() ,this.encoderBuffer);
++ socketAddress2ByteBuffer(msgInner.getStoreHost(), this.encoderBuffer);
// 13 RECONSUMETIMES
this.encoderBuffer.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
@@@ -1858,155 -1715,55 +1896,164 @@@
byteBuffer.limit(limit);
}
+ public ByteBuffer getEncoderBuffer() {
+ return encoderBuffer;
+ }
}
- static class PutMessageThreadLocal {
- private MessageExtEncoder encoder;
- private StringBuilder keyBuilder;
- PutMessageThreadLocal(int size) {
- encoder = new MessageExtEncoder(size);
- keyBuilder = new StringBuilder();
- }
+ interface FlushManager {
+ void start();
+
- public MessageExtEncoder getEncoder() {
- return encoder;
+ void shutdown();
++
+ void wakeUpFlush();
++
+ void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt);
++
+ CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt);
+ }
+
+ class DefaultFlushManager implements FlushManager {
+
+ private final FlushCommitLogService flushCommitLogService;
+
+ //If TransientStorePool enabled, we must flush message to FileChannel at fixed periods
+ private final FlushCommitLogService commitLogService;
+
+ public DefaultFlushManager() {
+ if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
+ this.flushCommitLogService = new CommitLog.GroupCommitService();
+ } else {
+ this.flushCommitLogService = new CommitLog.FlushRealTimeService();
+ }
+
+ this.commitLogService = new CommitLog.CommitRealTimeService();
}
- public StringBuilder getKeyBuilder() {
- return keyBuilder;
+ @Override
+ public void start() {
+ this.flushCommitLogService.start();
+
+ if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ this.commitLogService.start();
+ }
}
- }
- public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
- static class PutMessageContext {
- private String topicQueueTableKey;
- private long[] phyPos;
- private int batchSize;
++ public void handleDiskFlush(AppendMessageResult result, PutMessageResult putMessageResult,
++ MessageExt messageExt) {
+ // Synchronization flush
+ if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
+ final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
+ if (messageExt.isWaitStoreMsgOK()) {
+ GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes());
+ service.putRequest(request);
+ CompletableFuture<PutMessageStatus> flushOkFuture = request.future();
+ PutMessageStatus flushStatus = null;
+ try {
+ flushStatus = flushOkFuture.get(CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout(),
- TimeUnit.MILLISECONDS);
++ TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ //flushOK=false;
+ }
+ if (flushStatus != PutMessageStatus.PUT_OK) {
+ log.error("do groupcommit, wait for flush failed, topic: " + messageExt.getTopic() + " tags: " + messageExt.getTags()
- + " client address: " + messageExt.getBornHostString());
++ + " client address: " + messageExt.getBornHostString());
+ putMessageResult.setPutMessageStatus(PutMessageStatus.FLUSH_DISK_TIMEOUT);
+ }
+ } else {
+ service.wakeup();
+ }
+ }
+ // Asynchronous flush
+ else {
+ if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ flushCommitLogService.wakeup();
+ } else {
+ commitLogService.wakeup();
+ }
+ }
+ }
- public PutMessageContext(String topicQueueTableKey) {
- this.topicQueueTableKey = topicQueueTableKey;
+ @Override
+ public CompletableFuture<PutMessageStatus> handleDiskFlush(AppendMessageResult result, MessageExt messageExt) {
+ // Synchronization flush
+ if (FlushDiskType.SYNC_FLUSH == CommitLog.this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
+ final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
+ if (messageExt.isWaitStoreMsgOK()) {
+ GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
- CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
++ CommitLog.this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
+ service.putRequest(request);
+ return request.future();
+ } else {
+ service.wakeup();
+ return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
+ }
+ }
+ // Asynchronous flush
+ else {
+ if (!CommitLog.this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ flushCommitLogService.wakeup();
- } else {
++ } else {
+ commitLogService.wakeup();
+ }
+ return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
+ }
}
- public String getTopicQueueTableKey() {
- return topicQueueTableKey;
+ @Override
+ public void wakeUpFlush() {
+ // now wake up flush thread.
+ flushCommitLogService.wakeup();
}
- public long[] getPhyPos() {
- return phyPos;
+ @Override
+ public void shutdown() {
+ if (defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
+ this.commitLogService.shutdown();
+ }
+
+ this.flushCommitLogService.shutdown();
}
- public void setPhyPos(long[] phyPos) {
- this.phyPos = phyPos;
+ }
+
+ public int getCommitLogSize() {
+ return commitLogSize;
+ }
+
+ public MappedFileQueue getMappedFileQueue() {
+ return mappedFileQueue;
+ }
+
+ public MessageStore getMessageStore() {
+ return defaultMessageStore;
+ }
+
+ @Override
+ public void swapMap(int reserveNum, long forceSwapIntervalMs, long normalSwapIntervalMs) {
+ this.getMappedFileQueue().swapMap(reserveNum, forceSwapIntervalMs, normalSwapIntervalMs);
+ }
+
+ @Override
+ public void cleanSwappedMap(long forceCleanSwapIntervalMs) {
+ this.getMappedFileQueue().cleanSwappedMap(forceCleanSwapIntervalMs);
+ }
+
+ static class PutMessageThreadLocal {
+ private MessageExtEncoder encoder;
+ private StringBuilder keyBuilder;
++
+ PutMessageThreadLocal(int size) {
+ encoder = new MessageExtEncoder(size);
+ keyBuilder = new StringBuilder();
}
- public int getBatchSize() {
- return batchSize;
+ public MessageExtEncoder getEncoder() {
+ return encoder;
}
- public void setBatchSize(int batchSize) {
- this.batchSize = batchSize;
+ public StringBuilder getKeyBuilder() {
+ return keyBuilder;
}
}
}
diff --cc store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index a1fc870,fdc725d..d4d5ef3
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@@ -439,13 -429,52 +445,59 @@@ public class ConsumeQueue implements Co
this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();
}
+ private void multiDispatchLmqQueue(DispatchRequest request, int maxRetries) {
+ Map<String, String> prop = request.getPropertiesMap();
+ String multiDispatchQueue = prop.get(MessageConst.PROPERTY_INNER_MULTI_DISPATCH);
+ String multiQueueOffset = prop.get(MessageConst.PROPERTY_INNER_MULTI_QUEUE_OFFSET);
+ String[] queues = multiDispatchQueue.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ String[] queueOffsets = multiQueueOffset.split(MixAll.MULTI_DISPATCH_QUEUE_SPLITTER);
+ if (queues.length != queueOffsets.length) {
+ log.error("[bug] queues.length!=queueOffsets.length ", request.getTopic());
+ return;
+ }
+ for (int i = 0; i < queues.length; i++) {
+ String queueName = queues[i];
+ long queueOffset = Long.parseLong(queueOffsets[i]);
+ int queueId = request.getQueueId();
+ if (this.defaultMessageStore.getMessageStoreConfig().isEnableLmq() && MixAll.isLmq(queueName)) {
+ queueId = 0;
+ }
+ doDispatchLmqQueue(request, maxRetries, queueName, queueOffset, queueId);
+
+ }
+ return;
+ }
+
+ private void doDispatchLmqQueue(DispatchRequest request, int maxRetries, String queueName, long queueOffset,
+ int queueId) {
+ ConsumeQueue cq = this.defaultMessageStore.findConsumeQueue(queueName, queueId);
+ boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
+ for (int i = 0; i < maxRetries && canWrite; i++) {
+ boolean result = cq.putMessagePositionInfo(request.getCommitLogOffset(), request.getMsgSize(),
+ request.getTagsCode(),
+ queueOffset);
+ if (result) {
+ break;
+ } else {
+ log.warn("[BUG]put commit log position info to " + queueName + ":" + queueId + " " + request.getCommitLogOffset()
+ + " failed, retry " + i + " times");
+
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ log.warn("", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public void assignQueueOffset(QueueOffsetAssigner queueOffsetAssigner, MessageExtBrokerInner msg, short messageNum) {
+ String topicQueueKey = getTopic() + "-" + getQueueId();
+ long queueOffset = queueOffsetAssigner.assignQueueOffset(topicQueueKey, messageNum);
+ msg.setQueueOffset(queueOffset);
+ }
+
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
final long cqOffset) {
diff --cc store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 76165d9,f11d5f3..9b9590a
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@@ -454,20 -455,12 +477,26 @@@ public class DefaultMessageStore implem
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
}
+ if (msg.getProperties().containsKey(MessageConst.PROPERTY_INNER_NUM)
+ && !MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
+ log.warn("[BUG]The message had property {} but is not an inner batch", MessageConst.PROPERTY_INNER_NUM);
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ }
+
+ if (MessageSysFlag.check(msg.getSysFlag(), MessageSysFlag.INNER_BATCH_FLAG)) {
+ Optional<TopicConfig> topicConfig = this.getTopicConfig(msg.getTopic());
+ if (!QueueTypeUtils.isBatchCq(topicConfig)) {
+ log.error("[BUG]The message is an inner batch but cq type is not batch cq");
+ return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, null));
+ }
+ }
+
+ PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
+ if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) {
+ return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));
+ }
+
+
long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
@@@ -485,8 -478,6 +514,7 @@@
return putResultFuture;
}
+
- @Override
public CompletableFuture<PutMessageResult> asyncPutMessages(MessageExtBatch messageExtBatch) {
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK) {
@@@ -1084,10 -1058,11 +1125,11 @@@
String topic = next.getKey();
if (!topics.contains(topic) && !topic.equals(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC)
- && !topic.equals(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC)) {
+ && !topic.equals(TopicValidator.RMQ_SYS_TRANS_OP_HALF_TOPIC)
+ && !MixAll.isLmq(topic)) {
- ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
- for (ConsumeQueue cq : queueTable.values()) {
- cq.destroy();
+ ConcurrentMap<Integer, ConsumeQueueInterface> queueTable = next.getValue();
+ for (ConsumeQueueInterface cq : queueTable.values()) {
+ this.consumeQueueStore.destroy(cq);
log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
cq.getTopic(),
cq.getQueueId()
@@@ -1598,43 -1678,7 +1663,43 @@@
private volatile boolean cleanImmediately = false;
+ double getDiskSpaceWarningLevelRatio() {
+ double finalDiskSpaceWarningLevelRatio;
+ if ("".equals(diskSpaceWarningLevelRatio)) {
+ finalDiskSpaceWarningLevelRatio = DefaultMessageStore.this.getMessageStoreConfig().getDiskSpaceWarningLevelRatio() / 100.0;
+ } else {
+ finalDiskSpaceWarningLevelRatio = Double.parseDouble(diskSpaceWarningLevelRatio);
+ }
+
+ if (finalDiskSpaceWarningLevelRatio > 0.90) {
+ finalDiskSpaceWarningLevelRatio = 0.90;
+ }
+ if (finalDiskSpaceWarningLevelRatio < 0.35) {
+ finalDiskSpaceWarningLevelRatio = 0.35;
+ }
+
+ return finalDiskSpaceWarningLevelRatio;
+ }
+
+ double getDiskSpaceCleanForciblyRatio() {
+ double finalDiskSpaceCleanForciblyRatio;
+ if ("".equals(diskSpaceCleanForciblyRatio)) {
+ finalDiskSpaceCleanForciblyRatio = DefaultMessageStore.this.getMessageStoreConfig().getDiskSpaceCleanForciblyRatio() / 100.0;
+ } else {
+ finalDiskSpaceCleanForciblyRatio = Double.parseDouble(diskSpaceCleanForciblyRatio);
+ }
+
+ if (finalDiskSpaceCleanForciblyRatio > 0.85) {
+ finalDiskSpaceCleanForciblyRatio = 0.85;
+ }
+ if (finalDiskSpaceCleanForciblyRatio < 0.30) {
+ finalDiskSpaceCleanForciblyRatio = 0.30;
+ }
+
+ return finalDiskSpaceCleanForciblyRatio;
+ }
+
- public void excuteDeleteFilesManualy() {
+ public void executeDeleteFilesManually() {
this.manualDeleteFileSeveralTimes = MAX_MANUAL_DELETE_FILE_TIMES;
DefaultMessageStore.log.info("executeDeleteFilesManually was invoked");
}
@@@ -1761,10 -1802,9 +1826,9 @@@
}
{
- String storePathLogics = StorePathConfigHelper
- .getStorePathConsumeQueue(DefaultMessageStore.this.getMessageStoreConfig().getStorePathRootDir());
+ String storePathLogics = DefaultMessageStore.this.getStorePathLogic();
double logicsRatio = UtilAll.getDiskPartitionSpaceUsedPercent(storePathLogics);
- if (logicsRatio > diskSpaceWarningLevelRatio) {
+ if (logicsRatio > getDiskSpaceWarningLevelRatio()) {
boolean diskok = DefaultMessageStore.this.runningFlags.getAndMakeDiskFull();
if (diskok) {
DefaultMessageStore.log.error("logics disk maybe full soon " + logicsRatio + ", so mark disk full");
diff --cc store/src/main/java/org/apache/rocketmq/store/MessageStore.java
index 341a29f,6771ede..9b8a9a7
--- a/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MessageStore.java
@@@ -445,138 -399,12 +445,148 @@@ public interface MessageStore
void handleScheduleMessageService(BrokerRole brokerRole);
/**
+ * Will be triggered when a new message is appended to commit log.
+ * @param msg the msg that is appended to commit log
+ * @param result append message result
+ * @param commitLogFile commit log file
+ */
+ void onCommitLogAppend(MessageExtBrokerInner msg, AppendMessageResult result, MappedFile commitLogFile);
+
+ /**
+ * Will be triggered when a new dispatch request is sent to message store.
+ * @param dispatchRequest dispatch request
+ * @param doDispatch do dispatch if true
+ * @param commitLogFile commit log file
+ * @param isRecover is from recover process
+ * @param isFileEnd if the dispatch request represents 'file end'
+ */
+ void onCommitLogDispatch(DispatchRequest dispatchRequest, boolean doDispatch, MappedFile commitLogFile, boolean isRecover, boolean isFileEnd);
+
+ /**
+ * Get the message store config
+ * @return the message store config
+ */
+ MessageStoreConfig getMessageStoreConfig();
+
+ /**
+ * Get the statistics service
+ * @return the statistics service
+ */
+ StoreStatsService getStoreStatsService();
+
+ /**
+ * Get the store checkpoint component
+ * @return the checkpoint component
+ */
+ StoreCheckpoint getStoreCheckpoint();
+
+ /**
+ * Get the system clock
+ * @return the system clock
+ */
+ SystemClock getSystemClock();
+
+ /**
+ * Get the commit log
+ * @return the commit log
+ */
+ CommitLog getCommitLog();
+
+ /**
+ * Get running flags
+ * @return running flags
+ */
+ RunningFlags getRunningFlags();
+
+ /**
+ * Get the transient store pool
+ * @return the transient store pool
+ */
+ TransientStorePool getTransientStorePool();
+
+ /**
+ * Get the HA service
+ * @return the HA service
+ */
+ HAService getHaService();
+
+ /**
+ * Register clean file hook
+ * @param logicalQueueCleanHook logical queue clean hook
+ */
+ void registerCleanFileHook(CleanFilesHook logicalQueueCleanHook);
+
+ /**
+ * Get the allocate-mappedFile service
+ * @return the allocate-mappedFile service
+ */
+ AllocateMappedFileService getAllocateMappedFileService();
+
+ /**
+ * Truncate dirty logic files
+ * @param phyOffset physical offset
+ */
+ void truncateDirtyLogicFiles(long phyOffset);
+
+ /**
+ * Destroy logics files
+ */
+ void destroyLogics();
+
+ /**
+ * Unlock mappedFile
+ * @param unlockMappedFile the file that needs to be unlocked
+ */
+ void unlockMappedFile(MappedFile unlockMappedFile);
+
+ /**
+ * Get the perf counter component
+ * @return the perf counter component
+ */
+ PerfCounter.Ticks getPerfCounter();
+
+ /**
+ * Get the queue store
+ * @return the queue store
+ */
+ ConsumeQueueStore getQueueStore();
+
+ /**
+ * If 'sync disk flush' is configured in this message store
+ * @return yes if true, no if false
+ */
+ boolean isSyncDiskFlush();
+
+ /**
+ * If this message store is sync master role
+ * @return yes if true, no if false
+ */
+ boolean isSyncMaster();
+
+ /**
+ * Assign an queue offset and increase it.
+ * If there is a race condition, you need to lock/unlock this method yourself.
+ *
+ * @param topicQueueKey topic-queue key
+ * @param msg message
+ * @param messageNum message num
+ */
+ void assignOffset(String topicQueueKey, MessageExtBrokerInner msg, short messageNum);
+
+ /**
+ * get topic config
+ * @param topic topic name
+ * @return topic config info
+ */
+ Optional<TopicConfig> getTopicConfig(String topic);
++
++ /**
+ * Clean unused lmq topic.
+ * When calling to clean up the lmq topic,
+ * the lmq topic cannot be used to write messages at the same time,
+ * otherwise the messages of the cleaning lmq topic may be lost,
+ * please call this method with caution
+ * @param topic lmq topic
+ */
+ void cleanUnusedLmqTopic(String topic);
}
diff --cc store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index e58e695,45293e6..12ff598
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@@ -167,48 -160,14 +168,56 @@@ public class MessageStoreConfig
private boolean enableScheduleMessageStats = true;
+ private int maxBatchDeleteFilesNum = 50;
+ //Polish dispatch
+ private int dispatchCqThreads = 10;
+ private int dispatchCqCacheNum = 1024 * 4;
+ private boolean enableAsyncReput = true;
+ //For recheck the reput
+ private boolean recheckReputOffsetFromCq = false;
+
+ // Maximum length of topic
+ private int maxTopicLength = 1000;
+ private int travelCqFileNumWhenGetMessage = 1;
+ // Sleep interval between to corrections
+ private int correctLogicMinOffsetSleepInterval = 1;
+ // Force correct min offset interval
+ private int correctLogicMinOffsetForceInterval = 5 * 60 * 1000;
+ // swap
+ private boolean mappedFileSwapEnable = true;
+ private long commitLogForceSwapMapInterval = 12L * 60 * 60 * 1000;
+ private long commitLogSwapMapInterval = 1L * 60 * 60 * 1000;
+ private int commitLogSwapMapReserveFileNum = 100;
+ private long logicQueueForceSwapMapInterval = 12L * 60 * 60 * 1000;
+ private long logicQueueSwapMapInterval = 1L * 60 * 60 * 1000;
+ private long cleanSwapedMapInterval = 5L * 60 * 1000;
+ private int logicQueueSwapMapReserveFileNum = 20;
+
+ private boolean searchBcqByCacheEnable = true;
+
+ @ImportantField
+ private boolean dispatchFromSenderThread = false;
+
+ @ImportantField
+ private boolean wakeCommitWhenPutMessage = true;
+ @ImportantField
+ private boolean wakeFlushWhenPutMessage = false;
+
+ @ImportantField
+ private boolean enableCleanExpiredOffset = false;
+
+ private int maxAsyncPutMessageRequests = 5000;
+
+ private int pullBatchMaxMessageCount = 160;
+
+ private boolean enableLmq = false;
+ private boolean enableMultiDispatch = false;
+ private int maxLmqConsumeQueueNum = 20000;
+
+ private boolean enableScheduleAsyncDeliver = false;
+ private int scheduleAsyncDeliverMaxPendingLimit = 2000;
+ private int scheduleAsyncDeliverMaxResendNum2Blocked = 3;
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@@ -858,163 -762,51 +875,211 @@@
this.enableScheduleMessageStats = enableScheduleMessageStats;
}
+ public int getMaxAsyncPutMessageRequests() {
+ return maxAsyncPutMessageRequests;
+ }
+
+ public void setMaxAsyncPutMessageRequests(int maxAsyncPutMessageRequests) {
+ this.maxAsyncPutMessageRequests = maxAsyncPutMessageRequests;
+ }
+
+ public int getMaxRecoveryCommitlogFiles() {
+ return maxRecoveryCommitlogFiles;
+ }
+
+ public void setMaxRecoveryCommitlogFiles(final int maxRecoveryCommitlogFiles) {
+ this.maxRecoveryCommitlogFiles = maxRecoveryCommitlogFiles;
+ }
+
+ public boolean isDispatchFromSenderThread() {
+ return dispatchFromSenderThread;
+ }
+
+ public void setDispatchFromSenderThread(boolean dispatchFromSenderThread) {
+ this.dispatchFromSenderThread = dispatchFromSenderThread;
+ }
+
+ public int getDispatchCqThreads() {
+ return dispatchCqThreads;
+ }
+
+ public void setDispatchCqThreads(final int dispatchCqThreads) {
+ this.dispatchCqThreads = dispatchCqThreads;
+ }
+
+ public int getDispatchCqCacheNum() {
+ return dispatchCqCacheNum;
+ }
+
+ public void setDispatchCqCacheNum(final int dispatchCqCacheNum) {
+ this.dispatchCqCacheNum = dispatchCqCacheNum;
+ }
+
+ public boolean isEnableAsyncReput() {
+ return enableAsyncReput;
+ }
+
+ public void setEnableAsyncReput(final boolean enableAsyncReput) {
+ this.enableAsyncReput = enableAsyncReput;
+ }
+
+ public boolean isRecheckReputOffsetFromCq() {
+ return recheckReputOffsetFromCq;
+ }
+
+ public void setRecheckReputOffsetFromCq(final boolean recheckReputOffsetFromCq) {
+ this.recheckReputOffsetFromCq = recheckReputOffsetFromCq;
+ }
+
+ public long getCommitLogForceSwapMapInterval() {
+ return commitLogForceSwapMapInterval;
+ }
+
+ public void setCommitLogForceSwapMapInterval(long commitLogForceSwapMapInterval) {
+ this.commitLogForceSwapMapInterval = commitLogForceSwapMapInterval;
+ }
+
+ public int getCommitLogSwapMapReserveFileNum() {
+ return commitLogSwapMapReserveFileNum;
+ }
+
+ public void setCommitLogSwapMapReserveFileNum(int commitLogSwapMapReserveFileNum) {
+ this.commitLogSwapMapReserveFileNum = commitLogSwapMapReserveFileNum;
+ }
+
+ public long getLogicQueueForceSwapMapInterval() {
+ return logicQueueForceSwapMapInterval;
+ }
+
+ public void setLogicQueueForceSwapMapInterval(long logicQueueForceSwapMapInterval) {
+ this.logicQueueForceSwapMapInterval = logicQueueForceSwapMapInterval;
+ }
+
+ public int getLogicQueueSwapMapReserveFileNum() {
+ return logicQueueSwapMapReserveFileNum;
+ }
+
+ public void setLogicQueueSwapMapReserveFileNum(int logicQueueSwapMapReserveFileNum) {
+ this.logicQueueSwapMapReserveFileNum = logicQueueSwapMapReserveFileNum;
+ }
+
+ public long getCleanSwapedMapInterval() {
+ return cleanSwapedMapInterval;
+ }
+
+ public void setCleanSwapedMapInterval(long cleanSwapedMapInterval) {
+ this.cleanSwapedMapInterval = cleanSwapedMapInterval;
+ }
+
+ public long getCommitLogSwapMapInterval() {
+ return commitLogSwapMapInterval;
+ }
+
+ public void setCommitLogSwapMapInterval(long commitLogSwapMapInterval) {
+ this.commitLogSwapMapInterval = commitLogSwapMapInterval;
+ }
+
+ public long getLogicQueueSwapMapInterval() {
+ return logicQueueSwapMapInterval;
+ }
+
+ public void setLogicQueueSwapMapInterval(long logicQueueSwapMapInterval) {
+ this.logicQueueSwapMapInterval = logicQueueSwapMapInterval;
+ }
+
+ public int getMaxBatchDeleteFilesNum() {
+ return maxBatchDeleteFilesNum;
+ }
+
+ public void setMaxBatchDeleteFilesNum(int maxBatchDeleteFilesNum) {
+ this.maxBatchDeleteFilesNum = maxBatchDeleteFilesNum;
+ }
+
+ public boolean isSearchBcqByCacheEnable() {
+ return searchBcqByCacheEnable;
+ }
+
+ public void setSearchBcqByCacheEnable(boolean searchBcqByCacheEnable) {
+ this.searchBcqByCacheEnable = searchBcqByCacheEnable;
+ }
+
+ public int getDiskSpaceWarningLevelRatio() {
+ return diskSpaceWarningLevelRatio;
+ }
+
+ public void setDiskSpaceWarningLevelRatio(int diskSpaceWarningLevelRatio) {
+ this.diskSpaceWarningLevelRatio = diskSpaceWarningLevelRatio;
+ }
+
+ public int getDiskSpaceCleanForciblyRatio() {
+ return diskSpaceCleanForciblyRatio;
+ }
+
+ public void setDiskSpaceCleanForciblyRatio(int diskSpaceCleanForciblyRatio) {
+ this.diskSpaceCleanForciblyRatio = diskSpaceCleanForciblyRatio;
+ }
+
+ public boolean isMappedFileSwapEnable() {
+ return mappedFileSwapEnable;
+ }
+
+ public void setMappedFileSwapEnable(boolean mappedFileSwapEnable) {
+ this.mappedFileSwapEnable = mappedFileSwapEnable;
+ }
+
+ public int getPullBatchMaxMessageCount() {
+ return pullBatchMaxMessageCount;
+ }
+
+ public void setPullBatchMaxMessageCount(int pullBatchMaxMessageCount) {
+ this.pullBatchMaxMessageCount = pullBatchMaxMessageCount;
+ }
++
+ public boolean isEnableLmq() {
+ return enableLmq;
+ }
+
+ public void setEnableLmq(boolean enableLmq) {
+ this.enableLmq = enableLmq;
+ }
+
+ public boolean isEnableMultiDispatch() {
+ return enableMultiDispatch;
+ }
+
+ public void setEnableMultiDispatch(boolean enableMultiDispatch) {
+ this.enableMultiDispatch = enableMultiDispatch;
+ }
+
+ public int getMaxLmqConsumeQueueNum() {
+ return maxLmqConsumeQueueNum;
+ }
+
+ public void setMaxLmqConsumeQueueNum(int maxLmqConsumeQueueNum) {
+ this.maxLmqConsumeQueueNum = maxLmqConsumeQueueNum;
+ }
+
+ public boolean isEnableScheduleAsyncDeliver() {
+ return enableScheduleAsyncDeliver;
+ }
+
+ public void setEnableScheduleAsyncDeliver(boolean enableScheduleAsyncDeliver) {
+ this.enableScheduleAsyncDeliver = enableScheduleAsyncDeliver;
+ }
+
+ public int getScheduleAsyncDeliverMaxPendingLimit() {
+ return scheduleAsyncDeliverMaxPendingLimit;
+ }
+
+ public void setScheduleAsyncDeliverMaxPendingLimit(int scheduleAsyncDeliverMaxPendingLimit) {
+ this.scheduleAsyncDeliverMaxPendingLimit = scheduleAsyncDeliverMaxPendingLimit;
+ }
+
+ public int getScheduleAsyncDeliverMaxResendNum2Blocked() {
+ return scheduleAsyncDeliverMaxResendNum2Blocked;
+ }
+
+ public void setScheduleAsyncDeliverMaxResendNum2Blocked(int scheduleAsyncDeliverMaxResendNum2Blocked) {
+ this.scheduleAsyncDeliverMaxResendNum2Blocked = scheduleAsyncDeliverMaxResendNum2Blocked;
+ }
}
diff --cc store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index 1ee8a88,774896b..8daa4d6
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@@ -257,22 -231,14 +257,20 @@@ public class DefaultMappedFile extends
return this.fileFromOffset;
}
+ @Override
public boolean appendMessage(final byte[] data) {
+ return appendMessage(data, 0, data.length);
+ }
+
+ @Override
+ public boolean appendMessage(ByteBuffer data) {
int currentPos = this.wrotePosition.get();
+ int remaining = data.remaining();
- if ((currentPos + data.length) <= this.fileSize) {
+ if ((currentPos + remaining) <= this.fileSize) {
try {
- ByteBuffer buf = this.mappedByteBuffer.slice();
- buf.position(currentPos);
- buf.put(data);
+ this.fileChannel.position(currentPos);
- while (data.hasRemaining()) {
- this.fileChannel.write(data);
- }
++ this.fileChannel.write(ByteBuffer.wrap(data));
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.", e);
}
diff --cc store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
index 677c983,d5b4e8d..4989bd3
--- a/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/schedule/ScheduleMessageService.java
@@@ -16,29 -16,35 +16,36 @@@
*/
package org.apache.rocketmq.store.schedule;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
- import java.util.Timer;
- import java.util.TimerTask;
+ import java.util.Queue;
+ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.LinkedBlockingQueue;
+ import java.util.concurrent.ScheduledExecutorService;
+ import java.util.concurrent.ScheduledThreadPoolExecutor;
+ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
import org.apache.rocketmq.common.ConfigManager;
import org.apache.rocketmq.common.MixAll;
+ import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.TopicFilterType;
import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.common.message.MessageAccessor;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.running.RunningStats;
-import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.ConsumeQueueExt;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
+ import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.MessageExtBrokerInner;
import org.apache.rocketmq.store.MessageStore;
import org.apache.rocketmq.store.PutMessageResult;
@@@ -313,158 -376,435 +379,419 @@@ public class ScheduleMessageService ext
}
public void executeOnTimeup() {
- ConsumeQueue cq =
- ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
+ ConsumeQueueInterface cq =
+ ScheduleMessageService.this.defaultMessageStore.getConsumeQueue(TopicValidator.RMQ_SYS_SCHEDULE_TOPIC,
delayLevel2QueueId(delayLevel));
- long failScheduleOffset = offset;
+ if (cq == null) {
+ this.scheduleNextTimerTask(this.offset, DELAY_FOR_A_WHILE);
+ return;
+ }
- SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
- if (bufferCQ == null) {
- long resetOffset;
- if ((resetOffset = cq.getMinOffsetInQueue()) > this.offset) {
- log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, queueId={}",
- this.offset, resetOffset, cq.getQueueId());
- } else if ((resetOffset = cq.getMaxOffsetInQueue()) < this.offset) {
- log.error("schedule CQ offset invalid. offset={}, cqMaxOffset={}, queueId={}",
- this.offset, resetOffset, cq.getQueueId());
- } else {
- resetOffset = this.offset;
- }
-
- this.scheduleNextTimerTask(resetOffset, DELAY_FOR_A_WHILE);
- return;
- }
-
- long nextOffset = this.offset;
- try {
- int i = 0;
- ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
- for (; i < bufferCQ.getSize() && isStarted(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
- long offsetPy = bufferCQ.getByteBuffer().getLong();
- int sizePy = bufferCQ.getByteBuffer().getInt();
- long tagsCode = bufferCQ.getByteBuffer().getLong();
-
- if (cq.isExtAddr(tagsCode)) {
- if (cq.getExt(tagsCode, cqExtUnit)) {
- tagsCode = cqExtUnit.getTagsCode();
- } else {
- //can't find ext content.So re compute tags code.
- log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
- tagsCode, offsetPy, sizePy);
- long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
- tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
- }
- }
-
- long now = System.currentTimeMillis();
- long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
- nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ if (cq != null) {
+ ReferredIterator<CqUnit> bufferCQ = cq.iterateFrom(this.offset);
+ if (bufferCQ != null) {
+ try {
+ long nextOffset = offset;
+ while (bufferCQ.hasNext()) {
+ CqUnit cqUnit = bufferCQ.next();
+ long offsetPy = cqUnit.getPos();
+ int sizePy = cqUnit.getSize();
+ long tagsCode = cqUnit.getTagsCode();
+
+ if (!cqUnit.isTagsCodeValid()) {
+ //can't find ext content.So re compute tags code.
+ log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}",
+ tagsCode, offsetPy, sizePy);
+ long msgStoreTime = defaultMessageStore.getCommitLog().pickupStoreTimestamp(offsetPy, sizePy);
+ tagsCode = computeDeliverTimestamp(delayLevel, msgStoreTime);
+ }
+
+ long now = System.currentTimeMillis();
+ long deliverTimestamp = this.correctDeliverTimestamp(now, tagsCode);
+
+ long currOffset = cqUnit.getQueueOffset();
+ assert cqUnit.getBatchNum() == 1;
+ nextOffset = currOffset + cqUnit.getBatchNum();
- long countdown = deliverTimestamp - now;
-
- if (countdown <= 0) {
- MessageExt msgExt =
- ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(
- offsetPy, sizePy);
-
- if (msgExt != null) {
- try {
- MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
- if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
- log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
- msgInner.getTopic(), msgInner);
- continue;
- }
- PutMessageResult putMessageResult =
- ScheduleMessageService.this.writeMessageStore
- .putMessage(msgInner);
-
- if (putMessageResult != null
- && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
- if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
- ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getMsgNum());
- ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, putMessageResult.getAppendMessageResult().getWroteBytes());
- ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getMsgNum());
- ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, putMessageResult.getAppendMessageResult().getWroteBytes());
- ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
- ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(),
- putMessageResult.getAppendMessageResult().getWroteBytes());
- ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
- }
- continue;
- } else {
- // XXX: warn and notify me
- log.error(
- "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
- msgExt.getTopic(), msgExt.getMsgId());
- ScheduleMessageService.this.timer.schedule(
- new DeliverDelayedMessageTimerTask(this.delayLevel,
- currOffset), DELAY_FOR_A_PERIOD);
- ScheduleMessageService.this.updateOffset(this.delayLevel,
- currOffset);
- return;
- }
- } catch (Exception e) {
- /*
- * XXX: warn and notify me
- */
- log.error(
- "ScheduleMessageService, messageTimeup execute error, drop it. msgExt={}, nextOffset={}, offsetPy={}, sizePy={}", msgExt, nextOffset, offsetPy, sizePy, e);
- }
- }
- } else {
- ScheduleMessageService.this.timer.schedule(
- new DeliverDelayedMessageTimerTask(this.delayLevel, currOffset),
- countdown);
- ScheduleMessageService.this.updateOffset(this.delayLevel, currOffset);
- return;
- }
- } // end of for
-
- ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
- this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
- ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
+ long countdown = deliverTimestamp - now;
+ if (countdown > 0) {
+ this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
return;
- } finally {
+ }
- bufferCQ.release();
+ MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
+ if (msgExt == null) {
+ continue;
}
- } // end of if (bufferCQ != null)
- else {
-
- long cqMinOffset = cq.getMinOffsetInQueue();
- long cqMaxOffset = cq.getMaxOffsetInQueue();
- if (offset < cqMinOffset) {
- failScheduleOffset = cqMinOffset;
- log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
- offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+
+ MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+ if (TopicValidator.RMQ_SYS_TRANS_HALF_TOPIC.equals(msgInner.getTopic())) {
+ log.error("[BUG] the real topic of schedule msg is {}, discard the msg. msg={}",
+ msgInner.getTopic(), msgInner);
+ continue;
}
- if (offset > cqMaxOffset) {
- failScheduleOffset = cqMaxOffset;
- log.error("schedule CQ offset invalid. offset={}, cqMinOffset={}, cqMaxOffset={}, queueId={}",
- offset, cqMinOffset, cqMaxOffset, cq.getQueueId());
+ boolean deliverSuc;
+ if (ScheduleMessageService.this.enableAsyncDeliver) {
+ deliverSuc = this.asyncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
+ } else {
+ deliverSuc = this.syncDeliver(msgInner, msgExt.getMsgId(), offset, offsetPy, sizePy);
+ }
+
+ if (!deliverSuc) {
+ this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
+ return;
}
}
- } // end of if (cq != null)
- ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
- failScheduleOffset), DELAY_FOR_A_WHILE);
+ nextOffset = this.offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
+ } catch (Exception e) {
+ log.error("ScheduleMessageService, messageTimeup execute error, offset = {}", nextOffset, e);
+ } finally {
+ bufferCQ.release();
+ }
+
+ this.scheduleNextTimerTask(nextOffset, DELAY_FOR_A_WHILE);
}
- private MessageExtBrokerInner messageTimeup(MessageExt msgExt) {
- MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
- msgInner.setBody(msgExt.getBody());
- msgInner.setFlag(msgExt.getFlag());
- MessageAccessor.setProperties(msgInner, msgExt.getProperties());
+ public void scheduleNextTimerTask(long offset, long delay) {
+ ScheduleMessageService.this.deliverExecutorService.schedule(new DeliverDelayedMessageTimerTask(
+ this.delayLevel, offset), delay, TimeUnit.MILLISECONDS);
+ }
+
+ private boolean syncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+ int sizePy) {
+ PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, false);
+ PutMessageResult result = resultProcess.get();
+ boolean sendStatus = result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK;
+ if (sendStatus) {
+ ScheduleMessageService.this.updateOffset(this.delayLevel, resultProcess.getNextOffset());
+ }
+ return sendStatus;
+ }
+
+ private boolean asyncDeliver(MessageExtBrokerInner msgInner, String msgId, long offset, long offsetPy,
+ int sizePy) {
+ Queue<PutResultProcess> processesQueue = ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+ //Flow Control
+ int currentPendingNum = processesQueue.size();
+ int maxPendingLimit = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+ .getScheduleAsyncDeliverMaxPendingLimit();
+ if (currentPendingNum > maxPendingLimit) {
+ log.warn("Asynchronous deliver triggers flow control, " +
+ "currentPendingNum={}, maxPendingLimit={}", currentPendingNum, maxPendingLimit);
+ return false;
+ }
+
+ //Blocked
+ PutResultProcess firstProcess = processesQueue.peek();
+ if (firstProcess != null && firstProcess.need2Blocked()) {
+ log.warn("Asynchronous deliver block. info={}", firstProcess.toString());
+ return false;
+ }
+
+ PutResultProcess resultProcess = deliverMessage(msgInner, msgId, offset, offsetPy, sizePy, true);
+ processesQueue.add(resultProcess);
+ return true;
+ }
+
+ private PutResultProcess deliverMessage(MessageExtBrokerInner msgInner, String msgId, long offset,
+ long offsetPy, int sizePy, boolean autoResend) {
+ CompletableFuture<PutMessageResult> future =
+ ScheduleMessageService.this.writeMessageStore.asyncPutMessage(msgInner);
+ return new PutResultProcess()
+ .setTopic(msgInner.getTopic())
+ .setDelayLevel(this.delayLevel)
+ .setOffset(offset)
+ .setPhysicOffset(offsetPy)
+ .setPhysicSize(sizePy)
+ .setMsgId(msgId)
+ .setAutoResend(autoResend)
+ .setFuture(future)
+ .thenProcess();
+ }
+ }
+
+ public class HandlePutResultTask implements Runnable {
+ private final int delayLevel;
+
+ public HandlePutResultTask(int delayLevel) {
+ this.delayLevel = delayLevel;
+ }
- TopicFilterType topicFilterType = MessageExt.parseTopicFilterType(msgInner.getSysFlag());
- long tagsCodeValue =
- MessageExtBrokerInner.tagsString2tagsCode(topicFilterType, msgInner.getTags());
- msgInner.setTagsCode(tagsCodeValue);
- msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
+ @Override
+ public void run() {
+ LinkedBlockingQueue<PutResultProcess> pendingQueue =
+ ScheduleMessageService.this.deliverPendingTable.get(this.delayLevel);
+
+ PutResultProcess putResultProcess;
+ while ((putResultProcess = pendingQueue.peek()) != null) {
+ try {
+ switch (putResultProcess.getStatus()) {
+ case SUCCESS:
+ ScheduleMessageService.this.updateOffset(this.delayLevel, putResultProcess.getNextOffset());
+ pendingQueue.remove();
+ break;
+ case RUNNING:
+ break;
+ case EXCEPTION:
+ if (!isStarted()) {
+ log.warn("HandlePutResultTask shutdown, info={}", putResultProcess.toString());
+ return;
+ }
+ log.warn("putResultProcess error, info={}", putResultProcess.toString());
+ putResultProcess.onException();
+ break;
+ case SKIP:
+ log.warn("putResultProcess skip, info={}", putResultProcess.toString());
+ pendingQueue.remove();
+ break;
+ }
+ } catch (Exception e) {
+ log.error("HandlePutResultTask exception. info={}", putResultProcess.toString(), e);
+ putResultProcess.onException();
+ }
+ }
+
+ if (isStarted()) {
+ ScheduleMessageService.this.handleExecutorService
+ .schedule(new HandlePutResultTask(this.delayLevel), DELAY_FOR_A_SLEEP, TimeUnit.MILLISECONDS);
+ }
+ }
+ }
- msgInner.setSysFlag(msgExt.getSysFlag());
- msgInner.setBornTimestamp(msgExt.getBornTimestamp());
- msgInner.setBornHost(msgExt.getBornHost());
- msgInner.setStoreHost(msgExt.getStoreHost());
- msgInner.setReconsumeTimes(msgExt.getReconsumeTimes());
+ public class PutResultProcess {
+ private String topic;
+ private long offset;
+ private long physicOffset;
+ private int physicSize;
+ private int delayLevel;
+ private String msgId;
+ private boolean autoResend = false;
+ private CompletableFuture<PutMessageResult> future;
+
+ private volatile int resendCount = 0;
+ private volatile ProcessStatus status = ProcessStatus.RUNNING;
+
+ public PutResultProcess setTopic(String topic) {
+ this.topic = topic;
+ return this;
+ }
- msgInner.setWaitStoreMsgOK(false);
- MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
+ public PutResultProcess setOffset(long offset) {
+ this.offset = offset;
+ return this;
+ }
- msgInner.setTopic(msgInner.getProperty(MessageConst.PROPERTY_REAL_TOPIC));
+ public PutResultProcess setPhysicOffset(long physicOffset) {
+ this.physicOffset = physicOffset;
+ return this;
+ }
- String queueIdStr = msgInner.getProperty(MessageConst.PROPERTY_REAL_QUEUE_ID);
- int queueId = Integer.parseInt(queueIdStr);
- msgInner.setQueueId(queueId);
+ public PutResultProcess setPhysicSize(int physicSize) {
+ this.physicSize = physicSize;
+ return this;
+ }
- return msgInner;
+ public PutResultProcess setDelayLevel(int delayLevel) {
+ this.delayLevel = delayLevel;
+ return this;
}
+
+ public PutResultProcess setMsgId(String msgId) {
+ this.msgId = msgId;
+ return this;
+ }
+
+ public PutResultProcess setAutoResend(boolean autoResend) {
+ this.autoResend = autoResend;
+ return this;
+ }
+
+ public PutResultProcess setFuture(CompletableFuture<PutMessageResult> future) {
+ this.future = future;
+ return this;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public long getOffset() {
+ return offset;
+ }
+
+ public long getNextOffset() {
+ return offset + 1;
+ }
+
+ public long getPhysicOffset() {
+ return physicOffset;
+ }
+
+ public int getPhysicSize() {
+ return physicSize;
+ }
+
+ public Integer getDelayLevel() {
+ return delayLevel;
+ }
+
+ public String getMsgId() {
+ return msgId;
+ }
+
+ public boolean isAutoResend() {
+ return autoResend;
+ }
+
+ public CompletableFuture<PutMessageResult> getFuture() {
+ return future;
+ }
+
+ public int getResendCount() {
+ return resendCount;
+ }
+
+ public PutResultProcess thenProcess() {
+ this.future.thenAccept(result -> {
+ this.handleResult(result);
+ });
+
+ this.future.exceptionally(e -> {
+ log.error("ScheduleMessageService put message exceptionally, info: {}",
+ PutResultProcess.this.toString(), e);
+
+ onException();
+ return null;
+ });
+ return this;
+ }
+
+ private void handleResult(PutMessageResult result) {
+ if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+ onSuccess(result);
+ } else {
+ log.warn("ScheduleMessageService put message failed. info: {}.", result);
+ onException();
+ }
+ }
+
+ public void onSuccess(PutMessageResult result) {
+ this.status = ProcessStatus.SUCCESS;
+ if (ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig().isEnableScheduleMessageStats()) {
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getMsgNum());
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incQueueGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, delayLevel - 1, result.getAppendMessageResult().getWroteBytes());
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetNums(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getMsgNum());
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incGroupGetSize(MixAll.SCHEDULE_CONSUMER_GROUP, TopicValidator.RMQ_SYS_SCHEDULE_TOPIC, result.getAppendMessageResult().getWroteBytes());
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1);
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes());
+ ScheduleMessageService.this.defaultMessageStore.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+ }
+ }
+
+ public void onException() {
+ log.warn("ScheduleMessageService onException, info: {}", this.toString());
+ if (this.autoResend) {
+ this.resend();
+ } else {
+ this.status = ProcessStatus.SKIP;
+ }
+ }
+
+ public ProcessStatus getStatus() {
+ return this.status;
+ }
+
+ public PutMessageResult get() {
+ try {
+ return this.future.get();
+ } catch (InterruptedException | ExecutionException e) {
+ return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, null);
+ }
+ }
+
+ private void resend() {
+ log.info("Resend message, info: {}", this.toString());
+
+ // Gradually increase the resend interval.
+ try {
+ Thread.sleep(Math.min(this.resendCount++ * 100, 60 * 1000));
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(this.physicOffset, this.physicSize);
+ if (msgExt == null) {
+ log.warn("ScheduleMessageService resend not found message. info: {}", this.toString());
+ this.status = need2Skip() ? ProcessStatus.SKIP : ProcessStatus.EXCEPTION;
+ return;
+ }
+
+ MessageExtBrokerInner msgInner = ScheduleMessageService.this.messageTimeup(msgExt);
+ PutMessageResult result = ScheduleMessageService.this.writeMessageStore.putMessage(msgInner);
+ this.handleResult(result);
+ if (result != null && result.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
+ log.info("Resend message success, info: {}", this.toString());
+ }
+ } catch (Exception e) {
+ this.status = ProcessStatus.EXCEPTION;
+ log.error("Resend message error, info: {}", this.toString(), e);
+ }
+ }
+
+ public boolean need2Blocked() {
+ int maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+ .getScheduleAsyncDeliverMaxResendNum2Blocked();
+ return this.resendCount > maxResendNum2Blocked;
+ }
+
+ public boolean need2Skip() {
+ int maxResendNum2Blocked = ScheduleMessageService.this.defaultMessageStore.getMessageStoreConfig()
+ .getScheduleAsyncDeliverMaxResendNum2Blocked();
+ return this.resendCount > maxResendNum2Blocked * 2;
+ }
+
+ @Override
+ public String toString() {
+ return "PutResultProcess{" +
+ "topic='" + topic + '\'' +
+ ", offset=" + offset +
+ ", physicOffset=" + physicOffset +
+ ", physicSize=" + physicSize +
+ ", delayLevel=" + delayLevel +
+ ", msgId='" + msgId + '\'' +
+ ", autoResend=" + autoResend +
+ ", resendCount=" + resendCount +
+ ", status=" + status +
+ '}';
+ }
+ }
+
+ public enum ProcessStatus {
+ /**
+ * In process, the processing result has not yet been returned.
+ * */
+ RUNNING,
+
+ /**
+ * Put message success.
+ * */
+ SUCCESS,
+
+ /**
+ * Put message exception.
+ * When autoResend is true, the message will be resend.
+ * */
+ EXCEPTION,
+
+ /**
+ * Skip put message.
+ * When the message cannot be looked, the message will be skipped.
+ * */
+ SKIP,
}
}
diff --cc store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index d19ab5e,668c069..4acd9a9
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@@ -26,13 -26,11 +26,14 @@@ import java.net.UnknownHostException
import java.util.Map;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
+ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.queue.ConsumeQueueInterface;
+import org.apache.rocketmq.store.queue.CqUnit;
+import org.apache.rocketmq.store.queue.ReferredIterator;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
+import org.junit.Assert;
import org.junit.Test;
import static org.assertj.core.api.Assertions.assertThat;
diff --cc store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
index 356e653,d8202eb..2636d40
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreCleanFilesTest.java
@@@ -481,9 -486,9 +481,9 @@@ public class DefaultMessageStoreCleanFi
private void initMessageStore(MessageStoreConfig messageStoreConfig, double diskSpaceCleanForciblyRatio) throws Exception {
messageStore = new DefaultMessageStore(messageStoreConfig,
- new BrokerStatsManager("test"), new MyMessageArrivingListener(), new BrokerConfig());
+ new BrokerStatsManager("test", true), new MyMessageArrivingListener(), new BrokerConfig());
- cleanCommitLogService = getCleanCommitLogService(diskSpaceCleanForciblyRatio);
+ cleanCommitLogService = getCleanCommitLogService();
cleanConsumeQueueService = getCleanConsumeQueueService();
assertTrue(messageStore.load());
diff --cc store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
index e27483e,788bdbd..2170759
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreShutDownTest.java
@@@ -71,8 -70,8 +70,8 @@@ public class DefaultMessageStoreShutDow
messageStoreConfig.setMaxHashSlotNum(10000);
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
+ messageStoreConfig.setHaListenPort(StoreTestBase.nextPort());
- return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), null, new BrokerConfig());
+ return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), null, new BrokerConfig());
}
-
}
diff --cc store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
index 932fc2f,b565c5c..9e6a983
--- a/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/DefaultMessageStoreTest.java
@@@ -114,8 -113,7 +115,8 @@@ public class DefaultMessageStoreTest
messageStoreConfig.setMaxIndexNum(100 * 100);
messageStoreConfig.setFlushDiskType(FlushDiskType.SYNC_FLUSH);
messageStoreConfig.setFlushIntervalConsumeQueue(1);
+ messageStoreConfig.setHaListenPort(StoreTestBase.nextPort());
- return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest"), new MyMessageArrivingListener(), new BrokerConfig());
+ return new DefaultMessageStore(messageStoreConfig, new BrokerStatsManager("simpleTest", true), new MyMessageArrivingListener(), new BrokerConfig());
}
@Test
diff --cc test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
index 32af3bd,0f1c4bf..173f608
--- a/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
+++ b/test/src/test/java/org/apache/rocketmq/test/base/BaseConf.java
@@@ -95,31 -68,14 +95,34 @@@ public class BaseConf
}
+ // This method can't be placed in the static block of BaseConf, which seems to lead to a strange dead lock.
+ public static void waitBrokerRegistered(final String nsAddr, final String clusterName, final int expectedBrokerNum) {
+ final DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt(500);
+ mqAdminExt.setNamesrvAddr(nsAddr);
+ try {
+ mqAdminExt.start();
+ await().atMost(30, TimeUnit.SECONDS).until(() -> {
+ List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(clusterName).getBrokerDatas();
+ return brokerDatas.size() == expectedBrokerNum;
+ });
+ for (BrokerController brokerController: brokerControllerList) {
+ brokerController.getBrokerOuterAPI().refreshMetadata();
+ }
+ } catch (Exception e) {
+ log.error("init failed, please check BaseConf", e);
+ Assert.fail(e.getMessage());
+ }
+ ForkJoinPool.commonPool().execute(mqAdminExt::shutdown);
+ }
+
public static String initTopic() {
- String topic = "tt-" + MQRandomUtils.getRandomTopic();
- IntegrationTestBase.initTopic(topic, nsAddr, clusterName, CQType.SimpleCQ);
+ String topic = MQRandomUtils.getRandomTopic();
+ return initTopicWithName(topic);
+ }
- return topic;
+ public static String initTopicWithName(String topicName) {
+ IntegrationTestBase.initTopic(topicName, nsAddr, clusterName);
+ return topicName;
}
public static String initConsumerGroup() {