You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ji...@apache.org on 2023/02/10 05:42:05 UTC

[rocketmq] branch dledger-controller-brokerId updated: [ISSUE #6023] Add a unit test to verify new register process in broker with controller mode (#6024)

This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch dledger-controller-brokerId
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/dledger-controller-brokerId by this push:
     new 65ae97d73 [ISSUE #6023] Add a unit test to verify new register process in broker with controller mode (#6024)
65ae97d73 is described below

commit 65ae97d73e442c4f55791176f60d1d528736c7ff
Author: TheR1sing3un <87...@users.noreply.github.com>
AuthorDate: Fri Feb 10 13:41:58 2023 +0800

    [ISSUE #6023] Add a unit test to verify new register process in broker with controller mode (#6024)
    
    * refactor: simplify getPID (#5962)
    
    * [ISSUE #5923] Add example tiered storage backend service provider (#5926)
    
    * implement example file segment
    
    * add metrics
    
    * add readme
    
    * fix license
    
    * fix tests
    
    * fix links in README.md
    
    * add comment to PosixFileSegment and mark as experimental
    
    * fix test
    
    * optimize image quality
    
    * Remove the useless exception class: MQRedirectException #5963
    
    * [ISSUE #5965] Fix lmqTopicQueueTable initialization (#5968)
    
    * [ISSUE #5965] Fix lmqTopicQueueTable initialization
    
    * [ISSUE #5965] Fix lmqTopicQueueTable initialization
    
    * [ISSUE #5890] Fix dledger logging (#5959)
    
    * Fix dledger logging
    
    * Add bridge into store module
    
    * [ISSUE #5860] Set the value of order when create or update topic (#5861)
    
    * [ISSUE #5939]Adjust the MQClientInstance#sendHeartbeatToAllBroker catch code block log print level from info to warn (#5940)
    
    * [ISSUE #5924] Optimize UtilAll#sleep method (#5925)
    
    * [ISSUE #5924]Optimize UtilAll#sleep method
    
    * polish code
    
    * [ISSUE #5986] optimize the BrokerOuterAPITest class code
    
    Co-authored-by: zhouyunpeng <24...@qq.com>
    
    * [ISSUE #5971] Make the internal logs related to the dledger in the controller print to a file separately (#5972)
    
    * Make the internal logs related to the dledger in the controller print to a file separately
    
    * Make the internal logs related to the dledger in the controller print to a file separately
    
    * [ISSUE #5969] Remvoe duplicate deleteUnusedStats in admin processor (#5973)
    
    * [ISSUE #5847] Add checkBlock for hasMsgFromQueue
    
    * [ISSUE #5983] Make consumer support flow control code better (#5984)
    
    * When encountering the flow control code, pull it after 20ms instead of 3s
    
    * When encountering the flow control code, pull it after 20ms instead of 3s
    
    * [ISSUE #5896] feat:add pop consumer example (#5991)
    
    * feat:add pop consumer
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * Update example/src/main/java/org/apache/rocketmq/example/simple/PushConsumer.java
    
    Co-authored-by: Oliver <wq...@163.com>
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    * feat:fix
    
    ---------
    
    Co-authored-by: mahaitao617 <ma...@mahaitao617deMacBook-Pro.local>
    Co-authored-by: Oliver <wq...@163.com>
    
    * [ISSUE #5942] Fix the produce count include the quantity of the system topic(#5943)
    
    * [ISSUE #5999] Fix the TopicQueueMappingUtils comments typo (#6000)
    
    * [ISSUE #5996] Optimize the RemotingSerializable class code (#5998)
    
    * simplified RemotingSerializable null check
    
    * optimize the RemotingSerializable class code
    
    * [ISSUE #5994] [RIP-46] add pop and timer metrics (#5995)
    
    * add pop and timer metrics
    * fix according to review comment
    
    * test(broker): add ReplicasManagerRegisterTest to test the register process
    
    1. add ReplicasManagerRegisterTest to test the register process
    
    * chore(pom): modify pom.xml to replace mockito with powermock
    
    1. modify pom.xml to replace mockito with powermock
    
    * build(bazel): export powermock in bazel
    
    1. export powermock in bazel
    
    ---------
    
    Co-authored-by: Xinda <xd...@gmail.com>
    Co-authored-by: SSpirits <ad...@lv5.moe>
    Co-authored-by: loboxu <lo...@tencent.com>
    Co-authored-by: pingww <pi...@gmail.com>
    Co-authored-by: Aaron Ai <ya...@gmail.com>
    Co-authored-by: Slideee <ye...@corp.netease.com>
    Co-authored-by: mxsm <lj...@gmail.com>
    Co-authored-by: hardyfish <85...@users.noreply.github.com>
    Co-authored-by: zhouyunpeng <24...@qq.com>
    Co-authored-by: rongtong <ji...@163.com>
    Co-authored-by: zhiliatom <87...@users.noreply.github.com>
    Co-authored-by: zhouxiang <zh...@alibaba-inc.com>
    Co-authored-by: mahaitao <15...@163.com>
    Co-authored-by: mahaitao617 <ma...@mahaitao617deMacBook-Pro.local>
    Co-authored-by: Oliver <wq...@163.com>
---
 BUILD.bazel                                        |   2 +
 WORKSPACE                                          |   3 +
 broker/pom.xml                                     |   4 +
 .../broker/controller/ReplicasManager.java         |  52 ++--
 .../broker/metrics/BrokerMetricsManager.java       |   5 +
 .../broker/metrics/PopMetricsConstant.java         |  33 +++
 .../rocketmq/broker/metrics/PopMetricsManager.java | 212 +++++++++++++++
 .../broker/metrics/PopReviveMessageType.java       |  24 +-
 .../broker/processor/AckMessageProcessor.java      |   8 +-
 .../broker/processor/AdminBrokerProcessor.java     |  29 +-
 .../processor/ChangeInvisibleTimeProcessor.java    |  14 +-
 .../broker/processor/NotificationProcessor.java    |   4 +
 .../broker/processor/PopBufferMergeService.java    |  16 +-
 .../broker/processor/PopReviveService.java         |  41 ++-
 .../broker/processor/ReplyMessageProcessor.java    |   2 +-
 .../broker/processor/SendMessageProcessor.java     |   2 +-
 .../broker/schedule/ScheduleMessageService.java    |   2 +-
 broker/src/main/resources/rmq.broker.logback.xml   |  34 +++
 .../apache/rocketmq/broker/BrokerOuterAPITest.java |   5 +-
 .../controller/ReplicasManagerRegisterTest.java    | 302 +++++++++++++++++++++
 .../broker/processor/PopReviveServiceTest.java     |  22 +-
 .../impl/consumer/DefaultLitePullConsumerImpl.java |  23 +-
 .../impl/consumer/DefaultMQPushConsumerImpl.java   |  18 +-
 .../client/impl/factory/MQClientInstance.java      |   6 +-
 .../java/org/apache/rocketmq/common/MixAll.java    |   4 +-
 .../java/org/apache/rocketmq/common/UtilAll.java   |  10 +-
 container/pom.xml                                  |   6 -
 controller/pom.xml                                 |   4 +-
 .../src/main/resources/rmq.controller.logback.xml  |  28 ++
 example/pom.xml                                    |   8 +
 .../rocketmq/example/quickstart/Producer.java      |   2 +-
 .../rocketmq/example/simple/PopConsumer.java       |  62 +++++
 pom.xml                                            |  40 ++-
 .../remoting/protocol/RemotingSerializable.java    |   8 +-
 .../controller/ElectMasterResponseHeader.java      |   7 +
 .../statictopic/TopicQueueMappingUtils.java        |   8 +-
 store/pom.xml                                      |   6 +-
 .../store/ha/autoswitch/BrokerMetadata.java        |  15 +
 .../rocketmq/store/ha/autoswitch/MetadataFile.java |  13 +-
 .../store/metrics/DefaultStoreMetricsConstant.java |  10 +
 .../store/metrics/DefaultStoreMetricsManager.java  |  86 ++++++
 .../rocketmq/store/queue/ConsumeQueueStore.java    |   1 +
 .../rocketmq/store/queue/QueueOffsetAssigner.java  |  13 +
 .../apache/rocketmq/store/stats/BrokerStats.java   |   4 +-
 .../rocketmq/store/stats/BrokerStatsManager.java   |  27 +-
 .../rocketmq/store/timer/TimerMessageStore.java    |  93 ++++---
 .../store/stats/BrokerStatsManagerTest.java        |  13 +
 test/pom.xml                                       |   4 -
 tieredstore/README.md                              |  64 +++++
 tieredstore/pom.xml                                |  10 +-
 .../rocketmq/tieredstore/TieredDispatcher.java     |  11 +-
 .../common/TieredMessageStoreConfig.java           |  12 +-
 .../tieredstore/container/TieredIndexFile.java     |   4 +-
 .../metadata/TieredMetadataManager.java            |  16 +-
 .../metadata/TieredMetadataSerializeWrapper.java   |  32 ++-
 .../metrics/TieredStoreMetricsConstant.java        |   4 +-
 .../metrics/TieredStoreMetricsManager.java         |  12 +-
 .../tieredstore/provider/TieredFileSegment.java    |  29 +-
 ...ckendProvider.java => TieredStoreProvider.java} |   2 +-
 .../provider/posix/PosixFileSegment.java           | 237 ++++++++++++++++
 .../rocketmq/tieredstore/util/TieredStoreUtil.java |   2 +-
 .../tieredstore/TieredMessageFetcherTest.java      |   3 +
 .../tieredstore/container/TieredIndexFileTest.java |   2 +
 .../tieredstore/metadata/MetadataStoreTest.java    | 112 +++++---
 .../provider/posix/PosixFileSegmentTest.java       |  69 +++++
 tieredstore/tiered_storage_arch.png                | Bin 0 -> 440317 bytes
 66 files changed, 1683 insertions(+), 273 deletions(-)

diff --git a/BUILD.bazel b/BUILD.bazel
index 0663d5774..358527c31 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -37,6 +37,8 @@ java_library(
         "@maven//:org_assertj_assertj_core",
         "@maven//:org_hamcrest_hamcrest_library",
         "@maven//:org_mockito_mockito_core",
+        "@maven//:org_powermock_powermock_module_junit4",
+        "@maven//:org_powermock_powermock_api_mockito2",
         "@maven//:org_hamcrest_hamcrest_core",
         "@maven//:ch_qos_logback_logback_classic",
         "@maven//:org_awaitility_awaitility",
diff --git a/WORKSPACE b/WORKSPACE
index 267959878..d3994ca21 100644
--- a/WORKSPACE
+++ b/WORKSPACE
@@ -45,6 +45,9 @@ maven_install(
         "io.netty:netty-all:4.1.65.Final",
         "org.assertj:assertj-core:3.22.0",
         "org.mockito:mockito-core:3.10.0",
+        "org.powermock:powermock-module-junit4:2.0.9",
+        "org.powermock:powermock-api-mockito2:2.0.9",
+
         "com.github.luben:zstd-jni:1.5.2-2",
         "org.lz4:lz4-java:1.8.0",
         "commons-validator:commons-validator:1.7",
diff --git a/broker/pom.xml b/broker/pom.xml
index 9642cef3c..6d4c99e53 100644
--- a/broker/pom.xml
+++ b/broker/pom.xml
@@ -34,6 +34,10 @@
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-store</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-tiered-store</artifactId>
+        </dependency>
         <dependency>
             <groupId>io.github.aliyunmq</groupId>
             <artifactId>rocketmq-slf4j-api</artifactId>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
index 78f40495d..d009726a7 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/controller/ReplicasManager.java
@@ -185,7 +185,7 @@ public class ReplicasManager {
                 }
             }
             // register 5 times but still unsuccessful
-            if (this.state == State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE) {
+            if (this.state != State.FIRST_TIME_REGISTER_TO_CONTROLLER_DONE) {
                 return false;
             }
         }
@@ -208,6 +208,7 @@ public class ReplicasManager {
 
     public void shutdown() {
         this.state = State.SHUTDOWN;
+        this.registerState = RegisterState.INITIAL;
         this.executorService.shutdown();
         this.scheduledService.shutdown();
     }
@@ -369,37 +370,6 @@ public class ReplicasManager {
         }
     }
 
-//    private boolean registerBrokerToController() {
-//        // Register this broker to controller to get a stable and credible broker id, and persist metadata to local file.
-//        try {
-//            final RegisterBrokerToControllerResponseHeader registerResponse = this.brokerOuterAPI.registerBrokerToController(this.controllerLeaderAddress,
-//                this.brokerConfig.getBrokerClusterName(), this.brokerConfig.getBrokerName(), this.localAddress, this.brokerId, this.brokerConfig.getControllerHeartBeatTimeoutMills(),
-//                this.haService.getLastEpoch(), this.brokerController.getMessageStore().getMaxPhyOffset(), this.brokerConfig.getBrokerElectionPriority());
-//            final String newMasterAddress = registerResponse.getMasterAddress();
-//            if (StringUtils.isNoneEmpty(newMasterAddress)) {
-//                if (StringUtils.equals(newMasterAddress, this.localAddress)) {
-//                    changeToMaster(registerResponse.getMasterEpoch(), registerResponse.getSyncStateSetEpoch());
-//                } else {
-//                    changeToSlave(newMasterAddress, registerResponse.getMasterEpoch(), registerResponse.getBrokerId());
-//                }
-//                // Set isolated to false, make broker can register to namesrv regularly
-//                brokerController.setIsolated(false);
-//            } else {
-//                // if master address is empty, just apply the brokerId
-//                if (registerResponse.getBrokerId() <= 0) {
-//                    // wrong broker id
-//                    LOGGER.error("Register to controller but receive a invalid broker id = {}", registerResponse.getBrokerId());
-//                    return false;
-//                }
-//                this.brokerConfig.setBrokerId(registerResponse.getBrokerId());
-//            }
-//            return true;
-//        } catch (final Exception e) {
-//            LOGGER.error("Failed to register broker to controller", e);
-//            return false;
-//        }
-//    }
-
     /**
      * Register broker to controller, and persist the metadata to file
      * @return whether registering process succeeded
@@ -489,7 +459,7 @@ public class ReplicasManager {
             return true;
 
         } catch (Exception e) {
-            LOGGER.error("fail to apply broker id", e);
+            LOGGER.error("fail to apply broker id: {}", e, tempBrokerMetadata.getBrokerId());
             return false;
         }
     }
@@ -776,4 +746,20 @@ public class ReplicasManager {
     public Long getBrokerId() {
         return brokerId;
     }
+
+    public RegisterState getRegisterState() {
+        return registerState;
+    }
+
+    public State getState() {
+        return state;
+    }
+
+    public BrokerMetadata getBrokerMetadata() {
+        return brokerMetadata;
+    }
+
+    public TempBrokerMetadata getTempBrokerMetadata() {
+        return tempBrokerMetadata;
+    }
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 9fffb1eda..060b051ff 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -336,6 +336,10 @@ public class BrokerMetricsManager {
         for (Pair<InstrumentSelector, View> selectorViewPair : messageStore.getMetricsView()) {
             providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
         }
+
+        for (Pair<InstrumentSelector, View> selectorViewPair : PopMetricsManager.getMetricsView()) {
+            providerBuilder.registerView(selectorViewPair.getObject1(), selectorViewPair.getObject2());
+        }
     }
 
     private void initStatsMetrics() {
@@ -494,6 +498,7 @@ public class BrokerMetricsManager {
     private void initOtherMetrics() {
         RemotingMetricsManager.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
         messageStore.initMetrics(brokerMeter, BrokerMetricsManager::newAttributesBuilder);
+        PopMetricsManager.initMetrics(brokerMeter, brokerController, BrokerMetricsManager::newAttributesBuilder);
     }
 
     public void shutdown() {
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java
new file mode 100644
index 000000000..41917ed50
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsConstant.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.metrics;
+
+public class PopMetricsConstant {
+    public static final String HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME = "rocketmq_pop_buffer_scan_time_consume";
+    public static final String COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL = "rocketmq_pop_revive_in_message_total";
+    public static final String COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL = "rocketmq_pop_revive_out_message_total";
+    public static final String COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL = "rocketmq_pop_revive_retry_messages_total";
+
+    public static final String GAUGE_POP_REVIVE_LAG = "rocketmq_pop_revive_lag";
+    public static final String GAUGE_POP_REVIVE_LATENCY = "rocketmq_pop_revive_latency";
+    public static final String GAUGE_POP_OFFSET_BUFFER_SIZE = "rocketmq_pop_offset_buffer_size";
+    public static final String GAUGE_POP_CHECKPOINT_BUFFER_SIZE = "rocketmq_pop_checkpoint_buffer_size";
+
+    public static final String LABEL_REVIVE_MESSAGE_TYPE = "revive_message_type";
+    public static final String LABEL_PUT_STATUS = "put_status";
+    public static final String LABEL_QUEUE_ID = "queue_id";
+}
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
new file mode 100644
index 000000000..463371d7e
--- /dev/null
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopMetricsManager.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.broker.metrics;
+
+import com.google.common.collect.Lists;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.View;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.function.Supplier;
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.processor.PopBufferMergeService;
+import org.apache.rocketmq.broker.processor.PopReviveService;
+import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+import org.apache.rocketmq.store.PutMessageStatus;
+import org.apache.rocketmq.store.pop.AckMsg;
+import org.apache.rocketmq.store.pop.PopCheckPoint;
+
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_CONSUMER_GROUP;
+import static org.apache.rocketmq.broker.metrics.BrokerMetricsConstant.LABEL_TOPIC;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_CHECKPOINT_BUFFER_SIZE;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_OFFSET_BUFFER_SIZE;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_REVIVE_LAG;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.GAUGE_POP_REVIVE_LATENCY;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_PUT_STATUS;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_QUEUE_ID;
+import static org.apache.rocketmq.broker.metrics.PopMetricsConstant.LABEL_REVIVE_MESSAGE_TYPE;
+
+public class PopMetricsManager {
+    public static Supplier<AttributesBuilder> attributesBuilderSupplier;
+
+    private static LongHistogram popBufferScanTimeConsume = new NopLongHistogram();
+    private static LongCounter popRevivePutTotal = new NopLongCounter();
+    private static LongCounter popReviveGetTotal = new NopLongCounter();
+    private static LongCounter popReviveRetryMessageTotal = new NopLongCounter();
+
+    public static List<Pair<InstrumentSelector, View>> getMetricsView() {
+        List<Double> rpcCostTimeBuckets = Arrays.asList(
+            (double) Duration.ofMillis(1).toMillis(),
+            (double) Duration.ofMillis(10).toMillis(),
+            (double) Duration.ofMillis(100).toMillis(),
+            (double) Duration.ofSeconds(1).toMillis(),
+            (double) Duration.ofSeconds(2).toMillis(),
+            (double) Duration.ofSeconds(3).toMillis()
+        );
+        InstrumentSelector popBufferScanTimeConsumeSelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM)
+            .setName(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
+            .build();
+        View popBufferScanTimeConsumeView = View.builder()
+            .setAggregation(Aggregation.explicitBucketHistogram(rpcCostTimeBuckets))
+            .build();
+        return Lists.newArrayList(new Pair<>(popBufferScanTimeConsumeSelector, popBufferScanTimeConsumeView));
+    }
+
+    public static void initMetrics(Meter meter, BrokerController brokerController,
+        Supplier<AttributesBuilder> attributesBuilderSupplier) {
+        PopMetricsManager.attributesBuilderSupplier = attributesBuilderSupplier;
+
+        popBufferScanTimeConsume = meter.histogramBuilder(HISTOGRAM_POP_BUFFER_SCAN_TIME_CONSUME)
+            .setDescription("Time consuming of pop buffer scan")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .build();
+        popRevivePutTotal = meter.counterBuilder(COUNTER_POP_REVIVE_IN_MESSAGE_TOTAL)
+            .setDescription("Total number of put message to revive topic")
+            .build();
+        popReviveGetTotal = meter.counterBuilder(COUNTER_POP_REVIVE_OUT_MESSAGE_TOTAL)
+            .setDescription("Total number of get message from revive topic")
+            .build();
+        popReviveRetryMessageTotal = meter.counterBuilder(COUNTER_POP_REVIVE_RETRY_MESSAGES_TOTAL)
+            .setDescription("Total number of put message to pop retry topic")
+            .build();
+
+        meter.gaugeBuilder(GAUGE_POP_OFFSET_BUFFER_SIZE)
+            .setDescription("Time number of buffered offset")
+            .ofLongs()
+            .buildWithCallback(measurement -> calculatePopBufferOffsetSize(brokerController, measurement));
+        meter.gaugeBuilder(GAUGE_POP_CHECKPOINT_BUFFER_SIZE)
+            .setDescription("The number of buffered checkpoint")
+            .ofLongs()
+            .buildWithCallback(measurement -> calculatePopBufferCkSize(brokerController, measurement));
+        meter.gaugeBuilder(GAUGE_POP_REVIVE_LAG)
+            .setDescription("The processing lag of revive topic")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .buildWithCallback(measurement -> calculatePopReviveLag(brokerController, measurement));
+        meter.gaugeBuilder(GAUGE_POP_REVIVE_LATENCY)
+            .setDescription("The processing latency of revive topic")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .buildWithCallback(measurement -> calculatePopReviveLatency(brokerController, measurement));
+    }
+
+    private static void calculatePopBufferOffsetSize(BrokerController brokerController,
+        ObservableLongMeasurement measurement) {
+        PopBufferMergeService popBufferMergeService = brokerController.getPopMessageProcessor().getPopBufferMergeService();
+        measurement.record(popBufferMergeService.getOffsetTotalSize(), newAttributesBuilder().build());
+    }
+
+    private static void calculatePopBufferCkSize(BrokerController brokerController,
+        ObservableLongMeasurement measurement) {
+        PopBufferMergeService popBufferMergeService = brokerController.getPopMessageProcessor().getPopBufferMergeService();
+        measurement.record(popBufferMergeService.getBufferedCKSize(), newAttributesBuilder().build());
+    }
+
+    private static void calculatePopReviveLatency(BrokerController brokerController,
+        ObservableLongMeasurement measurement) {
+        PopReviveService[] popReviveServices = brokerController.getAckMessageProcessor().getPopReviveServices();
+        for (PopReviveService popReviveService : popReviveServices) {
+            measurement.record(popReviveService.getReviveBehindMillis(), newAttributesBuilder()
+                .put(LABEL_QUEUE_ID, popReviveService.getQueueId())
+                .build());
+        }
+    }
+
+    private static void calculatePopReviveLag(BrokerController brokerController,
+        ObservableLongMeasurement measurement) {
+        PopReviveService[] popReviveServices = brokerController.getAckMessageProcessor().getPopReviveServices();
+        for (PopReviveService popReviveService : popReviveServices) {
+            measurement.record(popReviveService.getReviveBehindMessages(), newAttributesBuilder()
+                .put(LABEL_QUEUE_ID, popReviveService.getQueueId())
+                .build());
+        }
+    }
+
+    public static void incPopReviveAckPutCount(AckMsg ackMsg, PutMessageStatus status) {
+        incPopRevivePutCount(ackMsg.getConsumerGroup(), ackMsg.getTopic(), PopReviveMessageType.ACK, status, 1);
+    }
+
+    public static void incPopReviveCkPutCount(PopCheckPoint checkPoint, PutMessageStatus status) {
+        incPopRevivePutCount(checkPoint.getCId(), checkPoint.getTopic(), PopReviveMessageType.CK, status, 1);
+    }
+
+    public static void incPopRevivePutCount(String group, String topic, PopReviveMessageType messageType,
+        PutMessageStatus status, int num) {
+        Attributes attributes = newAttributesBuilder()
+            .put(LABEL_CONSUMER_GROUP, group)
+            .put(LABEL_TOPIC, topic)
+            .put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
+            .put(LABEL_PUT_STATUS, status.name())
+            .build();
+        popRevivePutTotal.add(num, attributes);
+    }
+
+    public static void incPopReviveAckGetCount(AckMsg ackMsg, int queueId) {
+        incPopReviveGetCount(ackMsg.getConsumerGroup(), ackMsg.getTopic(), PopReviveMessageType.ACK, queueId, 1);
+    }
+
+    public static void incPopReviveCkGetCount(PopCheckPoint checkPoint, int queueId) {
+        incPopReviveGetCount(checkPoint.getCId(), checkPoint.getTopic(), PopReviveMessageType.CK, queueId, 1);
+    }
+
+    public static void incPopReviveGetCount(String group, String topic, PopReviveMessageType messageType, int queueId,
+        int num) {
+        AttributesBuilder builder = newAttributesBuilder();
+        Attributes attributes = builder
+            .put(LABEL_CONSUMER_GROUP, group)
+            .put(LABEL_TOPIC, topic)
+            .put(LABEL_QUEUE_ID, queueId)
+            .put(LABEL_REVIVE_MESSAGE_TYPE, messageType.name())
+            .build();
+        popReviveGetTotal.add(num, attributes);
+    }
+
+    public static void incPopReviveRetryMessageCount(PopCheckPoint checkPoint, PutMessageStatus status) {
+        AttributesBuilder builder = newAttributesBuilder();
+        Attributes attributes = builder
+            .put(LABEL_CONSUMER_GROUP, checkPoint.getCId())
+            .put(LABEL_TOPIC, checkPoint.getTopic())
+            .put(LABEL_PUT_STATUS, status.name())
+            .build();
+        popReviveRetryMessageTotal.add(1, attributes);
+    }
+
+    public static void recordPopBufferScanTimeConsume(long time) {
+        popBufferScanTimeConsume.record(time, newAttributesBuilder().build());
+    }
+
+    public static AttributesBuilder newAttributesBuilder() {
+        return attributesBuilderSupplier != null ? attributesBuilderSupplier.get() : Attributes.builder();
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/client/exception/MQRedirectException.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java
similarity index 55%
rename from client/src/main/java/org/apache/rocketmq/client/exception/MQRedirectException.java
rename to broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java
index 6ed395ead..3f6fe9c47 100644
--- a/client/src/main/java/org/apache/rocketmq/client/exception/MQRedirectException.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/PopReviveMessageType.java
@@ -14,25 +14,9 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.client.exception;
+package org.apache.rocketmq.broker.metrics;
 
-public class MQRedirectException extends MQBrokerException {
-    private static final StackTraceElement[] UNASSIGNED_STACK = new StackTraceElement[0];
-
-    private final byte[] body;
-
-    public MQRedirectException(byte[] responseBody) {
-        this.body = responseBody;
-    }
-
-    // This exception class is used as a flow control item, so stack trace is useless and performance killer.
-    @Override
-    public synchronized Throwable fillInStackTrace() {
-        this.setStackTrace(UNASSIGNED_STACK);
-        return this;
-    }
-
-    public byte[] getBody() {
-        return body;
-    }
+public enum PopReviveMessageType {
+    CK,
+    ACK
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
index 2653de0f5..1985c22d6 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.TopicConfig;
@@ -58,6 +59,10 @@ public class AckMessageProcessor implements NettyRequestProcessor {
         }
     }
 
+    public PopReviveService[] getPopReviveServices() {
+        return popReviveServices;
+    }
+
     public void startPopReviveService() {
         for (PopReviveService popReviveService : popReviveServices) {
             popReviveService.start();
@@ -159,7 +164,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
             }
             try {
                 oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
-                        requestHeader.getTopic(), requestHeader.getQueueId());
+                    requestHeader.getTopic(), requestHeader.getQueueId());
                 if (requestHeader.getOffset() < oldOffset) {
                     return response;
                 }
@@ -216,6 +221,7 @@ public class AckMessageProcessor implements NettyRequestProcessor {
             && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
             POP_LOGGER.error("put ack msg error:" + putMessageResult);
         }
+        PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
         decInFlightMessageNum(requestHeader);
         return response;
     }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 01fb084dd..a3279fd31 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -428,6 +428,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         topicConfig.setTopicFilterType(requestHeader.getTopicFilterTypeEnum());
         topicConfig.setPerm(requestHeader.getPerm());
         topicConfig.setTopicSysFlag(requestHeader.getTopicSysFlag() == null ? 0 : requestHeader.getTopicSysFlag());
+        topicConfig.setOrder(requestHeader.getOrder());
         String attributesModification = requestHeader.getAttributes();
         topicConfig.setAttributes(AttributeParser.parseToMap(attributesModification));
 
@@ -526,10 +527,6 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         this.brokerController.getConsumerOffsetManager().cleanOffsetByTopic(requestHeader.getTopic());
         this.brokerController.getPopInflightMessageCounter().clearInFlightMessageNumByTopicName(requestHeader.getTopic());
         this.brokerController.getMessageStore().deleteTopics(Sets.newHashSet(requestHeader.getTopic()));
-        if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
-            this.brokerController.getBrokerStatsManager().onTopicDeleted(requestHeader.getTopic());
-        }
-
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
         return response;
@@ -1429,10 +1426,11 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         return response;
     }
 
-    private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
+    private RemotingCommand getAllProducerInfo(ChannelHandlerContext ctx,
+        RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
         final GetAllProducerInfoRequestHeader requestHeader =
-                (GetAllProducerInfoRequestHeader) request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class);
+            (GetAllProducerInfoRequestHeader) request.decodeCommandCustomHeader(GetAllProducerInfoRequestHeader.class);
 
         ProducerTableInfo producerTable = this.brokerController.getProducerManager().getProducerTable();
         if (producerTable != null) {
@@ -1446,6 +1444,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         response.setCode(ResponseCode.SYSTEM_ERROR);
         return response;
     }
+
     private RemotingCommand getProducerConnectionList(ChannelHandlerContext ctx,
         RemotingCommand request) throws RemotingCommandException {
         final RemotingCommand response = RemotingCommand.createResponseCommand(null);
@@ -1695,13 +1694,13 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
     /**
      * Reset consumer offset.
      *
-     * @param topic         Required, not null.
-     * @param group         Required, not null.
-     * @param queueId       if target queue ID is negative, all message queues will be reset;
-     *                      otherwise, only the target queue would get reset.
-     * @param timestamp     if timestamp is negative, offset would be reset to broker offset at the time being;
-     *                      otherwise, binary search is performed to locate target offset.
-     * @param offset        Target offset to reset to if target queue ID is properly provided.
+     * @param topic     Required, not null.
+     * @param group     Required, not null.
+     * @param queueId   if target queue ID is negative, all message queues will be reset;
+     *                  otherwise, only the target queue would get reset.
+     * @param timestamp if timestamp is negative, offset would be reset to broker offset at the time being;
+     *                  otherwise, binary search is performed to locate target offset.
+     * @param offset    Target offset to reset to if target queue ID is properly provided.
      * @return Affected queues and their new offset
      */
     private RemotingCommand resetOffsetInner(String topic, String group, int queueId, long timestamp, Long offset) {
@@ -2263,8 +2262,8 @@ public class AdminBrokerProcessor implements NettyRequestProcessor {
         runtimeInfo.put("startAcceptSendRequestTimeStamp", String.valueOf(this.brokerController.getBrokerConfig().getStartAcceptSendRequestTimeStamp()));
 
         if (this.brokerController.getMessageStoreConfig().isTimerWheelEnable()) {
-            runtimeInfo.put("timerReadBehind", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getReadBehind()));
-            runtimeInfo.put("timerOffsetBehind", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getOffsetBehind()));
+            runtimeInfo.put("timerReadBehind", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getDequeueBehind()));
+            runtimeInfo.put("timerOffsetBehind", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehindMessages()));
             runtimeInfo.put("timerCongestNum", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getAllCongestNum()));
             runtimeInfo.put("timerEnqueueTps", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getEnqueueTps()));
             runtimeInfo.put("timerDequeueTps", String.valueOf(this.brokerController.getMessageStore().getTimerMessageStore().getDequeueTps()));
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
index 91e176f8c..f4a472028 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java
@@ -20,6 +20,7 @@ import com.alibaba.fastjson.JSON;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.TopicConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -127,7 +128,8 @@ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
         return response;
     }
 
-    protected RemotingCommand processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader requestHeader, String[] extraInfo, RemotingCommand response, ChangeInvisibleTimeResponseHeader responseHeader) {
+    protected RemotingCommand processChangeInvisibleTimeForOrder(ChangeInvisibleTimeRequestHeader requestHeader,
+        String[] extraInfo, RemotingCommand response, ChangeInvisibleTimeResponseHeader responseHeader) {
         long popTime = ExtraInfoUtil.getPopTime(extraInfo);
         long oldOffset = this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(),
             requestHeader.getTopic(), requestHeader.getQueueId());
@@ -194,6 +196,7 @@ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
             && putMessageResult.getPutMessageStatus() != PutMessageStatus.SLAVE_NOT_AVAILABLE) {
             POP_LOGGER.error("change Invisible, put ack msg fail: {}, {}", ackMsg, putMessageResult);
         }
+        PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
     }
 
     private PutMessageResult appendCheckPoint(final ChangeInvisibleTimeRequestHeader requestHeader, int reviveQid,
@@ -229,9 +232,12 @@ public class ChangeInvisibleTimeProcessor implements NettyRequestProcessor {
                 ck.getReviveTime(), putMessageResult);
         }
 
-        if (putMessageResult != null && putMessageResult.isOk()) {
-            this.brokerController.getBrokerStatsManager().incBrokerCkNums(1);
-            this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1);
+        if (putMessageResult != null) {
+            PopMetricsManager.incPopReviveCkPutCount(ck, putMessageResult.getPutMessageStatus());
+            if (putMessageResult.isOk()) {
+                this.brokerController.getBrokerStatsManager().incBrokerCkNums(1);
+                this.brokerController.getBrokerStatsManager().incGroupCkNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(), 1);
+            }
         }
 
         return putMessageResult;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
index e10d72328..3b306ca2d 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/NotificationProcessor.java
@@ -195,6 +195,7 @@ public class NotificationProcessor implements NettyRequestProcessor {
             response.setRemark(errorInfo);
             return response;
         }
+
         SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
         if (null == subscriptionGroupConfig) {
             response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
@@ -263,6 +264,9 @@ public class NotificationProcessor implements NettyRequestProcessor {
     }
 
     private boolean hasMsgFromQueue(boolean isRetry, NotificationRequestHeader requestHeader, int queueId) {
+        if (this.brokerController.getConsumerOrderInfoManager().checkBlock(requestHeader.getTopic(), requestHeader.getConsumerGroup(), queueId, 0)) {
+            return false;
+        }
         String topic = isRetry ? KeyBuilder.buildPopRetryTopic(requestHeader.getTopic(), requestHeader.getConsumerGroup()) : requestHeader.getTopic();
         long offset = getPopOffset(topic, requestHeader.getConsumerGroup(), queueId);
         long restNum = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId) - offset;
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 4167438e9..e933f5347 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -24,16 +24,17 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.common.KeyBuilder;
 import org.apache.rocketmq.common.PopAckConstants;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.common.utils.DataConverter;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
 import org.apache.rocketmq.store.PutMessageResult;
 import org.apache.rocketmq.store.PutMessageStatus;
 import org.apache.rocketmq.store.config.BrokerRole;
@@ -104,7 +105,7 @@ public class PopBufferMergeService extends ServiceThread {
 
                 this.waitForRunning(interval);
 
-                if (!this.serving && this.buffer.size() == 0 && totalSize() == 0) {
+                if (!this.serving && this.buffer.size() == 0 && getOffsetTotalSize() == 0) {
                     this.serving = true;
                 }
             } catch (Throwable e) {
@@ -121,7 +122,7 @@ public class PopBufferMergeService extends ServiceThread {
         if (!isShouldRunning()) {
             return;
         }
-        while (this.buffer.size() > 0 || totalSize() > 0) {
+        while (this.buffer.size() > 0 || getOffsetTotalSize() > 0) {
             scan();
         }
     }
@@ -304,6 +305,7 @@ public class PopBufferMergeService extends ServiceThread {
                     eclipse, count, countCk, counter.get(), offsetBufferSize);
             }
         }
+        PopMetricsManager.recordPopBufferScanTimeConsume(eclipse);
         scanTimes++;
 
         if (scanTimes >= countOfMinute1) {
@@ -312,7 +314,7 @@ public class PopBufferMergeService extends ServiceThread {
         }
     }
 
-    private int totalSize() {
+    public int getOffsetTotalSize() {
         int count = 0;
         Iterator<Map.Entry<String, QueueWithTime<PopCheckPointWrapper>>> iterator = this.commitOffsets.entrySet().iterator();
         while (iterator.hasNext()) {
@@ -323,6 +325,10 @@ public class PopBufferMergeService extends ServiceThread {
         return count;
     }
 
+    public int getBufferedCKSize() {
+        return this.counter.get();
+    }
+
     private void markBitCAS(AtomicInteger setBits, int index) {
         while (true) {
             int bits = setBits.get();
@@ -540,6 +546,7 @@ public class PopBufferMergeService extends ServiceThread {
         }
         MessageExtBrokerInner msgInner = popMessageProcessor.buildCkMsg(pointWrapper.getCk(), pointWrapper.getReviveQueueId());
         PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+        PopMetricsManager.incPopReviveCkPutCount(pointWrapper.getCk(), putMessageResult.getPutMessageStatus());
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
             && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
             && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
@@ -584,6 +591,7 @@ public class PopBufferMergeService extends ServiceThread {
 
         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+        PopMetricsManager.incPopReviveAckPutCount(ackMsg, putMessageResult.getPutMessageStatus());
         if (putMessageResult.getPutMessageStatus() != PutMessageStatus.PUT_OK
             && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_DISK_TIMEOUT
             && putMessageResult.getPutMessageStatus() != PutMessageStatus.FLUSH_SLAVE_TIMEOUT
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
index d0e9dbc36..fe654fe64 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopReviveService.java
@@ -30,6 +30,7 @@ import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
+import org.apache.rocketmq.broker.metrics.PopMetricsManager;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
 import org.apache.rocketmq.common.KeyBuilder;
@@ -66,6 +67,7 @@ public class PopReviveService extends ServiceThread {
     private int queueId;
     private BrokerController brokerController;
     private String reviveTopic;
+    private long currentReviveMessageTimestamp = -1;
     private volatile boolean shouldRunPopRevive = false;
 
     private final NavigableMap<PopCheckPoint/* oldCK */, Pair<Long/* timestamp */, Boolean/* result */>> inflightReviveRequestMap = Collections.synchronizedNavigableMap(new TreeMap<>());
@@ -86,6 +88,10 @@ public class PopReviveService extends ServiceThread {
         return "PopReviveService_" + this.queueId;
     }
 
+    public int getQueueId() {
+        return queueId;
+    }
+
     public void setShouldRunPopRevive(final boolean shouldRunPopRevive) {
         this.shouldRunPopRevive = shouldRunPopRevive;
     }
@@ -120,6 +126,7 @@ public class PopReviveService extends ServiceThread {
         msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
         addRetryTopicIfNoExit(msgInner.getTopic(), popCheckPoint.getCId());
         PutMessageResult putMessageResult = brokerController.getEscapeBridge().putMessageToSpecificQueue(msgInner);
+        PopMetricsManager.incPopReviveRetryMessageCount(popCheckPoint, putMessageResult.getPutMessageStatus());
         if (brokerController.getBrokerConfig().isEnablePopLog()) {
             POP_LOGGER.info("reviveQueueId={},retry msg , ck={}, msg queueId {}, offset {}, reviveDelay={}, result is {} ",
                 queueId, popCheckPoint, messageExt.getQueueId(), messageExt.getQueueOffset(),
@@ -131,7 +138,7 @@ public class PopReviveService extends ServiceThread {
             return false;
         }
         this.brokerController.getPopInflightMessageCounter().decrementInFlightMessageNum(popCheckPoint);
-        this.brokerController.getBrokerStatsManager().incBrokerPutNums(1);
+        this.brokerController.getBrokerStatsManager().incBrokerPutNums(popCheckPoint.getTopic(), 1);
         this.brokerController.getBrokerStatsManager().incTopicPutNums(msgInner.getTopic());
         this.brokerController.getBrokerStatsManager().incTopicPutSize(msgInner.getTopic(), putMessageResult.getAppendMessageResult().getWroteBytes());
         if (brokerController.getPopMessageProcessor() != null) {
@@ -197,11 +204,13 @@ public class PopReviveService extends ServiceThread {
             || pullResult.getPullStatus() == PullStatus.OFFSET_ILLEGAL && offset == pullResult.getMaxOffset();
     }
 
-    private CompletableFuture<Pair<GetMessageStatus, MessageExt>> getBizMessage(String topic, long offset, int queueId, String brokerName) {
+    private CompletableFuture<Pair<GetMessageStatus, MessageExt>> getBizMessage(String topic, long offset, int queueId,
+        String brokerName) {
         return this.brokerController.getEscapeBridge().getMessageAsync(topic, offset, queueId, brokerName, false);
     }
 
-    public PullResult getMessage(String group, String topic, int queueId, long offset, int nums, boolean deCompressBody) {
+    public PullResult getMessage(String group, String topic, int queueId, long offset, int nums,
+        boolean deCompressBody) {
         GetMessageResult getMessageResult = this.brokerController.getMessageStore().getMessage(group, topic, queueId, offset, nums, null);
 
         if (getMessageResult != null) {
@@ -315,7 +324,7 @@ public class PopReviveService extends ServiceThread {
             List<MessageExt> messageExts = getReviveMessage(offset, queueId);
             if (messageExts == null || messageExts.isEmpty()) {
                 long old = endTime;
-                long timerDelay = brokerController.getMessageStore().getTimerMessageStore().getReadBehind();
+                long timerDelay = brokerController.getMessageStore().getTimerMessageStore().getDequeueBehind();
                 long commitLogDelay = brokerController.getMessageStore().getTimerMessageStore().getEnqueueBehind();
                 // move endTime
                 if (endTime != 0 && System.currentTimeMillis() - endTime > 3 * PopAckConstants.SECOND && timerDelay <= 0 && commitLogDelay <= 0) {
@@ -355,6 +364,7 @@ public class PopReviveService extends ServiceThread {
                         continue;
                     }
                     map.put(point.getTopic() + point.getCId() + point.getQueueId() + point.getStartOffset() + point.getPopTime(), point);
+                    PopMetricsManager.incPopReviveCkGetCount(point, queueId);
                     point.setReviveOffset(messageExt.getQueueOffset());
                     if (firstRt == 0) {
                         firstRt = point.getReviveTime();
@@ -365,6 +375,7 @@ public class PopReviveService extends ServiceThread {
                         POP_LOGGER.info("reviveQueueId={},find ack, offset:{}, raw : {}", messageExt.getQueueId(), messageExt.getQueueOffset(), raw);
                     }
                     AckMsg ackMsg = JSON.parseObject(raw, AckMsg.class);
+                    PopMetricsManager.incPopReviveAckGetCount(ackMsg, queueId);
                     String mergeKey = ackMsg.getTopic() + ackMsg.getConsumerGroup() + ackMsg.getQueueId() + ackMsg.getStartOffset() + ackMsg.getPopTime();
                     PopCheckPoint point = map.get(mergeKey);
                     if (point == null) {
@@ -555,6 +566,25 @@ public class PopReviveService extends ServiceThread {
         brokerController.getMessageStore().putMessage(ckMsg);
     }
 
+    public long getReviveBehindMillis() {
+        if (currentReviveMessageTimestamp <= 0) {
+            return 0;
+        }
+        long maxOffset = brokerController.getMessageStore().getMaxOffsetInQueue(reviveTopic, queueId);
+        if (maxOffset - reviveOffset > 1) {
+            return Math.max(0, System.currentTimeMillis() - currentReviveMessageTimestamp);
+        }
+        return 0;
+    }
+
+    public long getReviveBehindMessages() {
+        if (currentReviveMessageTimestamp <= 0) {
+            return 0;
+        }
+        long diff = brokerController.getMessageStore().getMaxOffsetInQueue(reviveTopic, queueId) - reviveOffset;
+        return Math.max(0, diff);
+    }
+
     @Override
     public void run() {
         int slow = 1;
@@ -586,7 +616,10 @@ public class PopReviveService extends ServiceThread {
                 long delay = 0;
                 if (sortList != null && !sortList.isEmpty()) {
                     delay = (System.currentTimeMillis() - sortList.get(0).getReviveTime()) / 1000;
+                    currentReviveMessageTimestamp = sortList.get(0).getReviveTime();
                     slow = 1;
+                } else {
+                    currentReviveMessageTimestamp = System.currentTimeMillis();
                 }
 
                 POP_LOGGER.info("reviveQueueId={},revive finish,old offset is {}, new offset is {}, ckDelay={}  ",
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
index dbc87a870..b2db356c8 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/ReplyMessageProcessor.java
@@ -295,7 +295,7 @@ public class ReplyMessageProcessor extends AbstractSendMessageProcessor {
             this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
             this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                 putMessageResult.getAppendMessageResult().getWroteBytes());
-            this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
+            this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum());
 
             if (!BrokerMetricsManager.isRetryOrDlqTopic(msg.getTopic())) {
                 Attributes attributes = BrokerMetricsManager.newAttributesBuilder()
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 45517e1bb..6faa7525b 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -429,7 +429,7 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
             this.brokerController.getBrokerStatsManager().incTopicPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum(), 1);
             this.brokerController.getBrokerStatsManager().incTopicPutSize(msg.getTopic(),
                 putMessageResult.getAppendMessageResult().getWroteBytes());
-            this.brokerController.getBrokerStatsManager().incBrokerPutNums(putMessageResult.getAppendMessageResult().getMsgNum());
+            this.brokerController.getBrokerStatsManager().incBrokerPutNums(msg.getTopic(), putMessageResult.getAppendMessageResult().getMsgNum());
             this.brokerController.getBrokerStatsManager().incTopicPutLatency(msg.getTopic(), queueIdInt,
                 (int) (this.brokerController.getMessageStore().now() - beginTimeMillis));
 
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
index 372fb83ea..26e757ab4 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/schedule/ScheduleMessageService.java
@@ -758,7 +758,7 @@ public class ScheduleMessageService extends ConfigManager {
 
                 ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutNums(this.topic, result.getAppendMessageResult().getMsgNum(), 1);
                 ScheduleMessageService.this.brokerController.getBrokerStatsManager().incTopicPutSize(this.topic, result.getAppendMessageResult().getWroteBytes());
-                ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(result.getAppendMessageResult().getMsgNum());
+                ScheduleMessageService.this.brokerController.getBrokerStatsManager().incBrokerPutNums(this.topic, result.getAppendMessageResult().getMsgNum());
 
                 attributes = BrokerMetricsManager.newAttributesBuilder()
                     .put(LABEL_TOPIC, topic)
diff --git a/broker/src/main/resources/rmq.broker.logback.xml b/broker/src/main/resources/rmq.broker.logback.xml
index d05adb011..94418ac9f 100644
--- a/broker/src/main/resources/rmq.broker.logback.xml
+++ b/broker/src/main/resources/rmq.broker.logback.xml
@@ -163,6 +163,36 @@
         </sift>
     </appender>
 
+    <appender name="RocketmqTieredStoreSiftingAppender" class="ch.qos.logback.classic.sift.SiftingAppender">
+        <discriminator>
+            <key>brokerContainerLogDir</key>
+            <defaultValue>${file.separator}</defaultValue>
+        </discriminator>
+        <sift>
+            <appender name="RocketmqTieredStoreAppender"
+                      class="ch.qos.logback.core.rolling.RollingFileAppender">
+                <file>
+                    ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}tiered_store.log
+                </file>
+                <append>true</append>
+                <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+                    <fileNamePattern>
+                        ${user.home}${file.separator}logs${file.separator}rocketmqlogs${brokerLogDir:-${file.separator}}${brokerContainerLogDir}${file.separator}otherdays${file.separator}tiered_store.%i.log.gz
+                    </fileNamePattern>
+                    <minIndex>1</minIndex>
+                    <maxIndex>10</maxIndex>
+                </rollingPolicy>
+                <triggeringPolicy class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+                    <maxFileSize>128MB</maxFileSize>
+                </triggeringPolicy>
+                <encoder>
+                    <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+                    <charset class="java.nio.charset.Charset">UTF-8</charset>
+                </encoder>
+            </appender>
+        </sift>
+    </appender>
+
     <appender name="RocketmqTrafficSiftingAppender" class="ch.qos.logback.classic.sift.SiftingAppender">
         <discriminator>
             <key>brokerContainerLogDir</key>
@@ -454,6 +484,10 @@
         <appender-ref ref="RocketmqStoreErrorSiftingAppender"/>
     </logger>
 
+    <logger name="RocketmqTieredStore" additivity="false" level="INFO">
+        <appender-ref ref="RocketmqTieredStoreSiftingAppender"/>
+    </logger>
+
     <logger name="RocketmqTransaction" additivity="false" level="INFO">
         <appender-ref ref="RocketmqTransactionSiftingAppender"/>
     </logger>
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
index ab8ee1496..2541e755e 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/BrokerOuterAPITest.java
@@ -60,6 +60,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyLong;
@@ -110,7 +111,7 @@ public class BrokerOuterAPITest {
         when(nettyRemotingClient.invokeSync(anyString(), any(RemotingCommand.class), anyLong())).thenReturn(response);
         List<Boolean> booleanList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, brokerId, topicConfigSerializeWrapper, timeOut, false);
         assertTrue(booleanList.size() > 0);
-        assertEquals(false, booleanList.contains(Boolean.FALSE));
+        assertFalse(booleanList.contains(Boolean.FALSE));
     }
 
     @Test
@@ -145,7 +146,7 @@ public class BrokerOuterAPITest {
                 }
             });
 
-        assertEquals(true, success);
+        assertTrue(success);
 
     }
 
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
new file mode 100644
index 000000000..2148e222c
--- /dev/null
+++ b/broker/src/test/java/org/apache/rocketmq/broker/controller/ReplicasManagerRegisterTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.broker.controller;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.broker.out.BrokerOuterAPI;
+import org.apache.rocketmq.broker.slave.SlaveSynchronize;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.ApplyBrokerIdResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdResponseHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterSuccessResponseHeader;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.autoswitch.AutoSwitchHAService;
+import org.apache.rocketmq.store.ha.autoswitch.BrokerMetadata;
+import org.apache.rocketmq.store.ha.autoswitch.TempBrokerMetadata;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.powermock.api.mockito.PowerMockito;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.UUID;
+
+import static org.awaitility.Awaitility.await;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.when;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(ReplicasManager.class)
+public class ReplicasManagerRegisterTest {
+
+    public static final String STORE_BASE_PATH = System.getProperty("user.home") + File.separator + "BrokerControllerRegisterTest" + File.separator +
+            UUID.randomUUID().toString().replace("-", "");
+
+    public static final String BROKER_NAME = "default-broker";
+
+    public static final String CLUSTER_NAME = "default-cluster";
+
+    public static final String NAME_SRV_ADDR = "127.0.0.1:9999";
+
+    public static final String CONTROLLER_ADDR = "127.0.0.1:8888";
+
+    public static final BrokerConfig BROKER_CONFIG;
+
+    static {
+        BROKER_CONFIG = new BrokerConfig();
+        BROKER_CONFIG.setListenPort(21030);
+        BROKER_CONFIG.setNamesrvAddr(NAME_SRV_ADDR);
+        BROKER_CONFIG.setControllerAddr(CONTROLLER_ADDR);
+        BROKER_CONFIG.setSyncControllerMetadataPeriod(2 * 1000);
+        BROKER_CONFIG.setEnableControllerMode(true);
+        BROKER_CONFIG.setBrokerName(BROKER_NAME);
+        BROKER_CONFIG.setBrokerClusterName(CLUSTER_NAME);
+    }
+
+    private MessageStoreConfig buildMessageStoreConfig(int id) {
+        MessageStoreConfig config = new MessageStoreConfig();
+        config.setStorePathRootDir(STORE_BASE_PATH + File.separator + id);
+        config.setStorePathCommitLog(config.getStorePathRootDir() + File.separator + "commitLog");
+        config.setStorePathEpochFile(config.getStorePathRootDir() + File.separator + "epochFileCache");
+        config.setStorePathMetadata(config.getStorePathRootDir() + File.separator + "metadata");
+        config.setStorePathTempMetadata(config.getStorePathRootDir() + File.separator + "tempMetadata");
+        return config;
+    }
+
+    @Mock
+    private BrokerController mockedBrokerController;
+
+    @Mock
+    private DefaultMessageStore mockedMessageStore;
+
+    @Mock
+    private BrokerOuterAPI mockedBrokerOuterAPI;
+
+    @Mock
+    private AutoSwitchHAService mockedAutoSwitchHAService;
+
+    @Before
+    public void setUp() throws Exception {
+        when(mockedBrokerController.getBrokerOuterAPI()).thenReturn(mockedBrokerOuterAPI);
+        when(mockedBrokerController.getMessageStore()).thenReturn(mockedMessageStore);
+        when(mockedBrokerController.getBrokerConfig()).thenReturn(BROKER_CONFIG);
+        when(mockedMessageStore.getHaService()).thenReturn(mockedAutoSwitchHAService);
+        when(mockedBrokerController.getSlaveSynchronize()).thenReturn(new SlaveSynchronize(mockedBrokerController));
+
+        when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn(
+                new GetMetaDataResponseHeader("default-group", "dledger-a", CONTROLLER_ADDR, true, CONTROLLER_ADDR));
+        when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
+        when(mockedBrokerController.getMessageStoreConfig()).thenReturn(buildMessageStoreConfig(0));
+    }
+
+    @Test
+    public void testBrokerRegisterSuccess() throws Exception {
+        when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L));
+        when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
+        when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterSuccessResponseHeader());
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1));
+
+        ReplicasManager replicasManager0 = new ReplicasManager(mockedBrokerController);
+        replicasManager0.start();
+        await().atMost(Duration.ofMillis(1000)).until(() ->
+            replicasManager0.getState() == ReplicasManager.State.RUNNING
+        );
+        Assert.assertEquals(ReplicasManager.RegisterState.REGISTERED, replicasManager0.getRegisterState());
+        Assert.assertEquals(1L, replicasManager0.getBrokerId().longValue());
+        checkMetadataFile(replicasManager0.getBrokerMetadata(), 1L);
+        Assert.assertFalse(replicasManager0.getTempBrokerMetadata().isLoaded());
+        Assert.assertFalse(replicasManager0.getTempBrokerMetadata().fileExists());
+    }
+
+    @Test
+    public void testRegisterFailedAtGetNextBrokerId() throws Exception {
+        ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController);
+        when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenThrow(new RuntimeException());
+
+        replicasManager.start();
+
+        Assert.assertEquals(ReplicasManager.State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, replicasManager.getState());
+        Assert.assertEquals(ReplicasManager.RegisterState.INITIAL, replicasManager.getRegisterState());
+        Assert.assertFalse(replicasManager.getTempBrokerMetadata().fileExists());
+        Assert.assertFalse(replicasManager.getBrokerMetadata().fileExists());
+        Assert.assertNull(replicasManager.getBrokerId());
+    }
+
+    @Test
+    public void testRegisterFailedAtCreateTempFile() throws Exception {
+        ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController);
+        when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L));
+        when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
+        when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterSuccessResponseHeader());
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1));
+        ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager);
+        PowerMockito.doReturn(false).when(spyReplicasManager, "createTempMetadataFile", anyLong());
+
+        spyReplicasManager.start();
+
+        Assert.assertEquals(ReplicasManager.State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, spyReplicasManager.getState());
+        Assert.assertEquals(ReplicasManager.RegisterState.INITIAL, spyReplicasManager.getRegisterState());
+        Assert.assertFalse(spyReplicasManager.getTempBrokerMetadata().fileExists());
+        Assert.assertFalse(spyReplicasManager.getBrokerMetadata().fileExists());
+        Assert.assertNull(spyReplicasManager.getBrokerId());
+    }
+
+    @Test
+    public void testRegisterFailedAtApplyBrokerIdFailed() throws Exception {
+        ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController);
+        when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L));
+        when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException());
+        when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterSuccessResponseHeader());
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1));
+
+        replicasManager.start();
+
+        Assert.assertEquals(ReplicasManager.State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, replicasManager.getState());
+        Assert.assertNotEquals(ReplicasManager.RegisterState.CREATE_METADATA_FILE_DONE, replicasManager.getRegisterState());
+        Assert.assertNotEquals(ReplicasManager.RegisterState.REGISTERED, replicasManager.getRegisterState());
+
+        replicasManager.shutdown();
+
+        Assert.assertFalse(replicasManager.getBrokerMetadata().fileExists());
+        Assert.assertNull(replicasManager.getBrokerId());
+    }
+
+    @Test
+    public void testRegisterFailedAtCreateMetadataFileAndDeleteTemp() throws Exception {
+        ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController);
+        when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L));
+        when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
+        when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterSuccessResponseHeader());
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1));
+
+        ReplicasManager spyReplicasManager = PowerMockito.spy(replicasManager);
+        PowerMockito.doReturn(false).when(spyReplicasManager, "createMetadataFileAndDeleteTemp");
+
+        spyReplicasManager.start();
+
+        Assert.assertEquals(ReplicasManager.State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, spyReplicasManager.getState());
+        Assert.assertEquals(ReplicasManager.RegisterState.CREATE_TEMP_METADATA_FILE_DONE, spyReplicasManager.getRegisterState());
+        TempBrokerMetadata tempBrokerMetadata = spyReplicasManager.getTempBrokerMetadata();
+        Assert.assertTrue(tempBrokerMetadata.fileExists());
+        Assert.assertTrue(tempBrokerMetadata.isLoaded());
+        Assert.assertFalse(spyReplicasManager.getBrokerMetadata().fileExists());
+        Assert.assertNull(spyReplicasManager.getBrokerId());
+
+        spyReplicasManager.shutdown();
+
+        // restart, we expect that this replicasManager still keep the tempMetadata and still try to finish its registering
+        ReplicasManager replicasManagerNew = new ReplicasManager(mockedBrokerController);
+        // because apply brokerId: 1 has succeeded, so now next broker id is 2
+        when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 2L));
+
+        replicasManagerNew.start();
+
+        Assert.assertEquals(ReplicasManager.State.RUNNING, replicasManagerNew.getState());
+        Assert.assertEquals(ReplicasManager.RegisterState.REGISTERED, replicasManagerNew.getRegisterState());
+        // tempMetadata has been cleared
+        Assert.assertFalse(replicasManagerNew.getTempBrokerMetadata().fileExists());
+        Assert.assertFalse(replicasManagerNew.getTempBrokerMetadata().isLoaded());
+        // metadata has been persisted
+        Assert.assertTrue(replicasManagerNew.getBrokerMetadata().fileExists());
+        Assert.assertTrue(replicasManagerNew.getBrokerMetadata().isLoaded());
+        Assert.assertEquals(1L, replicasManagerNew.getBrokerMetadata().getBrokerId().longValue());
+        Assert.assertEquals(1L, replicasManagerNew.getBrokerId().longValue());
+
+    }
+
+    @Test
+    public void testRegisterFailedAtRegisterSuccess() throws Exception {
+        ReplicasManager replicasManager = new ReplicasManager(mockedBrokerController);
+        when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 1L));
+        when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), anyLong(), any(), any())).thenReturn(new ApplyBrokerIdResponseHeader());
+        when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenThrow(new RuntimeException());
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1));
+
+        replicasManager.start();
+
+        Assert.assertEquals(ReplicasManager.State.FIRST_TIME_SYNC_CONTROLLER_METADATA_DONE, replicasManager.getState());
+        Assert.assertEquals(ReplicasManager.RegisterState.CREATE_METADATA_FILE_DONE, replicasManager.getRegisterState());
+        TempBrokerMetadata tempBrokerMetadata = replicasManager.getTempBrokerMetadata();
+        // temp metadata has been cleared
+        Assert.assertFalse(tempBrokerMetadata.fileExists());
+        Assert.assertFalse(tempBrokerMetadata.isLoaded());
+        // metadata has been persisted
+        Assert.assertTrue(replicasManager.getBrokerMetadata().fileExists());
+        Assert.assertTrue(replicasManager.getBrokerMetadata().isLoaded());
+        Assert.assertEquals(1L, replicasManager.getBrokerMetadata().getBrokerId().longValue());
+        Assert.assertEquals(1L, replicasManager.getBrokerId().longValue());
+
+        replicasManager.shutdown();
+
+        Mockito.reset(mockedBrokerOuterAPI);
+        when(mockedBrokerOuterAPI.brokerElect(any(), any(), any(), anyLong())).thenReturn(new ElectMasterResponseHeader(1L, "127.0.0.1:13131", 1, 1));
+        when(mockedBrokerOuterAPI.getControllerMetaData(any())).thenReturn(
+                new GetMetaDataResponseHeader("default-group", "dledger-a", CONTROLLER_ADDR, true, CONTROLLER_ADDR));
+        when(mockedBrokerOuterAPI.checkAddressReachable(any())).thenReturn(true);
+
+        // restart, we expect that this replicasManager still keep the metadata and still try to finish its registering
+        ReplicasManager replicasManagerNew = new ReplicasManager(mockedBrokerController);
+        // because apply brokerId: 1 has succeeded, so now next broker id is 2
+        when(mockedBrokerOuterAPI.getNextBrokerId(any(), any(), any())).thenReturn(new GetNextBrokerIdResponseHeader(CLUSTER_NAME, BROKER_NAME, 2L));
+        // because apply brokerId: 1 has succeeded, so next request which try to apply brokerId: 1 will be failed
+        when(mockedBrokerOuterAPI.applyBrokerId(any(), any(), eq(1L), any(), any())).thenThrow(new RuntimeException());
+        when(mockedBrokerOuterAPI.registerSuccess(any(), any(), anyLong(), any(), any())).thenReturn(new RegisterSuccessResponseHeader());
+        replicasManagerNew.start();
+
+        Assert.assertEquals(ReplicasManager.State.RUNNING, replicasManagerNew.getState());
+        Assert.assertEquals(ReplicasManager.RegisterState.REGISTERED, replicasManagerNew.getRegisterState());
+        // tempMetadata has been cleared
+        Assert.assertFalse(replicasManagerNew.getTempBrokerMetadata().fileExists());
+        Assert.assertFalse(replicasManagerNew.getTempBrokerMetadata().isLoaded());
+        // metadata has been persisted
+        Assert.assertTrue(replicasManagerNew.getBrokerMetadata().fileExists());
+        Assert.assertTrue(replicasManagerNew.getBrokerMetadata().isLoaded());
+        Assert.assertEquals(1L, replicasManagerNew.getBrokerMetadata().getBrokerId().longValue());
+        Assert.assertEquals(1L, replicasManagerNew.getBrokerId().longValue());
+    }
+
+
+    private void checkMetadataFile(BrokerMetadata brokerMetadata0 ,Long brokerId) throws Exception {
+        Assert.assertEquals(brokerId, brokerMetadata0.getBrokerId());
+        Assert.assertTrue(brokerMetadata0.fileExists());
+        BrokerMetadata brokerMetadata = new BrokerMetadata(brokerMetadata0.getFilePath());
+        brokerMetadata.readFromFile();
+        Assert.assertEquals(brokerMetadata0, brokerMetadata);
+    }
+
+    @After
+    public void clear() {
+        File file = new File(STORE_BASE_PATH);
+        UtilAll.deleteFile(file);
+    }
+
+
+}
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
index 79fe6d587..89ffed7e3 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopReviveServiceTest.java
@@ -90,7 +90,7 @@ public class PopReviveServiceTest {
         when(brokerController.getTopicConfigManager()).thenReturn(topicConfigManager);
         when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
         when(messageStore.getTimerMessageStore()).thenReturn(timerMessageStore);
-        when(timerMessageStore.getReadBehind()).thenReturn(0L);
+        when(timerMessageStore.getDequeueBehind()).thenReturn(0L);
         when(timerMessageStore.getEnqueueBehind()).thenReturn(0L);
 
         when(topicConfigManager.selectTopicConfig(anyString())).thenReturn(new TopicConfig());
@@ -106,7 +106,7 @@ public class PopReviveServiceTest {
         long maxReviveOffset = 4;
 
         when(consumerOffsetManager.queryOffset(PopAckConstants.REVIVE_GROUP, REVIVE_TOPIC, REVIVE_QUEUE_ID))
-                .thenReturn(0L);
+            .thenReturn(0L);
         List<MessageExt> reviveMessageExtList = new ArrayList<>();
         long basePopTime = System.currentTimeMillis();
         {
@@ -249,14 +249,15 @@ public class PopReviveServiceTest {
         return msgInner;
     }
 
-    public static MessageExtBrokerInner buildAckMsg(AckMsg ackMsg, long deliverMs, long reviveOffset, long deliverTime) {
+    public static MessageExtBrokerInner buildAckMsg(AckMsg ackMsg, long deliverMs, long reviveOffset,
+        long deliverTime) {
         MessageExtBrokerInner messageExtBrokerInner = buildAckInnerMessage(
-                REVIVE_TOPIC,
-                ackMsg,
-                REVIVE_QUEUE_ID,
-                STORE_HOST,
-                deliverMs,
-                PopMessageProcessor.genAckUniqueId(ackMsg)
+            REVIVE_TOPIC,
+            ackMsg,
+            REVIVE_QUEUE_ID,
+            STORE_HOST,
+            deliverMs,
+            PopMessageProcessor.genAckUniqueId(ackMsg)
         );
         messageExtBrokerInner.setQueueOffset(reviveOffset);
         messageExtBrokerInner.setDeliverTimeMs(deliverMs);
@@ -264,7 +265,8 @@ public class PopReviveServiceTest {
         return messageExtBrokerInner;
     }
 
-    public static MessageExtBrokerInner buildAckInnerMessage(String reviveTopic, AckMsg ackMsg, int reviveQid, SocketAddress host, long deliverMs, String ackUniqueId) {
+    public static MessageExtBrokerInner buildAckInnerMessage(String reviveTopic, AckMsg ackMsg, int reviveQid,
+        SocketAddress host, long deliverMs, String ackUniqueId) {
         MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
         msgInner.setTopic(reviveTopic);
         msgInner.setBody(JSON.toJSONString(ackMsg).getBytes(DataConverter.charset));
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index a5712008c..793778e03 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -67,6 +67,7 @@ import org.apache.rocketmq.common.sysflag.PullSysFlag;
 import org.apache.rocketmq.remoting.RPCHook;
 import org.apache.rocketmq.remoting.exception.RemotingException;
 import org.apache.rocketmq.remoting.protocol.NamespaceUtil;
+import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.ConsumerRunningInfo;
 import org.apache.rocketmq.remoting.protocol.filter.FilterAPI;
 import org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType;
@@ -111,9 +112,13 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
      */
     private long pullTimeDelayMillsWhenException = 1000;
     /**
-     * Flow control interval
+     * Flow control interval when message cache is full
      */
-    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50;
+    /**
+     * Flow control interval when broker return flow control
+     */
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20;
     /**
      * Delay some time when suspend pull service
      */
@@ -891,7 +896,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                 }
 
                 if ((long) consumeRequestCache.size() * defaultLitePullConsumer.getPullBatchSize() > defaultLitePullConsumer.getPullThresholdForAll()) {
-                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((consumeRequestFlowControlTimes++ % 1000) == 0) {
                         log.warn("The consume request count exceeds threshold {}, so do flow control, consume request count={}, flowControlTimes={}", consumeRequestCache.size(), consumeRequestFlowControlTimes);
                     }
@@ -902,7 +907,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                 long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
 
                 if (cachedMessageCount > defaultLitePullConsumer.getPullThresholdForQueue()) {
-                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((queueFlowControlTimes++ % 1000) == 0) {
                         log.warn(
                             "The cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
@@ -912,7 +917,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                 }
 
                 if (cachedMessageSizeInMiB > defaultLitePullConsumer.getPullThresholdSizeForQueue()) {
-                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((queueFlowControlTimes++ % 1000) == 0) {
                         log.warn(
                             "The cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, flowControlTimes={}",
@@ -922,7 +927,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                 }
 
                 if (processQueue.getMaxSpan() > defaultLitePullConsumer.getConsumeMaxSpan()) {
-                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL, TimeUnit.MILLISECONDS);
+                    scheduledThreadPoolExecutor.schedule(this, PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL, TimeUnit.MILLISECONDS);
                     if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
                         log.warn(
                             "The queue's messages, span too long, so do flow control, minOffset={}, maxOffset={}, maxSpan={}, flowControlTimes={}",
@@ -979,7 +984,11 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
                 } catch (InterruptedException interruptedException) {
                     log.warn("Polling thread was interrupted.", interruptedException);
                 } catch (Throwable e) {
-                    pullDelayTimeMills = pullTimeDelayMillsWhenException;
+                    if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+                        pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL;
+                    } else {
+                        pullDelayTimeMills = pullTimeDelayMillsWhenException;
+                    }
                     log.error("An error occurred in pull message process.", e);
                 }
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 76a736ed6..d21029430 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -99,13 +99,13 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
      */
     private long pullTimeDelayMillsWhenException = 3000;
     /**
-     * Flow control interval when cache is full
+     * Flow control interval when message cache is full
      */
     private static final long PULL_TIME_DELAY_MILLS_WHEN_CACHE_FLOW_CONTROL = 50;
     /**
-     * Flow control interval when broker throw flow control exception
+     * Flow control interval when broker return flow control
      */
-    private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 50;
+    private static final long PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL = 20;
     /**
      * Delay some time when suspend pull service
      */
@@ -433,7 +433,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     log.warn("execute the pull request exception", e);
                 }
 
-                DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
+                if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
+                } else {
+                    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
+                }
             }
         };
 
@@ -583,7 +587,11 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
                     log.warn("execute the pull request exception: {}", e);
                 }
 
-                DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+                if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) {
+                    DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL);
+                } else {
+                    DefaultMQPushConsumerImpl.this.executePopPullRequestLater(popRequest, pullTimeDelayMillsWhenException);
+                }
             }
         };
 
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
index a2e1ac838..cb97c9f14 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/factory/MQClientInstance.java
@@ -435,7 +435,7 @@ public class MQClientInstance {
                     continue;
                 }
                 // may need to check one broker every cluster...
-                // assume that the configs of every broker in cluster are the the same.
+                // assume that the configs of every broker in cluster are the same.
                 String addr = findBrokerAddrByTopic(subscriptionData.getTopic());
 
                 if (addr != null) {
@@ -556,9 +556,9 @@ public class MQClientInstance {
                     }
                 } catch (Exception e) {
                     if (this.isBrokerInNameServer(addr)) {
-                        log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
+                        log.warn("send heart beat to broker[{} {} {}] failed", brokerName, id, addr, e);
                     } else {
-                        log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
+                        log.warn("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                 id, addr, e);
                     }
                 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/MixAll.java b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
index 3214bd838..8f358e73b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/MixAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/MixAll.java
@@ -43,6 +43,8 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Predicate;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.help.FAQUrl;
@@ -168,7 +170,7 @@ public class MixAll {
 
     public static long getPID() {
         String processName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
-        if (processName != null && processName.length() > 0) {
+        if (StringUtils.isNotEmpty(processName)) {
             try {
                 return Long.parseLong(processName.split("@")[0]);
             } catch (Exception e) {
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 3c32e7c81..03d162450 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -40,6 +40,7 @@ import java.util.Enumeration;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.zip.CRC32;
 import java.util.zip.DeflaterOutputStream;
@@ -83,15 +84,18 @@ public class UtilAll {
     }
 
     public static void sleep(long sleepMs) {
-        if (sleepMs < 0) {
+        sleep(sleepMs, TimeUnit.MILLISECONDS);
+    }
+
+    public static void sleep(long timeOut, TimeUnit timeUnit) {
+        if (null == timeUnit) {
             return;
         }
         try {
-            Thread.sleep(sleepMs);
+            timeUnit.sleep(timeOut);
         } catch (Throwable ignored) {
 
         }
-
     }
 
     public static String currentStackTrace() {
diff --git a/container/pom.xml b/container/pom.xml
index 42ab6e4b8..c0ea9d33d 100644
--- a/container/pom.xml
+++ b/container/pom.xml
@@ -35,11 +35,5 @@
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-broker</artifactId>
         </dependency>
-
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-            <scope>test</scope>
-        </dependency>
     </dependencies>
 </project>
diff --git a/controller/pom.xml b/controller/pom.xml
index d89f03193..c2525dbbe 100644
--- a/controller/pom.xml
+++ b/controller/pom.xml
@@ -46,8 +46,8 @@
             <artifactId>slf4j-api</artifactId>
         </dependency>
         <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
+            <groupId>io.github.aliyunmq</groupId>
+            <artifactId>rocketmq-shaded-slf4j-api-bridge</artifactId>
         </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
diff --git a/controller/src/main/resources/rmq.controller.logback.xml b/controller/src/main/resources/rmq.controller.logback.xml
index 0a6155b43..0fd2467b4 100644
--- a/controller/src/main/resources/rmq.controller.logback.xml
+++ b/controller/src/main/resources/rmq.controller.logback.xml
@@ -36,6 +36,30 @@
         </encoder>
     </appender>
 
+    <appender name="DLedgerAppender_inner"
+              class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <file>${user.home}/logs/rocketmqlogs/dledger.log</file>
+        <append>true</append>
+        <rollingPolicy class="ch.qos.logback.core.rolling.FixedWindowRollingPolicy">
+            <fileNamePattern>${user.home}/logs/rocketmqlogs/otherdays/dledger.%i.log.gz</fileNamePattern>
+            <minIndex>1</minIndex>
+            <maxIndex>5</maxIndex>
+        </rollingPolicy>
+        <triggeringPolicy
+            class="ch.qos.logback.core.rolling.SizeBasedTriggeringPolicy">
+            <maxFileSize>100MB</maxFileSize>
+        </triggeringPolicy>
+        <encoder>
+            <pattern>%d{yyy-MM-dd HH:mm:ss,GMT+8} %p %t - %m%n</pattern>
+            <charset class="java.nio.charset.Charset">UTF-8</charset>
+        </encoder>
+    </appender>
+
+    <appender name="DLedgerAppender" class="ch.qos.logback.classic.AsyncAppender">
+        <appender-ref ref="DLedgerAppender_inner"/>
+        <discardingThreshold>0</discardingThreshold>
+    </appender>
+
     <appender name="RocketmqControllerAppender_inner"
               class="ch.qos.logback.core.rolling.RollingFileAppender">
         <file>${user.home}/logs/rocketmqlogs/controller.log</file>
@@ -78,6 +102,10 @@
         <appender-ref ref="RocketmqControllerAppender"/>
     </logger>
 
+    <logger name="io.openmessaging.storage.dledger" additivity="false" level="INFO">
+        <appender-ref ref="DLedgerAppender"/>
+    </logger>
+
     <logger name="RocketmqControllerConsole" additivity="false" level="INFO">
         <appender-ref ref="STDOUT"/>
     </logger>
diff --git a/example/pom.xml b/example/pom.xml
index ccbfdb666..07cf59082 100644
--- a/example/pom.xml
+++ b/example/pom.xml
@@ -36,6 +36,14 @@
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-client</artifactId>
         </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-tools</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>${project.groupId}</groupId>
+            <artifactId>rocketmq-remoting</artifactId>
+        </dependency>
         <dependency>
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-srvutil</artifactId>
diff --git a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
index b78c85468..2c67e463e 100644
--- a/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
+++ b/example/src/main/java/org/apache/rocketmq/example/quickstart/Producer.java
@@ -54,7 +54,7 @@ public class Producer {
          * </pre>
          */
         // Uncomment the following line while debugging, namesrvAddr should be set to your local address
-//        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
+        producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);
 
         /*
          * Launch the instance.
diff --git a/example/src/main/java/org/apache/rocketmq/example/simple/PopConsumer.java b/example/src/main/java/org/apache/rocketmq/example/simple/PopConsumer.java
new file mode 100644
index 000000000..6321e36d9
--- /dev/null
+++ b/example/src/main/java/org/apache/rocketmq/example/simple/PopConsumer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.example.simple;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.remoting.protocol.route.BrokerData;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+
+public class PopConsumer {
+    public static final String TOPIC = "TopicTest";
+    public static final String CONSUMER_GROUP = "CID_JODIE_1";
+    public static void main(String[] args) throws Exception {
+        switchPop();
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);
+        consumer.subscribe(TOPIC, "*");
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.setClientRebalance(false);
+        consumer.start();
+        System.out.printf("Consumer Started.%n");
+    }
+    private static void switchPop() throws Exception {
+        DefaultMQAdminExt mqAdminExt = new DefaultMQAdminExt();
+        mqAdminExt.start();
+        List<BrokerData> brokerDatas = mqAdminExt.examineTopicRouteInfo(TOPIC).getBrokerDatas();
+        for (BrokerData brokerData : brokerDatas) {
+            Set<String> brokerAddrs = new HashSet<>(brokerData.getBrokerAddrs().values());
+            for (String brokerAddr : brokerAddrs) {
+                mqAdminExt.setMessageRequestMode(brokerAddr, TOPIC, CONSUMER_GROUP, MessageRequestMode.POP, 8, 3_000);
+            }
+        }
+    }
+}
diff --git a/pom.xml b/pom.xml
index ef101ebb4..a953796c4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -117,7 +117,7 @@
         <commons-codec.version>1.13</commons-codec.version>
         <rocketmq-logging.version>1.0.1</rocketmq-logging.version>
         <slf4j-api.version>2.0.3</slf4j-api.version>
-        <logback-classic.version>1.3.4</logback-classic.version>
+        <rocketmq-shaded-slf4j-api-bridge.version>1.0.0</rocketmq-shaded-slf4j-api-bridge.version>
         <commons-validator.version>1.7</commons-validator.version>
         <zstd-jni.version>1.5.2-2</zstd-jni.version>
         <lz4-java.version>1.8.0</lz4-java.version>
@@ -142,6 +142,7 @@
         <junit.version>4.13.2</junit.version>
         <assertj-core.version>3.22.0</assertj-core.version>
         <mockito-core.version>3.10.0</mockito-core.version>
+        <powermock-version>2.0.9</powermock-version>
         <awaitility.version>4.1.0</awaitility.version>
         <truth.version>0.30</truth.version>
 
@@ -590,6 +591,11 @@
                 <artifactId>rocketmq-store</artifactId>
                 <version>${project.version}</version>
             </dependency>
+            <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-tiered-store</artifactId>
+                <version>${project.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.rocketmq</groupId>
                 <artifactId>rocketmq-test</artifactId>
@@ -703,9 +709,9 @@
                 <version>${slf4j-api.version}</version>
             </dependency>
             <dependency>
-                <groupId>ch.qos.logback</groupId>
-                <artifactId>logback-classic</artifactId>
-                <version>${logback-classic.version}</version>
+                <groupId>io.github.aliyunmq</groupId>
+                <artifactId>rocketmq-shaded-slf4j-api-bridge</artifactId>
+                <version>${rocketmq-shaded-slf4j-api-bridge.version}</version>
             </dependency>
             <dependency>
                 <groupId>io.github.aliyunmq</groupId>
@@ -978,5 +984,31 @@
             <artifactId>awaitility</artifactId>
             <version>${awaitility.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>${powermock-version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.objenesis</groupId>
+                    <artifactId>objenesis</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.bytebuddy</groupId>
+                    <artifactId>byte-buddy</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>net.bytebuddy</groupId>
+                    <artifactId>byte-buddy-agent</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito2</artifactId>
+            <version>${powermock-version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
index c49546d62..60cc4e3e2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/RemotingSerializable.java
@@ -26,11 +26,11 @@ public abstract class RemotingSerializable {
     private final static Charset CHARSET_UTF8 = StandardCharsets.UTF_8;
 
     public static byte[] encode(final Object obj) {
-        final String json = toJson(obj, false);
-        if (json != null) {
-            return json.getBytes(CHARSET_UTF8);
+        if (obj == null) {
+            return null;
         }
-        return null;
+        final String json = toJson(obj, false);
+        return json.getBytes(CHARSET_UTF8);
     }
 
     public static String toJson(final Object obj, boolean prettyFormat) {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
index 897b57c3f..d3c897538 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/header/controller/ElectMasterResponseHeader.java
@@ -29,6 +29,13 @@ public class ElectMasterResponseHeader implements CommandCustomHeader {
     public ElectMasterResponseHeader() {
     }
 
+    public ElectMasterResponseHeader(Long masterBrokerId, String masterAddress, Integer masterEpoch, Integer syncStateSetEpoch) {
+        this.masterBrokerId = masterBrokerId;
+        this.masterAddress = masterAddress;
+        this.masterEpoch = masterEpoch;
+        this.syncStateSetEpoch = syncStateSetEpoch;
+    }
+
     public String getMasterAddress() {
         return masterAddress;
     }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java
index fc0ea00c2..b22906a5c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/statictopic/TopicQueueMappingUtils.java
@@ -28,7 +28,6 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
-import java.util.Random;
 import java.util.Set;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.TopicConfig;
@@ -41,9 +40,8 @@ public class TopicQueueMappingUtils {
         Map<String, Integer> brokerNumMap = new HashMap<>();
         Map<Integer, String> idToBroker = new HashMap<>();
         //used for remapping
-        Map<String, Integer> brokerNumMapBeforeRemapping = null;
+        Map<String, Integer> brokerNumMapBeforeRemapping;
         int currentIndex = 0;
-        Random random = new Random();
         List<String> leastBrokers = new ArrayList<>();
         private MappingAllocator(Map<Integer, String> idToBroker, Map<String, Integer> brokerNumMap, Map<String, Integer> brokerNumMapBeforeRemapping) {
             this.idToBroker.putAll(idToBroker);
@@ -151,7 +149,7 @@ public class TopicQueueMappingUtils {
             || brokerConfigMap.isEmpty()) {
             return null;
         }
-        //make sure it it not null
+        //make sure it is not null
         long maxEpoch = -1;
         int maxNum = -1;
         String scope = null;
@@ -224,7 +222,7 @@ public class TopicQueueMappingUtils {
                 //the earliest item may have been deleted concurrently
                 inew++;
             } else if (oldItem.getGen() < newItem.getGen()) {
-                //in the following cases, the new item-list has less items than old item-list
+                //in the following cases, the new item-list has fewer items than old item-list
                 //1. the queue is mapped back to a broker which hold the logic queue before
                 //2. The earliest item is deleted by  TopicQueueMappingCleanService
                 iold++;
diff --git a/store/pom.xml b/store/pom.xml
index a88e92d2b..c990cbd76 100644
--- a/store/pom.xml
+++ b/store/pom.xml
@@ -58,12 +58,14 @@
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
-
         <!-- Required by DLedger -->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
-            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.github.aliyunmq</groupId>
+            <artifactId>rocketmq-shaded-slf4j-api-bridge</artifactId>
         </dependency>
     </dependencies>
 </project>
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java
index 747eb4944..ff48c0a7c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/BrokerMetadata.java
@@ -19,6 +19,8 @@ package org.apache.rocketmq.store.ha.autoswitch;
 
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.Objects;
+
 public class BrokerMetadata extends MetadataFile {
 
     private String clusterName;
@@ -79,4 +81,17 @@ public class BrokerMetadata extends MetadataFile {
     public String getClusterName() {
         return clusterName;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        BrokerMetadata that = (BrokerMetadata) o;
+        return Objects.equals(clusterName, that.clusterName) && Objects.equals(brokerName, that.brokerName) && Objects.equals(brokerId, that.brokerId);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(clusterName, brokerName, brokerId);
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java
index 2e3c3ba99..e89aedbea 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/MetadataFile.java
@@ -18,6 +18,7 @@
 package org.apache.rocketmq.store.ha.autoswitch;
 
 import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.UtilAll;
 
 import java.io.File;
 
@@ -34,7 +35,7 @@ public abstract class MetadataFile {
     public abstract void clearInMem();
 
     public void writeToFile() throws Exception {
-        deleteFile();
+        UtilAll.deleteFile(new File(filePath));
         MixAll.string2File(encodeToStr(), this.filePath);
     }
 
@@ -47,14 +48,12 @@ public abstract class MetadataFile {
         return file.exists();
     }
 
-    public void deleteFile() {
-        File file = new File(filePath);
-        file.deleteOnExit();
-    }
-
     public void clear() {
         clearInMem();
-        deleteFile();
+        UtilAll.deleteFile(new File(filePath));
     }
 
+    public String getFilePath() {
+        return filePath;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
index b5993222c..271604b1e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
+++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsConstant.java
@@ -22,8 +22,18 @@ public class DefaultStoreMetricsConstant {
     public static final String GAUGE_STORAGE_DISPATCH_BEHIND = "rocketmq_storage_dispatch_behind_bytes";
     public static final String GAUGE_STORAGE_MESSAGE_RESERVE_TIME = "rocketmq_storage_message_reserve_time";
 
+    public static final String GAUGE_TIMER_ENQUEUE_LAG = "rocketmq_timer_enqueue_lag";
+    public static final String GAUGE_TIMER_ENQUEUE_LATENCY = "rocketmq_timer_enqueue_latency";
+    public static final String GAUGE_TIMER_DEQUEUE_LAG = "rocketmq_timer_dequeue_lag";
+    public static final String GAUGE_TIMER_DEQUEUE_LATENCY = "rocketmq_timer_dequeue_latency";
+    public static final String GAUGE_TIMING_MESSAGES = "rocketmq_timing_messages";
+
+    public static final String COUNTER_TIMER_ENQUEUE_TOTAL = "rocketmq_timer_enqueue_total";
+    public static final String COUNTER_TIMER_DEQUEUE_TOTAL = "rocketmq_timer_dequeue_total";
+
     public static final String LABEL_STORAGE_TYPE = "storage_type";
     public static final String DEFAULT_STORAGE_TYPE = "local";
     public static final String LABEL_STORAGE_MEDIUM = "storage_medium";
     public static final String DEFAULT_STORAGE_MEDIUM = "disk";
+    public static final String LABEL_TOPIC = "topic";
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
index 020974a22..686265292 100644
--- a/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/metrics/DefaultStoreMetricsManager.java
@@ -18,6 +18,7 @@ package org.apache.rocketmq.store.metrics;
 
 import io.opentelemetry.api.common.Attributes;
 import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
 import io.opentelemetry.api.metrics.Meter;
 import io.opentelemetry.api.metrics.ObservableLongGauge;
 import io.opentelemetry.sdk.metrics.InstrumentSelector;
@@ -27,18 +28,28 @@ import java.util.Collections;
 import java.util.List;
 import java.util.function.Supplier;
 import org.apache.rocketmq.common.Pair;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
 import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
 import org.apache.rocketmq.store.DefaultMessageStore;
 import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.timer.TimerMessageStore;
 
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_DEQUEUE_TOTAL;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.COUNTER_TIMER_ENQUEUE_TOTAL;
 import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_MEDIUM;
 import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.DEFAULT_STORAGE_TYPE;
 import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_DISPATCH_BEHIND;
 import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_FLUSH_BEHIND;
 import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_MESSAGE_RESERVE_TIME;
 import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_STORAGE_SIZE;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_DEQUEUE_LAG;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_DEQUEUE_LATENCY;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LAG;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMER_ENQUEUE_LATENCY;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.GAUGE_TIMING_MESSAGES;
 import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_MEDIUM;
 import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_STORAGE_TYPE;
+import static org.apache.rocketmq.store.metrics.DefaultStoreMetricsConstant.LABEL_TOPIC;
 
 public class DefaultStoreMetricsManager {
     public static Supplier<AttributesBuilder> attributesBuilderSupplier;
@@ -49,6 +60,15 @@ public class DefaultStoreMetricsManager {
     public static ObservableLongGauge dispatchBehind = new NopObservableLongGauge();
     public static ObservableLongGauge messageReserveTime = new NopObservableLongGauge();
 
+    public static ObservableLongGauge timerEnqueueLag = new NopObservableLongGauge();
+    public static ObservableLongGauge timerEnqueueLatency = new NopObservableLongGauge();
+    public static ObservableLongGauge timerDequeueLag = new NopObservableLongGauge();
+    public static ObservableLongGauge timerDequeueLatency = new NopObservableLongGauge();
+    public static ObservableLongGauge timingMessages = new NopObservableLongGauge();
+
+    public static LongCounter timerDequeueTotal = new NopLongCounter();
+    public static LongCounter timerEnqueueTotal = new NopLongCounter();
+
     public static List<Pair<InstrumentSelector, View>> getMetricsView() {
         return Collections.emptyList();
     }
@@ -95,6 +115,72 @@ public class DefaultStoreMetricsManager {
                 }
                 measurement.record(System.currentTimeMillis() - earliestMessageTime, newAttributesBuilder().build());
             });
+
+        timerEnqueueLag = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LAG)
+            .setDescription("Timer enqueue messages lag")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+                measurement.record(timerMessageStore.getEnqueueBehindMessages(), newAttributesBuilder().build());
+            });
+
+        timerEnqueueLatency = meter.gaugeBuilder(GAUGE_TIMER_ENQUEUE_LATENCY)
+            .setDescription("Timer enqueue latency")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+                measurement.record(timerMessageStore.getEnqueueBehindMillis(), newAttributesBuilder().build());
+            });
+        timerDequeueLag = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LAG)
+            .setDescription("Timer dequeue messages lag")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+                measurement.record(timerMessageStore.getDequeueBehindMessages(), newAttributesBuilder().build());
+            });
+        timerDequeueLatency = meter.gaugeBuilder(GAUGE_TIMER_DEQUEUE_LATENCY)
+            .setDescription("Timer dequeue latency")
+            .setUnit("milliseconds")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+                measurement.record(timerMessageStore.getDequeueBehind(), newAttributesBuilder().build());
+            });
+        timingMessages = meter.gaugeBuilder(GAUGE_TIMING_MESSAGES)
+            .setDescription("Current message number in timing")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                TimerMessageStore timerMessageStore = messageStore.getTimerMessageStore();
+                timerMessageStore.getTimerMetrics()
+                    .getTimingCount()
+                    .forEach((topic, metric) -> {
+                        measurement.record(
+                            metric.getCount().get(),
+                            newAttributesBuilder().put(LABEL_TOPIC, topic).build()
+                        );
+                    });
+            });
+        timerDequeueTotal = meter.counterBuilder(COUNTER_TIMER_DEQUEUE_TOTAL)
+            .setDescription("Total number of timer dequeue")
+            .build();
+        timerEnqueueTotal = meter.counterBuilder(COUNTER_TIMER_ENQUEUE_TOTAL)
+            .setDescription("Total number of timer enqueue")
+            .build();
+    }
+
+    public static void incTimerDequeueCount(String topic) {
+        timerDequeueTotal.add(1, newAttributesBuilder()
+            .put(LABEL_TOPIC, topic)
+            .build());
+    }
+
+    public static void incTimerEnqueueCount(String topic) {
+        AttributesBuilder attributesBuilder = newAttributesBuilder();
+        if (topic != null) {
+            attributesBuilder.put(LABEL_TOPIC, topic);
+        }
+        timerEnqueueTotal.add(1, attributesBuilder.build());
     }
 
     public static AttributesBuilder newAttributesBuilder() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index 486e1b756..8b77f4942 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -375,6 +375,7 @@ public class ConsumeQueueStore {
 
     public void setTopicQueueTable(ConcurrentMap<String, Long> topicQueueTable) {
         this.queueOffsetAssigner.setTopicQueueTable(topicQueueTable);
+        this.queueOffsetAssigner.setLmqTopicQueueTable(topicQueueTable);
     }
 
     public ConcurrentMap getTopicQueueTable() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
index d069aea67..fe8586f6d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/QueueOffsetAssigner.java
@@ -17,8 +17,11 @@
 
 package org.apache.rocketmq.store.queue;
 
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+
+import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.utils.ConcurrentHashMapUtils;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -82,6 +85,16 @@ public class QueueOffsetAssigner {
         this.topicQueueTable = topicQueueTable;
     }
 
+    public void setLmqTopicQueueTable(ConcurrentMap<String, Long> lmqTopicQueueTable) {
+        ConcurrentMap<String, Long> table = new ConcurrentHashMap<String, Long>(1024);
+        for (Map.Entry<String, Long> entry : lmqTopicQueueTable.entrySet()) {
+            if (MixAll.isLmq(entry.getKey())) {
+                table.put(entry.getKey(), entry.getValue());
+            }
+        }
+        this.lmqTopicQueueTable = table;
+    }
+
     public ConcurrentMap<String, Long> getTopicQueueTable() {
         return topicQueueTable;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
index d864dd50a..fb717550f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStats.java
@@ -43,7 +43,7 @@ public class BrokerStats {
         this.msgGetTotalYesterdayMorning = this.msgGetTotalTodayMorning;
 
         this.msgPutTotalTodayMorning =
-            this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+            this.defaultMessageStore.getBrokerStatsManager().getBrokerPutNumsWithoutSystemTopic();
         this.msgGetTotalTodayMorning =
             this.defaultMessageStore.getBrokerStatsManager().getBrokerGetNumsWithoutSystemTopic();
 
@@ -84,7 +84,7 @@ public class BrokerStats {
     }
 
     public long getMsgPutTotalTodayNow() {
-        return this.defaultMessageStore.getStoreStatsService().getPutMessageTimesTotal();
+        return this.defaultMessageStore.getBrokerStatsManager().getBrokerPutNumsWithoutSystemTopic();
     }
 
     public long getMsgGetTotalTodayNow() {
diff --git a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index 132ddc333..2dd3fc5b5 100644
--- a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++ b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -76,6 +76,7 @@ public class BrokerStatsManager {
     public static final String BROKER_ACK_NUMS = "BROKER_ACK_NUMS";
     public static final String BROKER_CK_NUMS = "BROKER_CK_NUMS";
     public static final String BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC = "BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC";
+    public static final String BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC = "BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC";
     public static final String SNDBCK2DLQ_TIMES = "SNDBCK2DLQ_TIMES";
 
     public static final String COMMERCIAL_OWNER = "Owner";
@@ -190,6 +191,8 @@ public class BrokerStatsManager {
         this.statsTable.put(BROKER_CK_NUMS, new StatsItemSet(BROKER_CK_NUMS, this.scheduledExecutorService, log));
         this.statsTable.put(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC,
             new StatsItemSet(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC, this.scheduledExecutorService, log));
+        this.statsTable.put(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC,
+            new StatsItemSet(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC, this.scheduledExecutorService, log));
         this.statsTable.put(Stats.GROUP_GET_FROM_DISK_NUMS,
             new StatsItemSet(Stats.GROUP_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log));
         this.statsTable.put(Stats.GROUP_GET_FROM_DISK_SIZE,
@@ -513,11 +516,12 @@ public class BrokerStatsManager {
         this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
     }
 
-    public void incBrokerPutNums(final int incValue) {
+    public void incBrokerPutNums(final String topic, final int incValue) {
         this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
+        incBrokerPutNumsWithoutSystemTopic(topic, incValue);
     }
 
-    public void incBrokerGetNums(String topic, final int incValue) {
+    public void incBrokerGetNums(final String topic, final int incValue) {
         this.statsTable.get(Stats.BROKER_GET_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
         this.incBrokerGetNumsWithoutSystemTopic(topic, incValue);
     }
@@ -537,6 +541,13 @@ public class BrokerStatsManager {
         this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
     }
 
+    public void incBrokerPutNumsWithoutSystemTopic(final String topic, final int incValue) {
+        if (TopicValidator.isSystemTopic(topic)) {
+            return;
+        }
+        this.statsTable.get(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC).getAndCreateStatsItem(this.clusterName).getValue().add(incValue);
+    }
+
     public long getBrokerGetNumsWithoutSystemTopic() {
         final StatsItemSet statsItemSet = this.statsTable.get(BROKER_GET_NUMS_WITHOUT_SYSTEM_TOPIC);
         if (statsItemSet == null) {
@@ -549,6 +560,18 @@ public class BrokerStatsManager {
         return statsItem.getValue().longValue();
     }
 
+    public long getBrokerPutNumsWithoutSystemTopic() {
+        final StatsItemSet statsItemSet = this.statsTable.get(BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC);
+        if (statsItemSet == null) {
+            return 0;
+        }
+        final StatsItem statsItem = statsItemSet.getStatsItem(this.clusterName);
+        if (statsItem == null) {
+            return 0;
+        }
+        return statsItem.getValue().longValue();
+    }
+
     public void incSendBackNums(final String group, final String topic) {
         final String statsKey = buildStatsKey(topic, group);
         this.statsTable.get(Stats.SNDBCK_PUT_NUMS).addValue(statsKey, 1, 1);
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
index 89b93abd0..c6ab81df4 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/TimerMessageStore.java
@@ -17,32 +17,6 @@
 package org.apache.rocketmq.store.timer;
 
 import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
-import java.util.function.Function;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.rocketmq.common.ServiceThread;
-import org.apache.rocketmq.common.ThreadFactoryImpl;
-import org.apache.rocketmq.common.TopicFilterType;
-import org.apache.rocketmq.common.UtilAll;
-import org.apache.rocketmq.common.constant.LoggerName;
-import org.apache.rocketmq.common.message.MessageAccessor;
-import org.apache.rocketmq.common.message.MessageClientIDSetter;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageDecoder;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.topic.TopicValidator;
-import org.apache.rocketmq.logging.org.slf4j.Logger;
-import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
-import org.apache.rocketmq.store.ConsumeQueue;
-import org.apache.rocketmq.store.DefaultMessageStore;
-import org.apache.rocketmq.store.logfile.MappedFile;
-import org.apache.rocketmq.common.message.MessageExtBrokerInner;
-import org.apache.rocketmq.store.MessageStore;
-import org.apache.rocketmq.store.PutMessageResult;
-import org.apache.rocketmq.store.SelectMappedBufferResult;
-import org.apache.rocketmq.store.config.BrokerRole;
-import org.apache.rocketmq.store.config.MessageStoreConfig;
-import org.apache.rocketmq.store.stats.BrokerStatsManager;
-
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -66,6 +40,32 @@ import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.rocketmq.common.ServiceThread;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+import org.apache.rocketmq.common.TopicFilterType;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.message.MessageAccessor;
+import org.apache.rocketmq.common.message.MessageClientIDSetter;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.apache.rocketmq.store.DefaultMessageStore;
+import org.apache.rocketmq.store.MessageStore;
+import org.apache.rocketmq.store.PutMessageResult;
+import org.apache.rocketmq.store.SelectMappedBufferResult;
+import org.apache.rocketmq.store.config.BrokerRole;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.metrics.DefaultStoreMetricsManager;
+import org.apache.rocketmq.store.stats.BrokerStatsManager;
 import org.apache.rocketmq.store.util.PerfCounter;
 
 public class TimerMessageStore {
@@ -1050,7 +1050,7 @@ public class TimerMessageStore {
                             this.brokerStatsManager.incTopicPutNums(message.getTopic(), 1, 1);
                             this.brokerStatsManager.incTopicPutSize(message.getTopic(),
                                 putMessageResult.getAppendMessageResult().getWroteBytes());
-                            this.brokerStatsManager.incBrokerPutNums(1);
+                            this.brokerStatsManager.incBrokerPutNums(message.getTopic(), 1);
                         }
                         return PUT_OK;
                     case SERVICE_NOT_AVAILABLE:
@@ -1106,6 +1106,13 @@ public class TimerMessageStore {
         return msgInner;
     }
 
+    private String getRealTopic(MessageExt msgExt) {
+        if (msgExt == null) {
+            return null;
+        }
+        return msgExt.getProperty(MessageConst.PROPERTY_REAL_TOPIC);
+    }
+
     private long formatTimeMs(long timeMs) {
         return timeMs / precisionMs * precisionMs;
     }
@@ -1300,6 +1307,7 @@ public class TimerMessageStore {
                             req.setLatch(latch);
                             try {
                                 perfs.startTick("enqueue_put");
+                                DefaultStoreMetricsManager.incTimerEnqueueCount(getRealTopic(req.getMsg()));
                                 if (shouldRunningDequeue && req.getDelayTime() < currWriteTimeMs) {
                                     dequeuePutQueue.put(req);
                                 } else {
@@ -1414,6 +1422,7 @@ public class TimerMessageStore {
                             }
                             try {
                                 perfs.startTick("dequeue_put");
+                                DefaultStoreMetricsManager.incTimerDequeueCount(getRealTopic(tr.getMsg()));
                                 addMetric(tr.getMsg(), -1);
                                 MessageExtBrokerInner msg = convert(tr.getMsg(), tr.getEnqueueTime(), needRoll(tr.getMagic()));
                                 doRes = PUT_NEED_RETRY != doPut(msg, needRoll(tr.getMagic()));
@@ -1600,7 +1609,7 @@ public class TimerMessageStore {
                         TimerMessageStore.LOGGER.info("[{}]Timer progress-check commitRead:[{}] currRead:[{}] currWrite:[{}] readBehind:{} currReadOffset:{} offsetBehind:{} behindMaster:{} " +
                                 "enqPutQueue:{} deqGetQueue:{} deqPutQueue:{} allCongestNum:{} enqExpiredStoreTime:{}",
                             storeConfig.getBrokerRole(),
-                            format(commitReadTimeMs), format(currReadTimeMs), format(currWriteTimeMs), getReadBehind(),
+                            format(commitReadTimeMs), format(currReadTimeMs), format(currWriteTimeMs), getDequeueBehind(),
                             tmpQueueOffset, maxOffsetInQueue - tmpQueueOffset, timerCheckpoint.getMasterTimerQueueOffset() - tmpQueueOffset,
                             enqueuePutQueue.size(), dequeueGetQueue.size(), dequeuePutQueue.size(), getAllCongestNum(), format(lastEnqueueButExpiredStoreTime));
                     }
@@ -1636,22 +1645,34 @@ public class TimerMessageStore {
         return false;
     }
 
-    public long getEnqueueBehind() {
+    public long getEnqueueBehindMessages() {
+        long tmpQueueOffset = currQueueOffset;
+        ConsumeQueue cq = (ConsumeQueue) messageStore.getConsumeQueue(TIMER_TOPIC, 0);
+        long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue();
+        return maxOffsetInQueue - tmpQueueOffset;
+    }
+
+    public long getEnqueueBehindMillis() {
         if (System.currentTimeMillis() - lastEnqueueButExpiredTime < 2000) {
             return (System.currentTimeMillis() - lastEnqueueButExpiredStoreTime) / 1000;
         }
         return 0;
     }
 
-    public long getReadBehind() {
-        return (System.currentTimeMillis() - currReadTimeMs) / 1000;
+    public long getEnqueueBehind() {
+        return getEnqueueBehindMillis() / 1000;
     }
 
-    public long getOffsetBehind() {
-        long tmpQueueOffset = currQueueOffset;
-        ConsumeQueue cq = (ConsumeQueue) messageStore.getConsumeQueue(TIMER_TOPIC, 0);
-        long maxOffsetInQueue = cq == null ? 0 : cq.getMaxOffsetInQueue();
-        return maxOffsetInQueue - tmpQueueOffset;
+    public long getDequeueBehindMessages() {
+        return timerWheel.getAllNum(currReadTimeMs);
+    }
+
+    public long getDequeueBehindMillis() {
+        return System.currentTimeMillis() - currReadTimeMs;
+    }
+
+    public long getDequeueBehind() {
+        return getDequeueBehindMillis() / 1000;
     }
 
     public float getEnqueueTps() {
diff --git a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
index c32db16dd..a602da093 100644
--- a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
@@ -199,4 +199,17 @@ public class BrokerStatsManagerTest {
             .getValue().doubleValue()).isEqualTo(1L);
         assertThat(brokerStatsManager.getBrokerGetNumsWithoutSystemTopic()).isEqualTo(1L);
     }
+
+    @Test
+    public void testIncBrokerPutNumsWithoutSystemTopic() {
+        brokerStatsManager.incBrokerPutNumsWithoutSystemTopic(TOPIC, 1);
+        assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC, CLUSTER_NAME)
+            .getValue().doubleValue()).isEqualTo(1L);
+        assertThat(brokerStatsManager.getBrokerPutNumsWithoutSystemTopic()).isEqualTo(1L);
+
+        brokerStatsManager.incBrokerPutNumsWithoutSystemTopic(TopicValidator.RMQ_SYS_TRACE_TOPIC, 1);
+        assertThat(brokerStatsManager.getStatsItem(BrokerStatsManager.BROKER_PUT_NUMS_WITHOUT_SYSTEM_TOPIC, CLUSTER_NAME)
+            .getValue().doubleValue()).isEqualTo(1L);
+        assertThat(brokerStatsManager.getBrokerPutNumsWithoutSystemTopic()).isEqualTo(1L);
+    }
 }
diff --git a/test/pom.xml b/test/pom.xml
index a50bc4e33..1fa2f7420 100644
--- a/test/pom.xml
+++ b/test/pom.xml
@@ -66,10 +66,6 @@
             <groupId>com.google.truth</groupId>
             <artifactId>truth</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.mockito</groupId>
-            <artifactId>mockito-core</artifactId>
-        </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git a/tieredstore/README.md b/tieredstore/README.md
new file mode 100644
index 000000000..d58b5e0c6
--- /dev/null
+++ b/tieredstore/README.md
@@ -0,0 +1,64 @@
+# Tiered storage for RocketMQ (Technical preview)
+
+RocketMQ tiered storage allows users to offload message data from the local disk to other cheaper and larger storage mediums. So that users can extend the message reserve time at a lower cost. And different topics can flexibly specify different TTL as needed.
+
+This article is a cookbook for RocketMQ tiered storage.
+
+## Architecture
+
+![Tiered storage architecture](tiered_storage_arch.png)
+
+## Quick start
+
+Use the following steps to easily use tiered storage
+
+1. Change `messageStorePlugIn` to `org.apache.rocketmq.tieredstore.TieredMessageStore` in your `broker.conf`.
+2. Configure your backend service provider. change `tieredBackendServiceProvider` to your storage medium implement. We give a default implement: POSIX provider, and you need to change `tieredStoreFilepath` to the mount point of storage medium for tiered storage.
+3. Start the broker and enjoy!
+
+## Configuration
+
+The following are some core configurations, for more details, see [TieredMessageStoreConfig](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java)
+
+| Configuration                   | Default value                                                   | Unit        | Function                                                                        |
+| ------------------------------- | --------------------------------------------------------------- | ----------- | ------------------------------------------------------------------------------- |
+| messageStorePlugIn              |                                                                 |             | Set to org.apache.rocketmq.tieredstore.TieredMessageStore to use tiered storage |
+| tieredMetadataServiceProvider   | org.apache.rocketmq.tieredstore.metadata.TieredMetadataManager  |             | Select your metadata provider                                                   |
+| tieredBackendServiceProvider    | org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment |             | Select your backend service provider                                            |
+| tieredStoreFilepath             |                                                                 |             | Select the directory using for tiered storage, only for POSIX provider.         |
+| tieredStorageLevel              | NOT_IN_DISK                                                     |             | The options are DISABLE, NOT_IN_DISK, NOT_IN_MEM, FORCE                         |
+| tieredStoreFileReservedTime     | 72                                                              | hour        | Default topic TTL in tiered storage                                             |
+| tieredStoreGroupCommitCount     | 2500                                                            |             | The number of messages that trigger one batch transfer                          |
+| tieredStoreGroupCommitSize      | 33554432                                                        | byte        | The size of messages that trigger one batch transfer, 32M by default            |
+| tieredStoreMaxGroupCommitCount  | 10000                                                           |             | The maximum number of messages waiting to be transfered per queue               |
+| readAheadCacheExpireDuration    | 1000                                                            | millisecond | Read-ahead cache expiration time                                                |
+| readAheadCacheSizeThresholdRate | 0.3                                                             |             | The maximum heap space occupied by the read-ahead cache                         |
+
+## Metrics
+
+Tiered storage provides some useful metrics, see [RIP-46](https://github.com/apache/rocketmq/wiki/RIP-46-Observability-improvement-for-RocketMQ) for details.
+
+| Type      | Name                                                | Unit         |
+| --------- | --------------------------------------------------- | ------------ |
+| Histogram | rocketmq_tiered_store_api_latency                   | milliseconds |
+| Histogram | rocketmq_tiered_store_provider_rpc_latency          | milliseconds |
+| Histogram | rocketmq_tiered_store_provider_upload_bytes         | byte         |
+| Histogram | rocketmq_tiered_store_provider_download_bytes       | byte         |
+| Gauge     | rocketmq_tiered_store_dispatch_behind               |              |
+| Gauge     | rocketmq_tiered_store_dispatch_latency              | byte         |
+| Counter   | rocketmq_tiered_store_messages_dispatch_total       |              |
+| Counter   | rocketmq_tiered_store_messages_out_total            |              |
+| Counter   | rocketmq_tiered_store_get_message_fallback_total    |              |
+| Gauge     | rocketmq_tiered_store_read_ahead_cache_count        |              |
+| Gauge     | rocketmq_tiered_store_read_ahead_cache_bytes        | byte         |
+| Counter   | rocketmq_tiered_store_read_ahead_cache_access_total |              |
+| Counter   | rocketmq_tiered_store_read_ahead_cache_hit_total    |              |
+| Gauge     | rocketmq_storage_message_reserve_time               | milliseconds |
+
+## How to contribute
+
+We need community participation to add more backend service providers for tiered storage. [PosixFileSegment](https://github.com/apache/rocketmq/blob/tiered_storage/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java), the implementation provided by default is just an example. People who want to contribute can follow it to implement their own providers, such as S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines:
+
+1. Extend [TieredFileSegment](https://github.com/apache/rocketmq/blob/tiered_storage/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java) and implement the methods of [TieredStoreProvider](https://github.com/apache/rocketmq/blob/tiered_storage/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java) interface.
+2. Record metrics where appropriate. See `rocketmq_tiered_store_provider_rpc_latency`, `rocketmq_tiered_store_provider_upload_bytes`, and `rocketmq_tiered_store_provider_download_bytes`
+3. No need to maintain your own cache and avoid polluting the page cache. It is already having the read-ahead cache.
diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
index 94b5c88e7..27d31af93 100644
--- a/tieredstore/pom.xml
+++ b/tieredstore/pom.xml
@@ -35,6 +35,11 @@
         <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-store</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
             <exclusions>
                 <exclusion>
                     <groupId>org.checkerframework</groupId>
@@ -43,11 +48,6 @@
             </exclusions>
         </dependency>
 
-        <dependency>
-            <groupId>com.github.ben-manes.caffeine</groupId>
-            <artifactId>caffeine</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
index 1d7aeae38..0b1194953 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/TieredDispatcher.java
@@ -166,7 +166,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
                 if (result == AppendResult.SUCCESS) {
                     Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
                         .put(TieredStoreMetricsConstant.LABEL_TOPIC, request.getTopic())
-                        .put(TieredStoreMetricsConstant.LABEL_QUEUE, request.getQueueId())
+                        .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, request.getQueueId())
                         .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
                         .build();
                     TieredStoreMetricsManager.messagesDispatchTotal.add(1, attributes);
@@ -271,7 +271,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
             }
             Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
                 .put(TieredStoreMetricsConstant.LABEL_TOPIC, mq.getTopic())
-                .put(TieredStoreMetricsConstant.LABEL_QUEUE, mq.getQueueId())
+                .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, mq.getQueueId())
                 .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
                 .build();
             TieredStoreMetricsManager.messagesDispatchTotal.add(queueOffset - beforeOffset, attributes);
@@ -290,7 +290,8 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
         }
     }
 
-    public void handleAppendCommitLogResult(AppendResult result, TieredMessageQueueContainer container, long queueOffset,
+    public void handleAppendCommitLogResult(AppendResult result, TieredMessageQueueContainer container,
+        long queueOffset,
         long dispatchOffset, long newCommitLogOffset, int size, long tagCode, ByteBuffer message) {
         MessageQueue mq = container.getMessageQueue();
         String topic = mq.getTopic();
@@ -449,7 +450,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
         cqMetricsMap.forEach((messageQueue, count) -> {
             Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
                 .put(TieredStoreMetricsConstant.LABEL_TOPIC, messageQueue.getTopic())
-                .put(TieredStoreMetricsConstant.LABEL_QUEUE, messageQueue.getQueueId())
+                .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, messageQueue.getQueueId())
                 .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
                 .build();
             TieredStoreMetricsManager.messagesDispatchTotal.add(count, attributes);
@@ -457,7 +458,7 @@ public class TieredDispatcher extends ServiceThread implements CommitLogDispatch
         ifMetricsMap.forEach((messageQueue, count) -> {
             Attributes attributes = TieredStoreMetricsManager.newAttributesBuilder()
                 .put(TieredStoreMetricsConstant.LABEL_TOPIC, messageQueue.getTopic())
-                .put(TieredStoreMetricsConstant.LABEL_QUEUE, messageQueue.getQueueId())
+                .put(TieredStoreMetricsConstant.LABEL_QUEUE_ID, messageQueue.getQueueId())
                 .put(TieredStoreMetricsConstant.LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.INDEX.name().toLowerCase())
                 .build();
             TieredStoreMetricsManager.messagesDispatchTotal.add(count, attributes);
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
index f91650419..6cc51f541 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/common/TieredMessageStoreConfig.java
@@ -74,7 +74,7 @@ public class TieredMessageStoreConfig {
     // index file will force rolling to next file after idle specified time, default is 3h
     private int tieredStoreIndexFileRollingIdleInterval = 3 * 60 * 60 * 1000;
     private String tieredMetadataServiceProvider = "org.apache.rocketmq.tieredstore.metadata.TieredMetadataManager";
-    private String tieredBackendServiceProvider = "";
+    private String tieredBackendServiceProvider = "org.apache.rocketmq.tieredstore.provider.posix.PosixFileSegment";
     // file reserved time, default is 72 hour
     private int tieredStoreFileReservedTime = 72;
     // time of forcing commitLog to roll to next file, default is 24 hour
@@ -97,6 +97,8 @@ public class TieredMessageStoreConfig {
     private long readAheadCacheExpireDuration = 10 * 1000;
     private double readAheadCacheSizeThresholdRate = 0.3;
 
+    private String tieredStoreFilepath = "";
+
     public static String localHostName() {
         try {
             return InetAddress.getLocalHost().getHostName();
@@ -321,4 +323,12 @@ public class TieredMessageStoreConfig {
     public void setReadAheadCacheSizeThresholdRate(double rate) {
         this.readAheadCacheSizeThresholdRate = rate;
     }
+
+    public String getTieredStoreFilepath() {
+        return tieredStoreFilepath;
+    }
+
+    public void setTieredStoreFilepath(String tieredStoreFilepath) {
+        this.tieredStoreFilepath = tieredStoreFilepath;
+    }
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredIndexFile.java
index fd696ed5c..6514c4e95 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredIndexFile.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/container/TieredIndexFile.java
@@ -45,8 +45,8 @@ public class TieredIndexFile {
     public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4;
     public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8;
     private static final int INDEX_FILE_HEADER_SIZE = 28;
-    private static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
-    private static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
+    public static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
+    public static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
     public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28;
 
     public static final int INDEX_FILE_HEADER_MAGIC_CODE_POSITION = 0;
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/TieredMetadataManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/TieredMetadataManager.java
index 1e7aea5e3..2fe964b3a 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/TieredMetadataManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/TieredMetadataManager.java
@@ -16,6 +16,8 @@
  */
 package org.apache.rocketmq.tieredstore.metadata;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.serializer.SerializerFeature;
 import java.io.File;
 import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -53,7 +55,13 @@ public class TieredMetadataManager extends ConfigManager implements TieredMetada
         dataWrapper.setMaxTopicId(maxTopicId);
         dataWrapper.setTopicMetadataTable(topicMetadataTable);
         dataWrapper.setQueueMetadataTable(new HashMap<>(queueMetadataTable));
-        return dataWrapper.toJson(false);
+        dataWrapper.setCommitLogFileSegmentTable(new HashMap<>(commitLogFileSegmentTable));
+        dataWrapper.setConsumeQueueFileSegmentTable(new HashMap<>(consumeQueueFileSegmentTable));
+        dataWrapper.setIndexFileSegmentTable(new HashMap<>(indexFileSegmentTable));
+        if (prettyFormat) {
+            JSON.toJSONString(dataWrapper, SerializerFeature.DisableCircularReferenceDetect, SerializerFeature.PrettyFormat);
+        }
+        return JSON.toJSONString(dataWrapper, SerializerFeature.DisableCircularReferenceDetect);
     }
 
     @Override
@@ -71,6 +79,12 @@ public class TieredMetadataManager extends ConfigManager implements TieredMetada
                 topicMetadataTable.putAll(dataWrapper.getTopicMetadataTable());
                 dataWrapper.getQueueMetadataTable()
                     .forEach((topic, map) -> queueMetadataTable.put(topic, new ConcurrentHashMap<>(map)));
+                dataWrapper.getCommitLogFileSegmentTable()
+                    .forEach((mq, map) -> commitLogFileSegmentTable.put(mq, new ConcurrentHashMap<>(map)));
+                dataWrapper.getConsumeQueueFileSegmentTable()
+                    .forEach((mq, map) -> consumeQueueFileSegmentTable.put(mq, new ConcurrentHashMap<>(map)));
+                dataWrapper.getIndexFileSegmentTable()
+                    .forEach((mq, map) -> indexFileSegmentTable.put(mq, new ConcurrentHashMap<>(map)));
             }
         }
     }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/TieredMetadataSerializeWrapper.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/TieredMetadataSerializeWrapper.java
index 24352743f..ad058ab8e 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/TieredMetadataSerializeWrapper.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metadata/TieredMetadataSerializeWrapper.java
@@ -18,13 +18,16 @@ package org.apache.rocketmq.tieredstore.metadata;
 
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
 public class TieredMetadataSerializeWrapper extends RemotingSerializable {
     private AtomicInteger maxTopicId;
     private Map<String /*topic*/, TopicMetadata> topicMetadataTable;
     private Map<String /*topic*/, Map<Integer /*queueId*/, QueueMetadata>> queueMetadataTable;
-
+    private Map<MessageQueue, Map<Long /*baseOffset*/, FileSegmentMetadata>> commitLogFileSegmentTable;
+    private Map<MessageQueue, Map<Long /*baseOffset*/, FileSegmentMetadata>> consumeQueueFileSegmentTable;
+    private Map<MessageQueue, Map<Long /*baseOffset*/, FileSegmentMetadata>> indexFileSegmentTable;
 
     public AtomicInteger getMaxTopicId() {
         return maxTopicId;
@@ -51,4 +54,31 @@ public class TieredMetadataSerializeWrapper extends RemotingSerializable {
         Map<String, Map<Integer, QueueMetadata>> queueMetadataTable) {
         this.queueMetadataTable = queueMetadataTable;
     }
+
+    public Map<MessageQueue, Map<Long, FileSegmentMetadata>> getCommitLogFileSegmentTable() {
+        return commitLogFileSegmentTable;
+    }
+
+    public void setCommitLogFileSegmentTable(
+        Map<MessageQueue, Map<Long, FileSegmentMetadata>> commitLogFileSegmentTable) {
+        this.commitLogFileSegmentTable = commitLogFileSegmentTable;
+    }
+
+    public Map<MessageQueue, Map<Long, FileSegmentMetadata>> getConsumeQueueFileSegmentTable() {
+        return consumeQueueFileSegmentTable;
+    }
+
+    public void setConsumeQueueFileSegmentTable(
+        Map<MessageQueue, Map<Long, FileSegmentMetadata>> consumeQueueFileSegmentTable) {
+        this.consumeQueueFileSegmentTable = consumeQueueFileSegmentTable;
+    }
+
+    public Map<MessageQueue, Map<Long, FileSegmentMetadata>> getIndexFileSegmentTable() {
+        return indexFileSegmentTable;
+    }
+
+    public void setIndexFileSegmentTable(
+        Map<MessageQueue, Map<Long, FileSegmentMetadata>> indexFileSegmentTable) {
+        this.indexFileSegmentTable = indexFileSegmentTable;
+    }
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
index d9a10d15c..ad7281510 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsConstant.java
@@ -36,9 +36,11 @@ public class TieredStoreMetricsConstant {
     public static final String GAUGE_STORAGE_MESSAGE_RESERVE_TIME = "rocketmq_storage_message_reserve_time";
 
     public static final String LABEL_OPERATION = "operation";
+    public static final String LABEL_SUCCESS = "success";
+
     public static final String LABEL_TOPIC = "topic";
     public static final String LABEL_GROUP = "group";
-    public static final String LABEL_QUEUE = "queue";
+    public static final String LABEL_QUEUE_ID = "queue_id";
     public static final String LABEL_FILE_TYPE = "file_type";
 
     // blob constants
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
index 69fe28047..0b0dfd63a 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/metrics/TieredStoreMetricsManager.java
@@ -70,7 +70,7 @@ import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant
 import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.HISTOGRAM_PROVIDER_RPC_LATENCY;
 import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.HISTOGRAM_UPLOAD_BYTES;
 import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
-import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_QUEUE;
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_QUEUE_ID;
 import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC;
 import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.STORAGE_MEDIUM_BLOB;
 
@@ -181,13 +181,13 @@ public class TieredStoreMetricsManager {
 
                     Attributes commitLogAttributes = newAttributesBuilder()
                         .put(LABEL_TOPIC, mq.getTopic())
-                        .put(LABEL_QUEUE, mq.getQueueId())
+                        .put(LABEL_QUEUE_ID, mq.getQueueId())
                         .put(LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
                         .build();
                     measurement.record(Math.max(maxOffset - container.getDispatchOffset(), 0), commitLogAttributes);
                     Attributes consumeQueueAttributes = newAttributesBuilder()
                         .put(LABEL_TOPIC, mq.getTopic())
-                        .put(LABEL_QUEUE, mq.getQueueId())
+                        .put(LABEL_QUEUE_ID, mq.getQueueId())
                         .put(LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
                         .build();
                     measurement.record(Math.max(maxOffset - container.getConsumeQueueMaxOffset(), 0), consumeQueueAttributes);
@@ -209,7 +209,7 @@ public class TieredStoreMetricsManager {
 
                     Attributes commitLogAttributes = newAttributesBuilder()
                         .put(LABEL_TOPIC, mq.getTopic())
-                        .put(LABEL_QUEUE, mq.getQueueId())
+                        .put(LABEL_QUEUE_ID, mq.getQueueId())
                         .put(LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.COMMIT_LOG.name().toLowerCase())
                         .build();
                     long commitLogDispatchLatency = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), container.getDispatchOffset());
@@ -221,7 +221,7 @@ public class TieredStoreMetricsManager {
 
                     Attributes consumeQueueAttributes = newAttributesBuilder()
                         .put(LABEL_TOPIC, mq.getTopic())
-                        .put(LABEL_QUEUE, mq.getQueueId())
+                        .put(LABEL_QUEUE_ID, mq.getQueueId())
                         .put(LABEL_FILE_TYPE, TieredFileSegment.FileSegmentType.CONSUME_QUEUE.name().toLowerCase())
                         .build();
                     long consumeQueueDispatchOffset = container.getConsumeQueueMaxOffset();
@@ -307,7 +307,7 @@ public class TieredStoreMetricsManager {
                         MessageQueue mq = container.getMessageQueue();
                         Attributes attributes = newAttributesBuilder()
                             .put(LABEL_TOPIC, mq.getTopic())
-                            .put(LABEL_QUEUE, mq.getQueueId())
+                            .put(LABEL_QUEUE_ID, mq.getQueueId())
                             .build();
                         measurement.record(System.currentTimeMillis() - timestamp, attributes);
                     }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
index 5a86db6dd..2712e84c0 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java
@@ -38,7 +38,7 @@ import org.apache.rocketmq.tieredstore.exception.TieredStoreException;
 import org.apache.rocketmq.tieredstore.util.MessageBufferUtil;
 import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
 
-public abstract class TieredFileSegment implements Comparable<TieredFileSegment>, TieredStoreBackendProvider {
+public abstract class TieredFileSegment implements Comparable<TieredFileSegment>, TieredStoreProvider {
     private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
     private volatile boolean closed = false;
     private final ReentrantLock bufferLock = new ReentrantLock();
@@ -271,6 +271,11 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
                 new TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM, "length is zero"));
             return future;
         }
+        if (position >= commitPosition) {
+            future.completeExceptionally(
+                new TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM, "position is illegal"));
+            return future;
+        }
         if (position + length > commitPosition) {
             logger.warn("TieredFileSegment#readAsync request position + length is greater than commit position," +
                     " correct length using commit position, file: {}, request position: {}, commit position:{}, change length from {} to {}",
@@ -523,26 +528,4 @@ public abstract class TieredFileSegment implements Comparable<TieredFileSegment>
             return codaBuffer.get() & 0xff;
         }
     }
-
-    @Override
-    public abstract String getPath();
-
-    @Override
-    public abstract long getSize();
-
-    @Override
-    public abstract boolean exists();
-
-    @Override
-    public abstract void createFile();
-
-    @Override
-    public abstract void destroyFile();
-
-    @Override
-    public abstract CompletableFuture<ByteBuffer> read0(long position, int length);
-
-    @Override
-    public abstract CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position,
-        int length, boolean append);
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreBackendProvider.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
similarity index 98%
rename from tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreBackendProvider.java
rename to tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
index cda701026..081143ce8 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreBackendProvider.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java
@@ -19,7 +19,7 @@ package org.apache.rocketmq.tieredstore.provider;
 import java.nio.ByteBuffer;
 import java.util.concurrent.CompletableFuture;
 
-public interface TieredStoreBackendProvider {
+public interface TieredStoreProvider {
     /**
      * Get file path in backend file system
      *
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
new file mode 100644
index 000000000..9def6bd29
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.provider.posix;
+
+import com.google.common.base.Stopwatch;
+import com.google.common.io.ByteStreams;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.common.TieredStoreExecutor;
+import org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsManager;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.apache.rocketmq.tieredstore.util.TieredStoreUtil;
+
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_FILE_TYPE;
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_OPERATION;
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_SUCCESS;
+import static org.apache.rocketmq.tieredstore.metrics.TieredStoreMetricsConstant.LABEL_TOPIC;
+
+/**
+ * this class is experimental and may change without notice.
+ */
+public class PosixFileSegment extends TieredFileSegment {
+    private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+    private static final String OPERATION_POSIX_READ = "read";
+    private static final String OPERATION_POSIX_WRITE = "write";
+
+    private final String basePath;
+    private final String filepath;
+
+    private volatile File file;
+    private volatile FileChannel readFileChannel;
+    private volatile FileChannel writeFileChannel;
+
+    public PosixFileSegment(FileSegmentType fileType, MessageQueue messageQueue,
+        long baseOffset, TieredMessageStoreConfig storeConfig) {
+        super(fileType, messageQueue, baseOffset, storeConfig);
+
+        String basePath = storeConfig.getTieredStoreFilepath();
+        if (StringUtils.isBlank(basePath) || basePath.endsWith(File.separator)) {
+            this.basePath = basePath;
+        } else {
+            this.basePath = basePath + File.separator;
+        }
+        this.filepath = this.basePath
+            + TieredStoreUtil.getHash(storeConfig.getBrokerClusterName()) + "_" + storeConfig.getBrokerClusterName() + File.separator
+            + messageQueue.getBrokerName() + File.separator
+            + messageQueue.getTopic() + File.separator
+            + messageQueue.getQueueId() + File.separator
+            + fileType + File.separator
+            + TieredStoreUtil.offset2FileName(baseOffset);
+        createFile();
+    }
+
+    protected AttributesBuilder newAttributesBuilder() {
+        return TieredStoreMetricsManager.newAttributesBuilder()
+            .put(LABEL_TOPIC, messageQueue.getTopic())
+            .put(LABEL_FILE_TYPE, fileType.name().toLowerCase());
+    }
+
+    @Override
+    public String getPath() {
+        return filepath;
+    }
+
+    @Override
+    public long getSize() {
+        if (exists()) {
+            return file.length();
+        }
+        return -1;
+    }
+
+    @Override
+    public boolean exists() {
+        return file != null && file.exists();
+    }
+
+    @Override
+    public void createFile() {
+        if (file == null) {
+            synchronized (this) {
+                if (file == null) {
+                    File file = new File(filepath);
+                    try {
+                        File dir = file.getParentFile();
+                        if (!dir.exists()) {
+                            dir.mkdirs();
+                        }
+
+                        // TODO use direct IO to avoid polluting the page cache
+                        file.createNewFile();
+                        this.readFileChannel = new RandomAccessFile(file, "r").getChannel();
+                        this.writeFileChannel = new RandomAccessFile(file, "rwd").getChannel();
+                        this.file = file;
+                    } catch (Exception e) {
+                        logger.error("PosixFileSegment#createFile: create file {} failed: ", filepath, e);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void destroyFile() {
+        if (file.exists()) {
+            file.delete();
+        }
+
+        try {
+            if (readFileChannel != null && readFileChannel.isOpen()) {
+                readFileChannel.close();
+            }
+            if (writeFileChannel != null && writeFileChannel.isOpen()) {
+                writeFileChannel.close();
+            }
+        } catch (IOException e) {
+            logger.error("PosixFileSegment#destroyFile: destroy file {} failed: ", filepath, e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<ByteBuffer> read0(long position, int length) {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        AttributesBuilder attributesBuilder = newAttributesBuilder()
+            .put(LABEL_OPERATION, OPERATION_POSIX_READ);
+
+        CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
+        ByteBuffer byteBuffer = ByteBuffer.allocate(length);
+        try {
+            readFileChannel.position(position);
+            readFileChannel.read(byteBuffer);
+            byteBuffer.flip();
+
+            attributesBuilder.put(LABEL_SUCCESS, true);
+            long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+            TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build());
+
+            Attributes metricsAttributes = newAttributesBuilder()
+                .put(LABEL_OPERATION, OPERATION_POSIX_READ)
+                .build();
+            int downloadedBytes = byteBuffer.remaining();
+            TieredStoreMetricsManager.downloadBytes.record(downloadedBytes, metricsAttributes);
+
+            future.complete(byteBuffer);
+        } catch (IOException e) {
+            long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+            attributesBuilder.put(LABEL_SUCCESS, false);
+            TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build());
+            logger.error("PosixFileSegment#read0: read file {} failed: position: {}, length: {}",
+                filepath, position, length, e);
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
+        boolean append) {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        AttributesBuilder attributesBuilder = newAttributesBuilder()
+            .put(LABEL_OPERATION, OPERATION_POSIX_WRITE);
+
+        CompletableFuture<Boolean> future = new CompletableFuture<>();
+        try {
+            TieredStoreExecutor.COMMIT_EXECUTOR.execute(() -> {
+                try {
+                    byte[] byteArray = ByteStreams.toByteArray(inputStream);
+                    if (byteArray.length != length) {
+                        logger.error("PosixFileSegment#commit0: append file {} failed: real data size: {}, is not equal to length: {}",
+                            filepath, byteArray.length, length);
+                        future.complete(false);
+                        return;
+                    }
+                    writeFileChannel.position(position);
+                    ByteBuffer buffer = ByteBuffer.wrap(byteArray);
+                    while (buffer.hasRemaining()) {
+                        writeFileChannel.write(buffer);
+                    }
+
+                    attributesBuilder.put(LABEL_SUCCESS, true);
+                    long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+                    TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build());
+
+                    Attributes metricsAttributes = newAttributesBuilder()
+                        .put(LABEL_OPERATION, OPERATION_POSIX_WRITE)
+                        .build();
+                    TieredStoreMetricsManager.uploadBytes.record(length, metricsAttributes);
+
+                    future.complete(true);
+                } catch (Exception e) {
+                    long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+                    attributesBuilder.put(LABEL_SUCCESS, false);
+                    TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build());
+
+                    logger.error("PosixFileSegment#commit0: append file {} failed: position: {}, length: {}",
+                        filepath, position, length, e);
+                    future.completeExceptionally(e);
+                }
+            });
+        } catch (Exception e) {
+            // commit task cannot be executed
+            long costTime = stopwatch.stop().elapsed(TimeUnit.MILLISECONDS);
+            attributesBuilder.put(LABEL_SUCCESS, false);
+            TieredStoreMetricsManager.providerRpcLatency.record(costTime, attributesBuilder.build());
+
+            future.completeExceptionally(e);
+        }
+        return future;
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java
index 54e0a0ee4..c41e5a48e 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/util/TieredStoreUtil.java
@@ -42,7 +42,7 @@ public class TieredStoreUtil {
     public static final long PB = TB << 10;
     public static final long EB = PB << 10;
 
-    public static final String TIERED_STORE_LOGGER_NAME = "RocketMQTieredStore";
+    public static final String TIERED_STORE_LOGGER_NAME = "RocketmqTieredStore";
     public static final String RMQ_SYS_TIERED_STORE_INDEX_TOPIC = "rmq_sys_INDEX";
     public final static int MSG_ID_LENGTH = 8 + 8;
 
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
index 1134729e0..9dd94ccf6 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/TieredMessageFetcherTest.java
@@ -36,6 +36,7 @@ import org.apache.rocketmq.tieredstore.common.BoundaryType;
 import org.apache.rocketmq.tieredstore.common.SelectMappedBufferResultWrapper;
 import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
 import org.apache.rocketmq.tieredstore.container.TieredContainerManager;
+import org.apache.rocketmq.tieredstore.container.TieredIndexFile;
 import org.apache.rocketmq.tieredstore.container.TieredMessageQueueContainer;
 import org.apache.rocketmq.tieredstore.metadata.TieredMetadataStore;
 import org.apache.rocketmq.tieredstore.mock.MemoryFileSegment;
@@ -282,6 +283,8 @@ public class TieredMessageFetcherTest {
         request = new DispatchRequest(mq.getTopic(), mq.getQueueId(), MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN, 0, 0, 0, "", "another-key", 0, 0, null);
         container.appendIndexFile(request);
         container.commit(true);
+        TieredIndexFile indexFile = TieredContainerManager.getIndexFile(storeConfig);
+        indexFile.commit(true);
         Assert.assertEquals(1, fetcher.queryMessageAsync(mq.getTopic(), "key", 1, 0, Long.MAX_VALUE).join().getMessageMapedList().size());
 
         QueryMessageResult result = fetcher.queryMessageAsync(mq.getTopic(), "key", 32, 0, Long.MAX_VALUE).join();
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java
index c30ee2a55..0824cf35d 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/container/TieredIndexFileTest.java
@@ -35,6 +35,7 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 public class TieredIndexFileTest {
@@ -61,6 +62,7 @@ public class TieredIndexFileTest {
 //        metadataStore.reLoadStore();
     }
 
+    @Ignore
     @Test
     public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoSuchMethodException {
         // skip this test on windows
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java
index 45a3a6b7a..4832d1246 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/metadata/MetadataStoreTest.java
@@ -19,7 +19,9 @@ package org.apache.rocketmq.tieredstore.metadata;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
@@ -35,7 +37,9 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class MetadataStoreTest {
-    MessageQueue mq;
+    MessageQueue mq0;
+    MessageQueue mq1;
+    MessageQueue mq2;
     TieredMessageStoreConfig storeConfig;
     TieredMetadataStore metadataStore;
 
@@ -43,7 +47,9 @@ public class MetadataStoreTest {
     public void setUp() {
         storeConfig = new TieredMessageStoreConfig();
         storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID());
-        mq = new MessageQueue("MetadataStoreTest", storeConfig.getBrokerName(), 1);
+        mq0 = new MessageQueue("MetadataStoreTest0", storeConfig.getBrokerName(), 0);
+        mq1 = new MessageQueue("MetadataStoreTest1", storeConfig.getBrokerName(), 0);
+        mq2 = new MessageQueue("MetadataStoreTest1", storeConfig.getBrokerName(), 1);
         metadataStore = new TieredMetadataManager(storeConfig);
     }
 
@@ -55,10 +61,10 @@ public class MetadataStoreTest {
 
     @Test
     public void testQueue() {
-        QueueMetadata queueMetadata = metadataStore.getQueue(mq);
+        QueueMetadata queueMetadata = metadataStore.getQueue(mq0);
         Assert.assertNull(queueMetadata);
 
-        queueMetadata = metadataStore.addQueue(mq, -1);
+        queueMetadata = metadataStore.addQueue(mq0, -1);
         Assert.assertEquals(queueMetadata.getMinOffset(), -1);
         Assert.assertEquals(queueMetadata.getMaxOffset(), -1);
 
@@ -66,49 +72,49 @@ public class MetadataStoreTest {
         queueMetadata.setMinOffset(0);
         queueMetadata.setMaxOffset(0);
         metadataStore.updateQueue(queueMetadata);
-        queueMetadata = metadataStore.getQueue(mq);
+        queueMetadata = metadataStore.getQueue(mq0);
         Assert.assertTrue(queueMetadata.getUpdateTimestamp() >= currentTimeMillis);
         Assert.assertEquals(queueMetadata.getMinOffset(), 0);
         Assert.assertEquals(queueMetadata.getMaxOffset(), 0);
 
-        MessageQueue mq2 = new MessageQueue("MetadataStoreTest", storeConfig.getBrokerName(), 2);
+        MessageQueue mq2 = new MessageQueue(mq0.getTopic(), storeConfig.getBrokerName(), 2);
         metadataStore.addQueue(mq2, 1);
         AtomicInteger i = new AtomicInteger(0);
-        metadataStore.iterateQueue(mq.getTopic(), metadata -> {
+        metadataStore.iterateQueue(mq0.getTopic(), metadata -> {
             Assert.assertEquals(i.get(), metadata.getMinOffset());
             i.getAndIncrement();
         });
         Assert.assertEquals(i.get(), 2);
 
-        metadataStore.deleteQueue(mq);
-        queueMetadata = metadataStore.getQueue(mq);
+        metadataStore.deleteQueue(mq0);
+        queueMetadata = metadataStore.getQueue(mq0);
         Assert.assertNull(queueMetadata);
     }
 
     @Test
     public void testTopic() {
-        TopicMetadata topicMetadata = metadataStore.getTopic(mq.getTopic());
+        TopicMetadata topicMetadata = metadataStore.getTopic(mq0.getTopic());
         Assert.assertNull(topicMetadata);
 
-        metadataStore.addTopic(mq.getTopic(), 2);
-        topicMetadata = metadataStore.getTopic(mq.getTopic());
-        Assert.assertEquals(mq.getTopic(), topicMetadata.getTopic());
+        metadataStore.addTopic(mq0.getTopic(), 2);
+        topicMetadata = metadataStore.getTopic(mq0.getTopic());
+        Assert.assertEquals(mq0.getTopic(), topicMetadata.getTopic());
         Assert.assertEquals(topicMetadata.getStatus(), 0);
         Assert.assertEquals(topicMetadata.getReserveTime(), 2);
         Assert.assertEquals(topicMetadata.getTopicId(), 0);
 
-        metadataStore.updateTopicStatus(mq.getTopic(), 1);
-        metadataStore.updateTopicReserveTime(mq.getTopic(), 0);
-        topicMetadata = metadataStore.getTopic(mq.getTopic());
+        metadataStore.updateTopicStatus(mq0.getTopic(), 1);
+        metadataStore.updateTopicReserveTime(mq0.getTopic(), 0);
+        topicMetadata = metadataStore.getTopic(mq0.getTopic());
         Assert.assertNotNull(topicMetadata);
         Assert.assertEquals(topicMetadata.getStatus(), 1);
         Assert.assertEquals(topicMetadata.getReserveTime(), 0);
 
-        metadataStore.addTopic(mq.getTopic() + "1", 1);
-        metadataStore.updateTopicStatus(mq.getTopic() + "1", 2);
+        metadataStore.addTopic(mq0.getTopic() + "1", 1);
+        metadataStore.updateTopicStatus(mq0.getTopic() + "1", 2);
 
-        metadataStore.addTopic(mq.getTopic() + "2", 2);
-        metadataStore.updateTopicStatus(mq.getTopic() + "2", 3);
+        metadataStore.addTopic(mq0.getTopic() + "2", 2);
+        metadataStore.updateTopicStatus(mq0.getTopic() + "2", 3);
 
         AtomicInteger n = new AtomicInteger();
         metadataStore.iterateTopic(metadata -> {
@@ -122,21 +128,19 @@ public class MetadataStoreTest {
         });
         Assert.assertEquals(3, n.get());
 
-        Assert.assertNull(metadataStore.getTopic(mq.getTopic() + "2"));
+        Assert.assertNull(metadataStore.getTopic(mq0.getTopic() + "2"));
 
-        Assert.assertNotNull(metadataStore.getTopic(mq.getTopic()));
-        Assert.assertNotNull(metadataStore.getTopic(mq.getTopic() + "1"));
+        Assert.assertNotNull(metadataStore.getTopic(mq0.getTopic()));
+        Assert.assertNotNull(metadataStore.getTopic(mq0.getTopic() + "1"));
     }
 
     @Test
     public void testFileSegment() {
         MemoryFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
-            mq,
-            100,
-            storeConfig);
+            mq0, 100, storeConfig);
         fileSegment1.initPosition(fileSegment1.getSize());
         FileSegmentMetadata metadata1 = metadataStore.updateFileSegment(fileSegment1);
-        Assert.assertEquals(mq, metadata1.getQueue());
+        Assert.assertEquals(mq0, metadata1.getQueue());
         Assert.assertEquals(TieredFileSegment.FileSegmentType.COMMIT_LOG, TieredFileSegment.FileSegmentType.valueOf(metadata1.getType()));
         Assert.assertEquals(100, metadata1.getBaseOffset());
         Assert.assertEquals(0, metadata1.getSealTimestamp());
@@ -152,12 +156,10 @@ public class MetadataStoreTest {
         Assert.assertTrue(metadata1.getSealTimestamp() > 0);
 
         MemoryFileSegment fileSegment2 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
-            mq,
-            1100,
-            storeConfig);
+            mq0, 1100, storeConfig);
         metadataStore.updateFileSegment(fileSegment2);
         List<FileSegmentMetadata> list = new ArrayList<>();
-        metadataStore.iterateFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG, "MetadataStoreTest", 1, list::add);
+        metadataStore.iterateFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG, mq0.getTopic(), mq0.getQueueId(), list::add);
         Assert.assertEquals(2, list.size());
         Assert.assertEquals(100, list.get(0).getBaseOffset());
         Assert.assertEquals(1100, list.get(1).getBaseOffset());
@@ -170,19 +172,57 @@ public class MetadataStoreTest {
     @Test
     public void testReload() {
         TieredMetadataManager metadataManager = (TieredMetadataManager) metadataStore;
-        metadataManager.addTopic(mq.getTopic(), 1);
-        metadataManager.addQueue(mq, 2);
+        metadataManager.addTopic(mq0.getTopic(), 1);
+        metadataManager.addTopic(mq1.getTopic(), 2);
+
+        metadataManager.addQueue(mq0, 2);
+        metadataManager.addQueue(mq1, 4);
+        metadataManager.addQueue(mq2, 8);
+
+
+        MemoryFileSegment fileSegment = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+            mq0, 100, storeConfig);
+        metadataStore.updateFileSegment(fileSegment);
+
+        fileSegment = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+            mq0, 200, storeConfig);
+        metadataStore.updateFileSegment(fileSegment);
+
         Assert.assertTrue(new File(metadataManager.configFilePath()).exists());
 
         metadataManager = new TieredMetadataManager(storeConfig);
-        metadataManager.load();
 
-        TopicMetadata topicMetadata = metadataManager.getTopic(mq.getTopic());
+        TopicMetadata topicMetadata = metadataManager.getTopic(mq0.getTopic());
         Assert.assertNotNull(topicMetadata);
         Assert.assertEquals(topicMetadata.getReserveTime(), 1);
 
-        QueueMetadata queueMetadata = metadataManager.getQueue(mq);
+        topicMetadata = metadataManager.getTopic(mq1.getTopic());
+        Assert.assertNotNull(topicMetadata);
+        Assert.assertEquals(topicMetadata.getReserveTime(), 2);
+
+        QueueMetadata queueMetadata = metadataManager.getQueue(mq0);
         Assert.assertNotNull(queueMetadata);
+        Assert.assertEquals(mq0, queueMetadata.getQueue());
         Assert.assertEquals(queueMetadata.getMinOffset(), 2);
+
+        queueMetadata = metadataManager.getQueue(mq1);
+        Assert.assertNotNull(queueMetadata);
+        Assert.assertEquals(mq1, queueMetadata.getQueue());
+        Assert.assertEquals(queueMetadata.getMinOffset(), 4);
+
+        queueMetadata = metadataManager.getQueue(mq2);
+        Assert.assertNotNull(queueMetadata);
+        Assert.assertEquals(mq2, queueMetadata.getQueue());
+        Assert.assertEquals(queueMetadata.getMinOffset(), 8);
+
+        Map<Long, FileSegmentMetadata> map = new HashMap<>();
+        metadataManager.iterateFileSegment(metadata -> map.put(metadata.getBaseOffset(), metadata));
+        FileSegmentMetadata fileSegmentMetadata = map.get(100L);
+        Assert.assertNotNull(fileSegmentMetadata);
+        Assert.assertEquals(mq0, fileSegmentMetadata.getQueue());
+
+        fileSegmentMetadata = map.get(200L);
+        Assert.assertNotNull(fileSegmentMetadata);
+        Assert.assertEquals(mq0, fileSegmentMetadata.getQueue());
     }
 }
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
new file mode 100644
index 000000000..0f2ee2f37
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegmentTest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.tieredstore.provider.posix;
+
+import com.google.common.io.ByteStreams;
+import com.google.common.io.Files;
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Random;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.tieredstore.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.tieredstore.provider.TieredFileSegment;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PosixFileSegmentTest {
+    TieredMessageStoreConfig storeConfig;
+    MessageQueue mq;
+
+    @Before
+    public void setUp() {
+        storeConfig = new TieredMessageStoreConfig();
+        storeConfig.setTieredStoreFilepath(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID());
+        mq = new MessageQueue("OSSFileSegmentTest", "broker", 0);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "tiered_store_unit_test" + UUID.randomUUID()));
+    }
+
+    @Test
+    public void testCommitAndRead() throws IOException {
+        PosixFileSegment fileSegment = new PosixFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE, mq, 0, storeConfig);
+        byte[] source = new byte[4096];
+        new Random().nextBytes(source);
+        ByteBuffer buffer = ByteBuffer.wrap(source);
+        fileSegment.append(buffer, 0);
+        fileSegment.commit();
+
+        File file = new File(fileSegment.getPath());
+        Assert.assertTrue(file.exists());
+        byte[] result = new byte[4096];
+        ByteStreams.read(Files.asByteSource(file).openStream(), result, 0, 4096);
+        Assert.assertArrayEquals(source, result);
+
+        ByteBuffer read = fileSegment.read(0, 4096);
+        Assert.assertArrayEquals(source, read.array());
+    }
+}
diff --git a/tieredstore/tiered_storage_arch.png b/tieredstore/tiered_storage_arch.png
new file mode 100644
index 000000000..05efac726
Binary files /dev/null and b/tieredstore/tiered_storage_arch.png differ