You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by lo...@apache.org on 2023/01/16 09:59:30 UTC

[rocketmq] branch develop updated: [ISSUE #5874] implement file queue for tiered storage (#5875)

This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 970021128 [ISSUE #5874] implement file queue for tiered storage (#5875)
970021128 is described below

commit 9700211285d65b33e82409c086aed5a31fdf97ca
Author: SSpirits <ad...@lv5.moe>
AuthorDate: Mon Jan 16 17:59:09 2023 +0800

    [ISSUE #5874] implement file queue for tiered storage (#5875)
    
    * implement file queue for tiered storage
    
    * fix bazel
    
    * suppress error found by spotbugs
    
    * fix tests in windows
    
    * fixed according to comment
    
    * fix warning in bazel build
---
 style/spotbugs-suppressions.xml                    |   5 +
 tieredstore/BUILD.bazel                            |   6 +
 tieredstore/pom.xml                                |   6 +
 ...edMessageStoreConfig.java => AppendResult.java} |  20 +-
 ...edMessageStoreConfig.java => BoundaryType.java} |  29 +-
 .../tiered/common/TieredMessageStoreConfig.java    | 293 +++++++++++
 .../store/tiered/common/TieredStoreExecutor.java   |  93 ++++
 .../store/tiered/container/TieredCommitLog.java    | 119 +++++
 .../store/tiered/container/TieredConsumeQueue.java | 111 +++++
 .../store/tiered/container/TieredFileQueue.java    | 519 ++++++++++++++++++++
 .../store/tiered/container/TieredFileSegment.java  | 538 +++++++++++++++++++++
 .../store/tiered/container/TieredIndexFile.java    | 427 ++++++++++++++++
 .../TieredStoreErrorCode.java}                     |  22 +-
 .../tiered/exception/TieredStoreException.java     |  63 +++
 .../tiered/metadata/TieredMetadataManager.java     | 321 ++++++++++++
 ...er.java => TieredMetadataSerializeWrapper.java} |   2 +-
 ...MetadataStore.java => TieredMetadataStore.java} |  48 +-
 .../metadata/TieredStoreMetadataManager.java       | 167 -------
 .../CQItemBufferUtil.java}                         |  18 +-
 .../store/tiered/util/MessageBufferUtil.java       | 165 +++++++
 .../store/tiered/util/TieredStoreUtil.java         | 157 ++++++
 .../tiered/container/TieredFileQueueTest.java      | 233 +++++++++
 .../tiered/container/TieredFileSegmentTest.java    | 153 ++++++
 .../tiered/container/TieredIndexFileTest.java      | 130 +++++
 .../store/tiered/metadata/MetadataStoreTest.java   |  58 ++-
 .../store/tiered/mock/MemoryFileSegment.java       | 114 +++++
 .../store/tiered/util/CQItemBufferUtilTest.java    |  51 ++
 .../store/tiered/util/MessageBufferUtilTest.java   | 243 ++++++++++
 .../store/tiered/util/TieredStoreUtilTest.java     |  59 +++
 tieredstore/src/test/resources/logback.xml         |  29 ++
 30 files changed, 3984 insertions(+), 215 deletions(-)

diff --git a/style/spotbugs-suppressions.xml b/style/spotbugs-suppressions.xml
index 607080cfb..fa6d85508 100644
--- a/style/spotbugs-suppressions.xml
+++ b/style/spotbugs-suppressions.xml
@@ -30,6 +30,11 @@
         <Method name="indexKeyHashMethod" />
         <Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
     </Match>
+    <Match>
+        <Class name="org.apache.rocketmq.store.tiered.container.TieredIndexFile"/>
+        <Method name="indexKeyHashMethod" />
+        <Bug pattern="RV_ABSOLUTE_VALUE_OF_HASHCODE"/>
+    </Match>
     <Match>
         <Class name="org.apache.rocketmq.broker.transaction.queue.DefaultTransactionalMessageCheckListener"/>
         <Method name="toMessageExtBrokerInner" />
diff --git a/tieredstore/BUILD.bazel b/tieredstore/BUILD.bazel
index 5a0176e89..fb9fadfdd 100644
--- a/tieredstore/BUILD.bazel
+++ b/tieredstore/BUILD.bazel
@@ -25,6 +25,10 @@ java_library(
         "//remoting",
         "//store",
         "@maven//:com_google_code_findbugs_jsr305",
+        "@maven//:com_google_guava_guava",
+        "@maven//:io_github_aliyunmq_rocketmq_slf4j_api",
+        "@maven//:io_github_aliyunmq_rocketmq_logback_classic",
+        "@maven//:org_apache_commons_commons_lang3",
         "@maven//:org_apache_tomcat_annotations_api",
     ],
 )
@@ -38,7 +42,9 @@ java_library(
         ":tieredstore",
         "//:test_deps",
         "//common",
+        "//store",
         "@maven//:commons_io_commons_io",
+        "@maven//:org_apache_commons_commons_lang3",
     ],
 )
 
diff --git a/tieredstore/pom.xml b/tieredstore/pom.xml
index c983e3f08..ff7435ce0 100644
--- a/tieredstore/pom.xml
+++ b/tieredstore/pom.xml
@@ -37,6 +37,12 @@
             <artifactId>rocketmq-store</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/AppendResult.java
similarity index 70%
copy from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
copy to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/AppendResult.java
index c85317177..0dd5e9a8e 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/AppendResult.java
@@ -16,16 +16,12 @@
  */
 package org.apache.rocketmq.store.tiered.common;
 
-import java.io.File;
-
-public class TieredMessageStoreConfig {
-    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
-
-    public String getStorePathRootDir() {
-        return storePathRootDir;
-    }
-
-    public void setStorePathRootDir(String storePathRootDir) {
-        this.storePathRootDir = storePathRootDir;
-    }
+public enum AppendResult {
+    SUCCESS,
+    OFFSET_INCORRECT,
+    BUFFER_FULL,
+    FILE_FULL,
+    IO_ERROR,
+    FILE_CLOSE,
+    UNKNOWN_ERROR
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/BoundaryType.java
similarity index 62%
copy from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
copy to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/BoundaryType.java
index c85317177..0e78f1211 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/BoundaryType.java
@@ -16,16 +16,31 @@
  */
 package org.apache.rocketmq.store.tiered.common;
 
-import java.io.File;
+public enum BoundaryType {
+    /**
+     * Indicate that lower boundary is expected.
+     */
+    LOWER("lower"),
 
-public class TieredMessageStoreConfig {
-    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
+    /**
+     * Indicate that upper boundary is expected.
+     */
+    UPPER("upper");
 
-    public String getStorePathRootDir() {
-        return storePathRootDir;
+    private String name;
+
+    BoundaryType(String name) {
+        this.name = name;
+    }
+
+    public String getName() {
+        return name;
     }
 
-    public void setStorePathRootDir(String storePathRootDir) {
-        this.storePathRootDir = storePathRootDir;
+    public static BoundaryType getType(String name) {
+        if (BoundaryType.UPPER.getName().equalsIgnoreCase(name)) {
+            return UPPER;
+        }
+        return LOWER;
     }
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
index c85317177..f7712c41d 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
@@ -17,9 +17,126 @@
 package org.apache.rocketmq.store.tiered.common;
 
 import java.io.File;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 
 public class TieredMessageStoreConfig {
+    private String brokerName = localHostName();
+    private String brokerClusterName = "DefaultCluster";
+    private TieredStorageLevel tieredStorageLevel = TieredStorageLevel.NOT_IN_DISK;
+    public enum TieredStorageLevel {
+        DISABLE(0),
+        NOT_IN_DISK(1),
+        NOT_IN_MEM(2),
+        FORCE(3);
+
+        private final int value;
+
+        TieredStorageLevel(int value) {
+            this.value = value;
+        }
+
+        public int getValue() {
+            return value;
+        }
+
+        public static TieredStorageLevel valueOf(int value) {
+            switch (value) {
+                case 1:
+                    return NOT_IN_DISK;
+                case 2:
+                    return NOT_IN_MEM;
+                case 3:
+                    return FORCE;
+                default:
+                    return DISABLE;
+            }
+        }
+
+        public boolean isEnable() {
+            return this.value > 0;
+        }
+
+        public boolean check(TieredStorageLevel targetLevel) {
+            return this.value >= targetLevel.value;
+        }
+    }
+
     private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
+    private boolean messageIndexEnable = true;
+
+    // CommitLog file size, default is 1G
+    private long tieredStoreCommitLogMaxSize = 1024 * 1024 * 1024;
+    // ConsumeQueue file size, default is 100M
+    private long tieredStoreConsumeQueueMaxSize = 100 * 1024 * 1024;
+    private int tieredStoreIndexFileMaxHashSlotNum = 5000000;
+    private int tieredStoreIndexFileMaxIndexNum = 5000000 * 4;
+    // index file will force rolling to next file after idle specified time, default is 3h
+    private int tieredStoreIndexFileRollingIdleInterval = 3 * 60 * 60 * 1000;
+    private String tieredMetadataServiceProvider = "org.apache.rocketmq.store.tiered.metadata.TieredMetadataManager";
+    private String tieredBackendServiceProvider = "";
+    // file reserved time, default is 72 hours
+    private int tieredStoreFileReservedTime = 72;
+    // time of forcing commitLog to roll to next file, default is 24 hours
+    private int commitLogRollingInterval = 24;
+    // rolling will only happen if file segment size is larger than commitLogRollingMinimumSize, default is 128M
+    private int commitLogRollingMinimumSize = 128 * 1024 * 1024;
+    // default is 100, unit is millisecond
+    private int maxCommitJitter = 100;
+    // Cached message count larger than this value will trigger async commit. default is 2500
+    private int tieredStoreGroupCommitCount = 2500;
+    // Cached message size larger than this value will trigger async commit. default is 32M
+    private int tieredStoreGroupCommitSize = 32 * 1024 * 1024;
+    // Cached message count larger than this value will suspend append. default is 10000
+    private int tieredStoreMaxGroupCommitCount = 10000;
+    private int readAheadMinFactor  = 2;
+    private int readAheadMaxFactor = 24;
+    private int readAheadBatchSizeFactorThreshold = 8;
+    private int readAheadMessageCountThreshold = 2048;
+    private int readAheadMessageSizeThreshold = 128 * 1024 * 1024;
+    private long readAheadCacheExpireDuration = 10 * 1000;
+    private double readAheadCacheSizeThresholdRate = 0.3;
+
+    public static String localHostName() {
+        try {
+            return InetAddress.getLocalHost().getHostName();
+        } catch (UnknownHostException ignore) {
+        }
+
+        return "DEFAULT_BROKER";
+    }
+
+    public String getBrokerName() {
+        return brokerName;
+    }
+
+    public void setBrokerName(String brokerName) {
+        this.brokerName = brokerName;
+    }
+
+    public String getBrokerClusterName() {
+        return brokerClusterName;
+    }
+
+    public void setBrokerClusterName(String brokerClusterName) {
+        this.brokerClusterName = brokerClusterName;
+    }
+
+    public TieredStorageLevel getTieredStorageLevel() {
+        return tieredStorageLevel;
+    }
+
+    public void setTieredStorageLevel(TieredStorageLevel tieredStorageLevel) {
+        this.tieredStorageLevel = tieredStorageLevel;
+    }
+
+    public void setTieredStorageLevel(int tieredStorageLevel) {
+        this.tieredStorageLevel = TieredStorageLevel.valueOf(tieredStorageLevel);
+    }
+
+    public void setTieredStorageLevel(String tieredStorageLevel) {
+        this.tieredStorageLevel = TieredStorageLevel.valueOf(tieredStorageLevel);
+    }
 
     public String getStorePathRootDir() {
         return storePathRootDir;
@@ -28,4 +145,180 @@ public class TieredMessageStoreConfig {
     public void setStorePathRootDir(String storePathRootDir) {
         this.storePathRootDir = storePathRootDir;
     }
+
+    public boolean isMessageIndexEnable() {
+        return messageIndexEnable;
+    }
+
+    public void setMessageIndexEnable(boolean messageIndexEnable) {
+        this.messageIndexEnable = messageIndexEnable;
+    }
+
+    public long getTieredStoreCommitLogMaxSize() {
+        return tieredStoreCommitLogMaxSize;
+    }
+
+    public void setTieredStoreCommitLogMaxSize(long tieredStoreCommitLogMaxSize) {
+        this.tieredStoreCommitLogMaxSize = tieredStoreCommitLogMaxSize;
+    }
+
+    public long getTieredStoreConsumeQueueMaxSize() {
+        return tieredStoreConsumeQueueMaxSize;
+    }
+
+    public void setTieredStoreConsumeQueueMaxSize(long tieredStoreConsumeQueueMaxSize) {
+        this.tieredStoreConsumeQueueMaxSize = tieredStoreConsumeQueueMaxSize;
+    }
+
+    public int getTieredStoreIndexFileMaxHashSlotNum() {
+        return tieredStoreIndexFileMaxHashSlotNum;
+    }
+
+    public void setTieredStoreIndexFileMaxHashSlotNum(int tieredStoreIndexFileMaxHashSlotNum) {
+        this.tieredStoreIndexFileMaxHashSlotNum = tieredStoreIndexFileMaxHashSlotNum;
+    }
+
+    public int getTieredStoreIndexFileMaxIndexNum() {
+        return tieredStoreIndexFileMaxIndexNum;
+    }
+
+    public void setTieredStoreIndexFileMaxIndexNum(int tieredStoreIndexFileMaxIndexNum) {
+        this.tieredStoreIndexFileMaxIndexNum = tieredStoreIndexFileMaxIndexNum;
+    }
+
+    public int getTieredStoreIndexFileRollingIdleInterval() {
+        return tieredStoreIndexFileRollingIdleInterval;
+    }
+
+    public void setTieredStoreIndexFileRollingIdleInterval(int tieredStoreIndexFileRollingIdleInterval) {
+        this.tieredStoreIndexFileRollingIdleInterval = tieredStoreIndexFileRollingIdleInterval;
+    }
+
+    public String getTieredMetadataServiceProvider() {
+        return tieredMetadataServiceProvider;
+    }
+
+    public void setTieredMetadataServiceProvider(String tieredMetadataServiceProvider) {
+        this.tieredMetadataServiceProvider = tieredMetadataServiceProvider;
+    }
+
+    public String getTieredBackendServiceProvider() {
+        return tieredBackendServiceProvider;
+    }
+
+    public void setTieredBackendServiceProvider(String tieredBackendServiceProvider) {
+        this.tieredBackendServiceProvider = tieredBackendServiceProvider;
+    }
+
+    public int getTieredStoreFileReservedTime() {
+        return tieredStoreFileReservedTime;
+    }
+
+    public void setTieredStoreFileReservedTime(int tieredStoreFileReservedTime) {
+        this.tieredStoreFileReservedTime = tieredStoreFileReservedTime;
+    }
+
+    public int getCommitLogRollingInterval() {
+        return commitLogRollingInterval;
+    }
+
+    public void setCommitLogRollingInterval(int commitLogRollingInterval) {
+        this.commitLogRollingInterval = commitLogRollingInterval;
+    }
+
+    public int getCommitLogRollingMinimumSize() {
+        return commitLogRollingMinimumSize;
+    }
+
+    public void setCommitLogRollingMinimumSize(int commitLogRollingMinimumSize) {
+        this.commitLogRollingMinimumSize = commitLogRollingMinimumSize;
+    }
+
+    public int getMaxCommitJitter() {
+        return maxCommitJitter;
+    }
+
+    public void setMaxCommitJitter(int maxCommitJitter) {
+        this.maxCommitJitter = maxCommitJitter;
+    }
+
+    public int getTieredStoreGroupCommitCount() {
+        return tieredStoreGroupCommitCount;
+    }
+
+    public void setTieredStoreGroupCommitCount(int tieredStoreGroupCommitCount) {
+        this.tieredStoreGroupCommitCount = tieredStoreGroupCommitCount;
+    }
+
+    public int getTieredStoreGroupCommitSize() {
+        return tieredStoreGroupCommitSize;
+    }
+
+    public void setTieredStoreGroupCommitSize(int tieredStoreGroupCommitSize) {
+        this.tieredStoreGroupCommitSize = tieredStoreGroupCommitSize;
+    }
+
+    public int getTieredStoreMaxGroupCommitCount() {
+        return tieredStoreMaxGroupCommitCount;
+    }
+
+    public void setTieredStoreMaxGroupCommitCount(int tieredStoreMaxGroupCommitCount) {
+        this.tieredStoreMaxGroupCommitCount = tieredStoreMaxGroupCommitCount;
+    }
+
+    public int getReadAheadMinFactor() {
+        return readAheadMinFactor;
+    }
+
+    public void setReadAheadMinFactor(int readAheadMinFactor) {
+        this.readAheadMinFactor = readAheadMinFactor;
+    }
+
+    public int getReadAheadMaxFactor() {
+        return readAheadMaxFactor;
+    }
+
+    public int getReadAheadBatchSizeFactorThreshold() {
+        return readAheadBatchSizeFactorThreshold;
+    }
+
+    public void setReadAheadBatchSizeFactorThreshold(int readAheadBatchSizeFactorThreshold) {
+        this.readAheadBatchSizeFactorThreshold = readAheadBatchSizeFactorThreshold;
+    }
+
+    public void setReadAheadMaxFactor(int readAheadMaxFactor) {
+        this.readAheadMaxFactor = readAheadMaxFactor;
+    }
+
+    public int getReadAheadMessageCountThreshold() {
+        return readAheadMessageCountThreshold;
+    }
+
+    public void setReadAheadMessageCountThreshold(int readAheadMessageCountThreshold) {
+        this.readAheadMessageCountThreshold = readAheadMessageCountThreshold;
+    }
+
+    public int getReadAheadMessageSizeThreshold() {
+        return readAheadMessageSizeThreshold;
+    }
+
+    public void setReadAheadMessageSizeThreshold(int readAheadMessageSizeThreshold) {
+        this.readAheadMessageSizeThreshold = readAheadMessageSizeThreshold;
+    }
+
+    public long getReadAheadCacheExpireDuration() {
+        return readAheadCacheExpireDuration;
+    }
+
+    public void setReadAheadCacheExpireDuration(long duration) {
+        this.readAheadCacheExpireDuration = duration;
+    }
+
+    public double getReadAheadCacheSizeThresholdRate() {
+        return readAheadCacheSizeThresholdRate;
+    }
+
+    public void setReadAheadCacheSizeThresholdRate(double rate) {
+        this.readAheadCacheSizeThresholdRate = rate;
+    }
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredStoreExecutor.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredStoreExecutor.java
new file mode 100644
index 000000000..3c56b3dc1
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredStoreExecutor.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.common;
+
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import org.apache.rocketmq.common.ThreadFactoryImpl;
+
+public class TieredStoreExecutor {
+    private static final int QUEUE_CAPACITY = 10000;
+    private static final BlockingQueue<Runnable> DISPATCH_THREAD_POOL_QUEUE;
+    public static final ExecutorService DISPATCH_EXECUTOR;
+    public static final ScheduledExecutorService COMMON_SCHEDULED_EXECUTOR;
+
+    public static final ScheduledExecutorService COMMIT_EXECUTOR;
+
+    public static final ScheduledExecutorService CLEAN_EXPIRED_FILE_EXECUTOR;
+
+    private static final BlockingQueue<Runnable> FETCH_DATA_THREAD_POOL_QUEUE;
+    public static final ExecutorService FETCH_DATA_EXECUTOR;
+
+    private static final BlockingQueue<Runnable> COMPACT_INDEX_FILE_THREAD_POOL_QUEUE;
+    public static final ExecutorService COMPACT_INDEX_FILE_EXECUTOR;
+
+    static {
+        DISPATCH_THREAD_POOL_QUEUE = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+        DISPATCH_EXECUTOR = new ThreadPoolExecutor(
+            Math.max(2, Runtime.getRuntime().availableProcessors()),
+            Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            DISPATCH_THREAD_POOL_QUEUE,
+            new ThreadFactoryImpl("TieredCommonExecutor_"));
+
+        COMMON_SCHEDULED_EXECUTOR = new ScheduledThreadPoolExecutor(
+            Math.max(4, Runtime.getRuntime().availableProcessors()),
+            new ThreadFactoryImpl("TieredCommonScheduledExecutor_"));
+
+        COMMIT_EXECUTOR = new ScheduledThreadPoolExecutor(
+            Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
+            new ThreadFactoryImpl("TieredCommitExecutor_"));
+
+        CLEAN_EXPIRED_FILE_EXECUTOR = new ScheduledThreadPoolExecutor(
+            Math.max(4, Runtime.getRuntime().availableProcessors()),
+            new ThreadFactoryImpl("TieredCleanExpiredFileExecutor_"));
+
+        FETCH_DATA_THREAD_POOL_QUEUE = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+        FETCH_DATA_EXECUTOR = new ThreadPoolExecutor(
+            Math.max(16, Runtime.getRuntime().availableProcessors() * 4),
+            Math.max(64, Runtime.getRuntime().availableProcessors() * 8),
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            FETCH_DATA_THREAD_POOL_QUEUE,
+            new ThreadFactoryImpl("TieredFetchDataExecutor_"));
+
+        COMPACT_INDEX_FILE_THREAD_POOL_QUEUE = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
+        COMPACT_INDEX_FILE_EXECUTOR = new ThreadPoolExecutor(
+            1,
+            1,
+            1000 * 60,
+            TimeUnit.MILLISECONDS,
+            COMPACT_INDEX_FILE_THREAD_POOL_QUEUE,
+            new ThreadFactoryImpl("TieredCompactIndexFileExecutor_"));
+    }
+
+    public static void shutdown() {
+        DISPATCH_EXECUTOR.shutdown();
+        COMMON_SCHEDULED_EXECUTOR.shutdown();
+        COMMIT_EXECUTOR.shutdown();
+        CLEAN_EXPIRED_FILE_EXECUTOR.shutdown();
+        FETCH_DATA_EXECUTOR.shutdown();
+        COMPACT_INDEX_FILE_EXECUTOR.shutdown();
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredCommitLog.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredCommitLog.java
new file mode 100644
index 000000000..91e26de91
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredCommitLog.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.tiered.common.AppendResult;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.util.MessageBufferUtil;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+
+public class TieredCommitLog {
+    private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+    public static final int CODA_SIZE = 4 /* item size: int, 4 bytes */
+        + 4 /* magic code: int, 4 bytes */
+        + 8 /* max store timestamp: long, 8 bytes */;
+    public static final int BLANK_MAGIC_CODE = 0xBBCCDDEE ^ 1880681586 + 8;
+
+    private final MessageQueue messageQueue;
+    private final TieredMessageStoreConfig storeConfig;
+    private final TieredFileQueue fileQueue;
+
+    public TieredCommitLog(MessageQueue messageQueue, TieredMessageStoreConfig storeConfig)
+        throws ClassNotFoundException, NoSuchMethodException {
+        this.messageQueue = messageQueue;
+        this.storeConfig = storeConfig;
+        this.fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.COMMIT_LOG, messageQueue, storeConfig);
+        if (fileQueue.getBaseOffset() == -1) {
+            fileQueue.setBaseOffset(0);
+        }
+    }
+
+    public long getMinOffset() {
+        return fileQueue.getMinOffset();
+    }
+
+    public long getCommitOffset() {
+        return fileQueue.getCommitOffset();
+    }
+
+    public long getCommitMsgQueueOffset() {
+        return fileQueue.getCommitMsgQueueOffset();
+    }
+
+    public long getMaxOffset() {
+        return fileQueue.getMaxOffset();
+    }
+
+    public long getBeginTimestamp() {
+        TieredFileSegment firstIndexFile = fileQueue.getFileByIndex(0);
+        if (firstIndexFile == null) {
+            return -1;
+        }
+        long beginTimestamp = firstIndexFile.getBeginTimestamp();
+        return beginTimestamp != Long.MAX_VALUE ? beginTimestamp : -1;
+    }
+
+    public long getEndTimestamp() {
+        return fileQueue.getFileToWrite().getEndTimestamp();
+    }
+
+    public AppendResult append(ByteBuffer byteBuf) {
+        return fileQueue.append(byteBuf, MessageBufferUtil.getStoreTimeStamp(byteBuf));
+    }
+
+    public AppendResult append(ByteBuffer byteBuf, boolean commit) {
+        return fileQueue.append(byteBuf, MessageBufferUtil.getStoreTimeStamp(byteBuf), commit);
+    }
+
+    public CompletableFuture<ByteBuffer> readAsync(long offset, int length) {
+        return fileQueue.readAsync(offset, length);
+    }
+
+    public void commit(boolean sync) {
+        fileQueue.commit(sync);
+    }
+
+    public void cleanExpiredFile(long expireTimestamp) {
+        fileQueue.cleanExpiredFile(expireTimestamp);
+    }
+
+    public void destroyExpiredFile() {
+        fileQueue.destroyExpiredFile();
+
+        if (fileQueue.getFileSegmentCount() == 0) {
+            return;
+        }
+        TieredFileSegment fileSegment = fileQueue.getFileToWrite();
+        try {
+            if (System.currentTimeMillis() - fileSegment.getEndTimestamp() > (long) storeConfig.getCommitLogRollingInterval() * 60 * 60 * 1000
+                && fileSegment.getSize() > storeConfig.getCommitLogRollingMinimumSize()) {
+                fileQueue.rollingNewFile();
+            }
+        } catch (Exception e) {
+            logger.error("Rolling to next file failed:", e);
+        }
+    }
+
+    public void destroy() {
+        fileQueue.destroy();
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredConsumeQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredConsumeQueue.java
new file mode 100644
index 000000000..f4fecdc1a
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredConsumeQueue.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.AppendResult;
+import org.apache.rocketmq.store.tiered.common.BoundaryType;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+
+public class TieredConsumeQueue {
+    public static final int CONSUME_QUEUE_STORE_UNIT_SIZE = 8 /* commit log offset: long, 8 bytes */
+        + 4 /* message size: int, 4 bytes */
+        + 8 /* tag hash code: long, 8 bytes */;
+    private final MessageQueue messageQueue;
+    private final TieredMessageStoreConfig storeConfig;
+    private final TieredFileQueue fileQueue;
+
+
+    public TieredConsumeQueue(MessageQueue messageQueue, TieredMessageStoreConfig storeConfig) throws ClassNotFoundException, NoSuchMethodException {
+        this.messageQueue = messageQueue;
+        this.storeConfig = storeConfig;
+        this.fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.CONSUME_QUEUE, messageQueue, storeConfig);
+    }
+
+    public boolean isInitialized() {
+        return fileQueue.getBaseOffset() != -1;
+    }
+
+    public long getBaseOffset() {
+        return fileQueue.getBaseOffset();
+    }
+
+    public void setBaseOffset(long baseOffset) {
+        fileQueue.setBaseOffset(baseOffset);
+    }
+
+    public long getMinOffset() {
+        return fileQueue.getMinOffset();
+    }
+
+    public long getCommitOffset() {
+        return fileQueue.getCommitOffset();
+    }
+
+    public long getMaxOffset() {
+        return fileQueue.getMaxOffset();
+    }
+
+    public long getEndTimestamp() {
+        return fileQueue.getFileToWrite().getEndTimestamp();
+    }
+
+    public AppendResult append(final long offset, final int size, final long tagsCode, long timeStamp) {
+        return append(offset, size, tagsCode, timeStamp, false);
+    }
+
+    public AppendResult append(final long offset, final int size, final long tagsCode, long timeStamp, boolean commit) {
+        ByteBuffer cqItem = ByteBuffer.allocate(CONSUME_QUEUE_STORE_UNIT_SIZE);
+        cqItem.putLong(offset);
+        cqItem.putInt(size);
+        cqItem.putLong(tagsCode);
+        cqItem.flip();
+        return fileQueue.append(cqItem, timeStamp, commit);
+    }
+
+    public CompletableFuture<ByteBuffer> readAsync(long offset, int length) {
+        return fileQueue.readAsync(offset, length);
+    }
+
+    public void commit(boolean sync) {
+        fileQueue.commit(sync);
+    }
+
+    public void cleanExpiredFile(long expireTimestamp) {
+        fileQueue.cleanExpiredFile(expireTimestamp);
+    }
+
+    public void destroyExpiredFile() {
+        fileQueue.destroyExpiredFile();
+    }
+
+    protected Pair<Long, Long> getQueueOffsetInFileByTime(long timestamp, BoundaryType boundaryType) {
+        TieredFileSegment fileSegment = fileQueue.getFileByTime(timestamp, boundaryType);
+        if (fileSegment == null) {
+            return Pair.of(-1L, -1L);
+        }
+        return Pair.of(fileSegment.getBaseOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE,
+            fileSegment.getCommitOffset() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE - 1);
+    }
+
+    public void destroy() {
+        fileQueue.destroy();
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileQueue.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileQueue.java
new file mode 100644
index 000000000..aa7e8e044
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileQueue.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.lang.reflect.Constructor;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.tiered.common.AppendResult;
+import org.apache.rocketmq.store.tiered.common.BoundaryType;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.exception.TieredStoreErrorCode;
+import org.apache.rocketmq.store.tiered.exception.TieredStoreException;
+import org.apache.rocketmq.store.tiered.metadata.FileSegmentMetadata;
+import org.apache.rocketmq.store.tiered.metadata.TieredMetadataStore;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+
+/**
+ * Manages the ordered list of {@link TieredFileSegment}s backing one logical file
+ * (commit log, consume queue or index) of a single message queue in tiered storage.
+ * Segment instances are created reflectively from the backend provider class named by
+ * {@code storeConfig.getTieredBackendServiceProvider()}, and their metadata is persisted
+ * through {@link TieredMetadataStore}.
+ */
+public class TieredFileQueue {
+    private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+    private final TieredFileSegment.FileSegmentType fileType;
+    private final MessageQueue messageQueue;
+    // Global offset of the first byte managed by this queue; -1 until set or restored from metadata.
+    private long baseOffset = -1;
+    private final TieredMessageStoreConfig storeConfig;
+    private final TieredMetadataStore metadataStore;
+
+    // All live segments ordered by base offset; guarded by fileSegmentLock.
+    private final List<TieredFileSegment> fileSegmentList = new ArrayList<>();
+    // Segments with data still waiting to be committed; copy-on-write so commit() can iterate lock-free.
+    protected final List<TieredFileSegment> needCommitFileSegmentList = new CopyOnWriteArrayList<>();
+    private final ReentrantReadWriteLock fileSegmentLock = new ReentrantReadWriteLock();
+
+    private final Constructor<? extends TieredFileSegment> fileSegmentConstructor;
+
+    /**
+     * Builds the queue and restores its segment list from persisted metadata.
+     *
+     * @throws ClassNotFoundException if the configured backend provider class cannot be loaded
+     * @throws NoSuchMethodException  if the provider lacks the expected 4-argument constructor
+     */
+    public TieredFileQueue(TieredFileSegment.FileSegmentType fileType, MessageQueue messageQueue,
+        TieredMessageStoreConfig storeConfig) throws ClassNotFoundException, NoSuchMethodException {
+        this.fileType = fileType;
+        this.messageQueue = messageQueue;
+        this.storeConfig = storeConfig;
+        this.metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+        Class<? extends TieredFileSegment> clazz = Class.forName(storeConfig.getTieredBackendServiceProvider()).asSubclass(TieredFileSegment.class);
+        fileSegmentConstructor = clazz.getConstructor(TieredFileSegment.FileSegmentType.class, MessageQueue.class, Long.TYPE, TieredMessageStoreConfig.class);
+        loadFromMetadata();
+        // Size consistency checks only apply to commit log / consume queue files.
+        if (fileType != TieredFileSegment.FileSegmentType.INDEX) {
+            checkAndFixFileSize();
+        }
+    }
+
+    public long getBaseOffset() {
+        return baseOffset;
+    }
+
+    /** Sets the base offset; only legal before any segment has been created. */
+    public void setBaseOffset(long baseOffset) {
+        if (fileSegmentList.size() > 0) {
+            throw new IllegalStateException("can not set base offset after file segment has been created");
+        }
+        this.baseOffset = baseOffset;
+    }
+
+    /** @return base offset of the oldest segment, or {@link #baseOffset} when no segment exists. */
+    public long getMinOffset() {
+        fileSegmentLock.readLock().lock();
+        try {
+            if (fileSegmentList.isEmpty()) {
+                return baseOffset;
+            }
+            return fileSegmentList.get(0).getBaseOffset();
+        } finally {
+            fileSegmentLock.readLock().unlock();
+        }
+    }
+
+    /** @return commit offset of the newest segment, or {@link #baseOffset} when no segment exists. */
+    public long getCommitOffset() {
+        fileSegmentLock.readLock().lock();
+        try {
+            if (fileSegmentList.isEmpty()) {
+                return baseOffset;
+            }
+            return fileSegmentList.get(fileSegmentList.size() - 1).getCommitOffset();
+        } finally {
+            fileSegmentLock.readLock().unlock();
+        }
+    }
+
+    /** @return max (append) offset of the newest segment, or {@link #baseOffset} when no segment exists. */
+    public long getMaxOffset() {
+        fileSegmentLock.readLock().lock();
+        try {
+            if (fileSegmentList.isEmpty()) {
+                return baseOffset;
+            }
+            return fileSegmentList.get(fileSegmentList.size() - 1).getMaxOffset();
+        } finally {
+            fileSegmentLock.readLock().unlock();
+        }
+    }
+
+    /** @return committed message-queue offset of the newest segment; 0 when no segment exists. */
+    public long getCommitMsgQueueOffset() {
+        fileSegmentLock.readLock().lock();
+        try {
+            if (fileSegmentList.isEmpty()) {
+                return 0;
+            }
+            return fileSegmentList.get(fileSegmentList.size() - 1).getCommitMsgQueueOffset();
+        } finally {
+            fileSegmentLock.readLock().unlock();
+        }
+    }
+
+    /** Rebuilds the in-memory segment list from persisted metadata, skipping deleted segments. */
+    private void loadFromMetadata() {
+        metadataStore.iterateFileSegment(fileType, messageQueue.getTopic(), messageQueue.getQueueId(), metadata -> {
+            if (metadata.getStatus() == FileSegmentMetadata.STATUS_DELETED) {
+                return;
+            }
+            // NOTE(review): newSegment logs and returns null on failure; a null here would NPE below — confirm.
+            TieredFileSegment segment = newSegment(metadata.getBaseOffset(), false);
+            segment.initPosition(metadata.getSize());
+            segment.setBeginTimestamp(metadata.getBeginTimestamp());
+            segment.setEndTimestamp(metadata.getEndTimestamp());
+            if (metadata.getStatus() == FileSegmentMetadata.STATUS_SEALED) {
+                // setFull(false): mark full without appending a coda (a sealed segment already has one).
+                segment.setFull(false);
+            }
+            // TODO check coda/size
+            fileSegmentList.add(segment);
+        });
+        if (!fileSegmentList.isEmpty()) {
+            fileSegmentList.sort(Comparator.comparingLong(TieredFileSegment::getBaseOffset));
+            baseOffset = fileSegmentList.get(0).getBaseOffset();
+            needCommitFileSegmentList.addAll(fileSegmentList.stream()
+                .filter(segment -> !segment.isFull())
+                .collect(Collectors.toList()));
+        }
+    }
+
+    /**
+     * Verifies that each segment's commit offset lines up with the next segment's base offset,
+     * and that the last segment's commit position matches the actual file size, repairing
+     * positions from the real size where possible.
+     */
+    private void checkAndFixFileSize() {
+        for (int i = 1; i < fileSegmentList.size(); i++) {
+            TieredFileSegment pre = fileSegmentList.get(i - 1);
+            TieredFileSegment cur = fileSegmentList.get(i);
+            if (pre.getCommitOffset() != cur.getBaseOffset()) {
+                logger.warn("TieredFileQueue#checkAndFixFileSize: file segment has incorrect size: topic: {}, queue: {}, file type: {}, base offset: {}",
+                    messageQueue.getTopic(), messageQueue.getQueueId(), fileType, pre.getBaseOffset());
+                try {
+                    long actualSize = pre.getSize();
+                    if (pre.getBaseOffset() + actualSize != cur.getBaseOffset()) {
+                        logger.error("[Bug]TieredFileQueue#checkAndFixFileSize: file segment has incorrect size and can not fix: topic: {}, queue: {}, file type: {}, base offset: {}, actual size: {}, next file offset: {}",
+                            messageQueue.getTopic(), messageQueue.getQueueId(), fileType, pre.getBaseOffset(), actualSize, cur.getBaseOffset());
+                        continue;
+                    }
+                    pre.initPosition(actualSize);
+                    metadataStore.updateFileSegment(pre);
+                } catch (Exception e) {
+                    // NOTE(review): caught exception e is not passed to the logger — stack trace is lost.
+                    logger.error("TieredFileQueue#checkAndFixFileSize: fix file segment size failed: topic: {}, queue: {}, file type: {}, base offset: {}",
+                        messageQueue.getTopic(), messageQueue.getQueueId(), fileType, pre.getBaseOffset());
+                }
+            }
+        }
+        if (!fileSegmentList.isEmpty()) {
+            TieredFileSegment lastFile = fileSegmentList.get(fileSegmentList.size() - 1);
+            long lastFileSize = lastFile.getSize();
+            if (lastFile.getCommitPosition() != lastFileSize) {
+                logger.warn("TieredFileQueue#checkAndFixFileSize: fix last file {} size: origin: {}, actual: {}", lastFile.getPath(), lastFile.getCommitOffset() - lastFile.getBaseOffset(), lastFileSize);
+                lastFile.initPosition(lastFileSize);
+            }
+        }
+    }
+
+    /**
+     * Creates a new backend file segment via the reflective constructor.
+     *
+     * @param createMetadata whether to persist segment metadata immediately
+     * @return the new segment, or {@code null} if creation failed (failure is logged, not thrown)
+     */
+    private TieredFileSegment newSegment(long baseOffset, boolean createMetadata) {
+        TieredFileSegment segment = null;
+        try {
+            segment = fileSegmentConstructor.newInstance(fileType, messageQueue, baseOffset, storeConfig);
+            if (fileType != TieredFileSegment.FileSegmentType.INDEX) {
+                segment.createFile();
+            }
+            if (createMetadata) {
+                metadataStore.updateFileSegment(segment);
+            }
+        } catch (Exception e) {
+            logger.error("create file segment failed: topic: {}, queue: {}, file type: {}, base offset: {}",
+                messageQueue.getTopic(), messageQueue.getQueueId(), fileType, baseOffset, e);
+        }
+        return segment;
+    }
+
+    /** Seals the current write file and rolls over to a fresh one. */
+    public void rollingNewFile() {
+        TieredFileSegment segment = getFileToWrite();
+        segment.setFull();
+        // create new segment
+        getFileToWrite();
+    }
+
+    public int getFileSegmentCount() {
+        return fileSegmentList.size();
+    }
+
+    /** @return segment at {@code index}, or null when the index is out of range. */
+    @Nullable
+    protected TieredFileSegment getFileByIndex(int index) {
+        fileSegmentLock.readLock().lock();
+        try {
+            if (index < fileSegmentList.size()) {
+                return fileSegmentList.get(index);
+            }
+            return null;
+        } finally {
+            fileSegmentLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Returns the segment currently accepting writes, creating a new one when the newest
+     * segment is full. Uses a read-lock fast path, then re-checks under the write lock
+     * before rolling a new segment.
+     */
+    protected TieredFileSegment getFileToWrite() {
+        if (baseOffset == -1) {
+            throw new IllegalStateException("need to set base offset before create file segment");
+        }
+        fileSegmentLock.readLock().lock();
+        try {
+            if (!fileSegmentList.isEmpty()) {
+                TieredFileSegment fileSegment = fileSegmentList.get(fileSegmentList.size() - 1);
+                if (!fileSegment.isFull()) {
+                    return fileSegment;
+                }
+            }
+        } finally {
+            fileSegmentLock.readLock().unlock();
+        }
+        // Create new file segment
+        fileSegmentLock.writeLock().lock();
+        try {
+            long offset = baseOffset;
+            if (!fileSegmentList.isEmpty()) {
+                TieredFileSegment segment = fileSegmentList.get(fileSegmentList.size() - 1);
+                if (!segment.isFull()) {
+                    return segment;
+                }
+                // The full segment must be committed (and its metadata persisted) before rolling;
+                // on failure the old segment is returned so the caller retries later.
+                if (segment.commit()) {
+                    try {
+                        metadataStore.updateFileSegment(segment);
+                    } catch (Exception e) {
+                        return segment;
+                    }
+                } else {
+                    return segment;
+                }
+
+                offset = segment.getMaxOffset();
+            }
+            // NOTE(review): newSegment may return null on failure, which would be added to the lists — confirm.
+            TieredFileSegment fileSegment = newSegment(offset, true);
+            fileSegmentList.add(fileSegment);
+            needCommitFileSegmentList.add(fileSegment);
+
+            Collections.sort(fileSegmentList);
+
+            logger.debug("Create a new file segment: baseOffset: {}, file: {}, file type: {}", baseOffset, fileSegment.getPath(), fileType);
+            return fileSegment;
+        } finally {
+            fileSegmentLock.writeLock().unlock();
+        }
+    }
+
+    /**
+     * Finds the segment matching {@code timestamp}: with UPPER boundary, the earliest segment
+     * whose end timestamp is >= the target; with LOWER, the latest segment whose begin
+     * timestamp is <= the target. Falls back to the newest segment, or null when empty.
+     */
+    @Nullable
+    protected TieredFileSegment getFileByTime(long timestamp, BoundaryType boundaryType) {
+        fileSegmentLock.readLock().lock();
+        try {
+            List<TieredFileSegment> segmentList = fileSegmentList.stream()
+                .sorted(boundaryType == BoundaryType.UPPER ? Comparator.comparingLong(TieredFileSegment::getEndTimestamp) : Comparator.comparingLong(TieredFileSegment::getBeginTimestamp))
+                .filter(segment -> boundaryType == BoundaryType.UPPER ? segment.getEndTimestamp() >= timestamp : segment.getBeginTimestamp() <= timestamp)
+                .collect(Collectors.toList());
+            if (!segmentList.isEmpty()) {
+                return boundaryType == BoundaryType.UPPER ? segmentList.get(0) : segmentList.get(segmentList.size() - 1);
+            }
+            return fileSegmentList.isEmpty() ? null : fileSegmentList.get(fileSegmentList.size() - 1);
+        } finally {
+            fileSegmentLock.readLock().unlock();
+        }
+    }
+
+    /** @return all segments whose [begin, end] timestamp range overlaps [beginTime, endTime]. */
+    protected List<TieredFileSegment> getFileListByTime(long beginTime, long endTime) {
+        fileSegmentLock.readLock().lock();
+        try {
+            return fileSegmentList.stream()
+                .filter(segment -> Math.max(beginTime, segment.getBeginTimestamp()) <= Math.min(endTime, segment.getEndTimestamp()))
+                .collect(Collectors.toList());
+        } finally {
+            fileSegmentLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Binary search for the index of the segment containing the given global byte offset.
+     *
+     * @return the segment index, or -1 when the offset falls outside all segments
+     */
+    protected int getSegmentIndexByOffset(long offset) {
+        fileSegmentLock.readLock().lock();
+        try {
+            if (fileSegmentList.size() <= 0) {
+                return -1;
+            }
+
+            int left = 0;
+            int right = fileSegmentList.size() - 1;
+            int mid = (left + right) / 2;
+
+            long firstSegmentOffset = fileSegmentList.get(left).getBaseOffset();
+            long lastSegmentOffset = fileSegmentList.get(right).getCommitOffset();
+            long midSegmentOffset = fileSegmentList.get(mid).getBaseOffset();
+
+            if (offset < firstSegmentOffset || offset > lastSegmentOffset) {
+                return -1;
+            }
+
+            // Invariant: the target segment index lies in [left, right].
+            while (left < right - 1) {
+                if (offset == midSegmentOffset) {
+                    return mid;
+                }
+                if (offset < midSegmentOffset) {
+                    right = mid;
+                } else {
+                    left = mid;
+                }
+                mid = (left + right) / 2;
+                midSegmentOffset = fileSegmentList.get(mid).getBaseOffset();
+            }
+            return offset < fileSegmentList.get(right).getBaseOffset() ? mid : right;
+        } finally {
+            fileSegmentLock.readLock().unlock();
+        }
+    }
+
+    /** Appends without a timestamp (Long.MAX_VALUE is the "no timestamp" sentinel) and without commit. */
+    public AppendResult append(ByteBuffer byteBuf) {
+        return append(byteBuf, Long.MAX_VALUE, false);
+    }
+
+    public AppendResult append(ByteBuffer byteBuf, long timeStamp) {
+        return append(byteBuf, timeStamp, false);
+    }
+
+    /**
+     * Appends {@code byteBuf} to the current write segment. On BUFFER_FULL (and {@code commit}
+     * set) a synchronous commit is attempted and the append retried; on FILE_FULL the write
+     * rolls over to a fresh segment.
+     */
+    public AppendResult append(ByteBuffer byteBuf, long timeStamp, boolean commit) {
+        TieredFileSegment fileSegment = getFileToWrite();
+        AppendResult result = fileSegment.append(byteBuf, timeStamp);
+        if (commit && result == AppendResult.BUFFER_FULL && fileSegment.commit()) {
+            result = fileSegment.append(byteBuf, timeStamp);
+        }
+        if (result == AppendResult.FILE_FULL) {
+            // write to new file
+            return getFileToWrite().append(byteBuf, timeStamp);
+        }
+        return result;
+    }
+
+    /**
+     * Removes (and marks deleted in metadata) all leading segments whose end timestamp is
+     * older than {@code expireTimestamp}; stops at the first non-expired segment.
+     */
+    public void cleanExpiredFile(long expireTimestamp) {
+        Set<Long> needToDeleteSet = new HashSet<>();
+        try {
+            metadataStore.iterateFileSegment(fileType, messageQueue.getTopic(), messageQueue.getQueueId(),
+                metadata -> {
+                    if (metadata.getEndTimestamp() < expireTimestamp) {
+                        needToDeleteSet.add(metadata.getBaseOffset());
+                    }
+                });
+        } catch (Exception e) {
+            // NOTE(review): caught exception e is not passed to the logger — stack trace is lost.
+            logger.error("clean expired failed: topic: {}, queue: {}, file type: {}, expire timestamp: {}",
+                messageQueue.getTopic(), messageQueue.getQueueId(), fileType, expireTimestamp);
+        }
+
+        if (needToDeleteSet.isEmpty()) {
+            return;
+        }
+
+        fileSegmentLock.writeLock().lock();
+        try {
+            for (int i = 0; i < fileSegmentList.size(); i++) {
+                try {
+                    TieredFileSegment fileSegment = fileSegmentList.get(i);
+                    if (needToDeleteSet.contains(fileSegment.getBaseOffset())) {
+                        fileSegment.close();
+                        // i-- compensates for the in-place removal so the next element is not skipped.
+                        fileSegmentList.remove(fileSegment);
+                        needCommitFileSegmentList.remove(fileSegment);
+                        i--;
+                        metadataStore.updateFileSegment(fileSegment);
+                        logger.info("expired file {} is been cleaned", fileSegment.getPath());
+                    } else {
+                        break;
+                    }
+                } catch (Exception e) {
+                    logger.error("clean expired file failed: topic: {}, queue: {}, file type: {}, expire timestamp: {}",
+                        messageQueue.getTopic(), messageQueue.getQueueId(), fileType, expireTimestamp, e);
+                }
+            }
+            if (fileSegmentList.size() > 0) {
+                baseOffset = fileSegmentList.get(0).getBaseOffset();
+            } else if (fileType == TieredFileSegment.FileSegmentType.CONSUME_QUEUE) {
+                baseOffset = -1;
+            } else {
+                baseOffset = 0;
+            }
+        } finally {
+            fileSegmentLock.writeLock().unlock();
+        }
+    }
+
+    /** Physically destroys files whose metadata is marked deleted, then drops the metadata. */
+    public void destroyExpiredFile() {
+        try {
+            metadataStore.iterateFileSegment(fileType, messageQueue.getTopic(), messageQueue.getQueueId(),
+                metadata -> {
+                    if (metadata.getStatus() == FileSegmentMetadata.STATUS_DELETED) {
+                        try {
+                            TieredFileSegment fileSegment = newSegment(metadata.getBaseOffset(), false);
+                            fileSegment.destroyFile();
+                            // Metadata is only removed once the backing file is confirmed gone.
+                            if (!fileSegment.exists()) {
+                                metadataStore.deleteFileSegment(fileSegment);
+                                logger.info("expired file {} is been destroyed", fileSegment.getPath());
+                            }
+                        } catch (Exception e) {
+                            logger.error("destroy expired failed: topic: {}, queue: {}, file type: {}",
+                                messageQueue.getTopic(), messageQueue.getQueueId(), fileType, e);
+                        }
+                    }
+                });
+        } catch (Exception e) {
+            // NOTE(review): caught exception e is not passed to the logger — stack trace is lost.
+            logger.error("destroy expired file failed: topic: {}, queue: {}, file type: {}",
+                messageQueue.getTopic(), messageQueue.getQueueId(), fileType);
+        }
+    }
+
+    /**
+     * Asynchronously commits every segment that still has pending data, persisting metadata
+     * after each upload and pruning fully committed, full segments from the pending list.
+     *
+     * @param sync when true, blocks until every commit future completes
+     */
+    public void commit(boolean sync) {
+        ArrayList<CompletableFuture<Void>> futureList = new ArrayList<>();
+        try {
+            for (TieredFileSegment segment : needCommitFileSegmentList) {
+                if (segment.isClosed()) {
+                    continue;
+                }
+                futureList.add(segment.commitAsync()
+                    .thenAccept(success -> {
+                        try {
+                            metadataStore.updateFileSegment(segment);
+                        } catch (Exception e) {
+                            // TODO handle update segment metadata failed exception
+                            logger.error("update file segment metadata failed: topic: {}, queue: {}, file type: {}, base offset: {}",
+                                messageQueue.getTopic(), messageQueue.getQueueId(), fileType, segment.getBaseOffset(), e);
+                        }
+                        if (segment.isFull() && !segment.needCommit()) {
+                            needCommitFileSegmentList.remove(segment);
+                        }
+                    }));
+            }
+        } catch (Exception e) {
+            logger.error("commit file segment failed: topic: {}, queue: {}, file type: {}", messageQueue.getTopic(), messageQueue.getQueueId(), fileType, e);
+        }
+        if (sync) {
+            CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
+        }
+    }
+
+    /**
+     * Asynchronously reads {@code length} bytes starting at global byte {@code offset},
+     * stitching the result from two adjacent segments when the range crosses a boundary.
+     * NOTE(review): throws synchronously on an illegal offset instead of returning a failed
+     * future — callers of this async API may not expect that; confirm intended.
+     */
+    public CompletableFuture<ByteBuffer> readAsync(long offset, int length) {
+        int index = getSegmentIndexByOffset(offset);
+        if (index == -1) {
+            String errorMsg = String.format("TieredFileQueue#readAsync: offset is illegal, topic: %s, queue: %s, file type: %s, start: %d, length: %d, file num: %d",
+                messageQueue.getTopic(), messageQueue.getQueueId(), fileType, offset, length, fileSegmentList.size());
+            logger.error(errorMsg);
+            throw new TieredStoreException(TieredStoreErrorCode.ILLEGAL_OFFSET, errorMsg);
+        }
+        TieredFileSegment fileSegment1;
+        TieredFileSegment fileSegment2 = null;
+        fileSegmentLock.readLock().lock();
+        try {
+            fileSegment1 = fileSegmentList.get(index);
+            // The read spills past the first segment's committed data: fetch the tail from the next one.
+            if (offset + length > fileSegment1.getCommitOffset()) {
+                if (fileSegmentList.size() > index + 1) {
+                    fileSegment2 = fileSegmentList.get(index + 1);
+                }
+            }
+        } finally {
+            fileSegmentLock.readLock().unlock();
+        }
+        if (fileSegment2 == null) {
+            return fileSegment1.readAsync(offset - fileSegment1.getBaseOffset(), length);
+        }
+        int segment1Length = (int) (fileSegment1.getCommitOffset() - offset);
+        return fileSegment1.readAsync(offset - fileSegment1.getBaseOffset(), segment1Length)
+            .thenCombine(fileSegment2.readAsync(0, length - segment1Length), (buffer1, buffer2) -> {
+                ByteBuffer compositeBuffer = ByteBuffer.allocate(buffer1.remaining() + buffer2.remaining());
+                compositeBuffer.put(buffer1).put(buffer2);
+                compositeBuffer.flip();
+                return compositeBuffer;
+            });
+    }
+
+    /** Closes, marks deleted and destroys every segment, then clears both segment lists. */
+    public void destroy() {
+        fileSegmentLock.writeLock().lock();
+        try {
+            for (TieredFileSegment fileSegment : fileSegmentList) {
+                fileSegment.close();
+                try {
+                    metadataStore.updateFileSegment(fileSegment);
+                } catch (Exception e) {
+                    logger.error("TieredFileQueue#destroy: mark file segment: {} is deleted failed", fileSegment.getPath(), e);
+                }
+                fileSegment.destroyFile();
+            }
+            fileSegmentList.clear();
+            needCommitFileSegmentList.clear();
+        } finally {
+            fileSegmentLock.writeLock().unlock();
+        }
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileSegment.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileSegment.java
new file mode 100644
index 000000000..9cf19c638
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredFileSegment.java
@@ -0,0 +1,538 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import com.google.common.base.Stopwatch;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.tiered.common.AppendResult;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.exception.TieredStoreErrorCode;
+import org.apache.rocketmq.store.tiered.exception.TieredStoreException;
+import org.apache.rocketmq.store.tiered.util.MessageBufferUtil;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+
+public abstract class TieredFileSegment implements Comparable<TieredFileSegment> {
+    private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+    // Once closed, further appends return FILE_CLOSE.
+    private volatile boolean closed = false;
+    // Guards uploadBufferList, codaBuffer and the full flag.
+    private final ReentrantLock bufferLock = new ReentrantLock();
+    // Binary semaphore: at most one commit in flight at a time.
+    private final Semaphore commitLock = new Semaphore(1);
+    // Buffers accepted by append() but not yet committed to the backend.
+    private List<ByteBuffer> uploadBufferList = new ArrayList<>();
+    private boolean full;
+    protected final FileSegmentType fileType;
+    protected final MessageQueue messageQueue;
+    protected final TieredMessageStoreConfig storeConfig;
+    protected final long baseOffset;
+    // commitPosition: bytes durably committed; appendPosition: bytes accepted into buffers.
+    private volatile long commitPosition;
+    private volatile long appendPosition;
+    private final long maxSize;
+    // Long.MAX_VALUE is the "unset" sentinel for both timestamps.
+    private long beginTimestamp = Long.MAX_VALUE;
+    private long endTimestamp = Long.MAX_VALUE;
+    // only used in commitLog
+    private long commitMsgQueueOffset = 0;
+    // Trailer written when a commit log segment is sealed; null until then.
+    private ByteBuffer codaBuffer;
+
+    private CompletableFuture<Boolean> inflightCommitRequest = CompletableFuture.completedFuture(false);
+
+    /**
+     * @param baseOffset global offset at which this segment starts
+     * @throws IllegalArgumentException if the file type is not commit log, consume queue or index
+     */
+    public TieredFileSegment(FileSegmentType fileType, MessageQueue messageQueue, long baseOffset,
+        TieredMessageStoreConfig storeConfig) {
+        this.fileType = fileType;
+        this.messageQueue = messageQueue;
+        this.storeConfig = storeConfig;
+        this.baseOffset = baseOffset;
+        this.commitPosition = 0;
+        this.appendPosition = 0;
+        switch (fileType) {
+            case COMMIT_LOG:
+                this.maxSize = storeConfig.getTieredStoreCommitLogMaxSize();
+                break;
+            case CONSUME_QUEUE:
+                this.maxSize = storeConfig.getTieredStoreConsumeQueueMaxSize();
+                break;
+            case INDEX:
+                // Index segments have no size cap here; append() seals them after a single write.
+                this.maxSize = Long.MAX_VALUE;
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported file type: " + fileType);
+        }
+    }
+
+    // Orders segments by base offset. NOTE(review): no equals()/hashCode() override is visible,
+    // so this ordering may not be consistent with equals — confirm intended.
+    @Override
+    public int compareTo(TieredFileSegment o) {
+        return Long.compare(this.baseOffset, o.baseOffset);
+    }
+
+    /** @return first global offset covered by this segment. */
+    public long getBaseOffset() {
+        return baseOffset;
+    }
+
+    /** @return global offset up to which data has been committed. */
+    public long getCommitOffset() {
+        return baseOffset + commitPosition;
+    }
+
+    /** @return committed size in bytes, relative to the segment start. */
+    public long getCommitPosition() {
+        return commitPosition;
+    }
+
+    /** @return committed message-queue offset; meaningful for commit log segments only. */
+    public long getCommitMsgQueueOffset() {
+        return commitMsgQueueOffset;
+    }
+
+    /** @return global offset including data buffered but not yet committed. */
+    public long getMaxOffset() {
+        return baseOffset + appendPosition;
+    }
+
+    /** @return size cap for this segment, set per file type in the constructor. */
+    public long getMaxSize() {
+        return maxSize;
+    }
+
+    /** @return timestamp of the first appended data; Long.MAX_VALUE when unset. */
+    public long getBeginTimestamp() {
+        return beginTimestamp;
+    }
+
+    /** Restores the begin timestamp, e.g. when loading segment metadata. */
+    protected void setBeginTimestamp(long beginTimestamp) {
+        this.beginTimestamp = beginTimestamp;
+    }
+
+    /** @return timestamp of the last appended data; Long.MAX_VALUE when unset. */
+    public long getEndTimestamp() {
+        return endTimestamp;
+    }
+
+    /** Restores the end timestamp, e.g. when loading segment metadata. */
+    protected void setEndTimestamp(long endTimestamp) {
+        this.endTimestamp = endTimestamp;
+    }
+
+    /** @return true once the segment is sealed against further appends. */
+    public boolean isFull() {
+        return full;
+    }
+
+    /** Seals the segment, appending a coda for commit log segments. */
+    public void setFull() {
+        setFull(true);
+    }
+
+    /**
+     * Seals the segment. The full flag is always set; {@code appendCoda} only controls
+     * whether a trailer is written, and only commit log segments ever get one.
+     */
+    public void setFull(boolean appendCoda) {
+        bufferLock.lock();
+        try {
+            full = true;
+            if (fileType == FileSegmentType.COMMIT_LOG && appendCoda) {
+                appendCoda();
+            }
+        } finally {
+            bufferLock.unlock();
+        }
+    }
+
+    /** @return true if the segment has been closed for writing. */
+    public boolean isClosed() {
+        return closed;
+    }
+
+    /** Closes the segment; subsequent appends return FILE_CLOSE. */
+    public void close() {
+        closed = true;
+    }
+
+    /** @return the file type (commit log, consume queue or index) of this segment. */
+    public FileSegmentType getFileType() {
+        return fileType;
+    }
+
+    /** @return the message queue this segment belongs to. */
+    public MessageQueue getMessageQueue() {
+        return messageQueue;
+    }
+
+    /** Resets both commit and append positions, e.g. after size repair or metadata restore. */
+    public void initPosition(long pos) {
+        this.commitPosition = pos;
+        this.appendPosition = pos;
+    }
+
+    /** Atomically swaps out the pending upload buffers, returning the batch to commit. */
+    private List<ByteBuffer> rollingUploadBuffer() {
+        bufferLock.lock();
+        try {
+            List<ByteBuffer> tmp = uploadBufferList;
+            uploadBufferList = new ArrayList<>();
+            return tmp;
+        } finally {
+            bufferLock.unlock();
+        }
+    }
+
+    /**
+     * Returns buffers from a failed upload to the head of the pending list so a later
+     * commit can retry them, rewinding each buffer for re-reading.
+     */
+    private void sendBackBuffer(TieredFileSegmentInputStream inputStream) {
+        bufferLock.lock();
+        try {
+            List<ByteBuffer> tmpBufferList = inputStream.getUploadBufferList();
+            for (ByteBuffer buffer : tmpBufferList) {
+                buffer.rewind();
+            }
+            // Retried buffers go first so original order is preserved.
+            tmpBufferList.addAll(uploadBufferList);
+            uploadBufferList = tmpBufferList;
+            if (inputStream.getCodaBuffer() != null) {
+                // NOTE(review): rewinds this.codaBuffer, not the stream's coda buffer —
+                // presumably the same object; confirm.
+                codaBuffer.rewind();
+            }
+        } finally {
+            bufferLock.unlock();
+        }
+    }
+
+    /**
+     * Buffers {@code byteBuf} for later upload.
+     *
+     * @param timeStamp store timestamp of the data; Long.MAX_VALUE means "no timestamp"
+     * @return SUCCESS, FILE_CLOSE, FILE_FULL or BUFFER_FULL
+     */
+    public AppendResult append(ByteBuffer byteBuf, long timeStamp) {
+        if (closed) {
+            return AppendResult.FILE_CLOSE;
+        }
+        bufferLock.lock();
+        try {
+            if (full || codaBuffer != null) {
+                return AppendResult.FILE_FULL;
+            }
+
+            // Index files are written once: timestamps come from the index header,
+            // and the segment is sealed immediately after the single append.
+            if (fileType == FileSegmentType.INDEX) {
+                beginTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
+                endTimestamp = byteBuf.getLong(TieredIndexFile.INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
+                appendPosition += byteBuf.remaining();
+                uploadBufferList.add(byteBuf);
+                setFull();
+                return AppendResult.SUCCESS;
+            }
+
+            if (appendPosition + byteBuf.remaining() > maxSize) {
+                setFull();
+                return AppendResult.FILE_FULL;
+            }
+            // Group-commit thresholds: trigger a background commit when enough data piles up.
+            if (uploadBufferList.size() > storeConfig.getTieredStoreGroupCommitCount()
+                || appendPosition - commitPosition > storeConfig.getTieredStoreGroupCommitSize()) {
+                commitAsync();
+            }
+            // Hard cap: refuse new data until commits drain the buffer backlog.
+            if (uploadBufferList.size() > storeConfig.getTieredStoreMaxGroupCommitCount()) {
+                logger.debug("TieredFileSegment#append: buffer full: file: {}, upload buffer size: {}",
+                    getPath(), uploadBufferList.size());
+                return AppendResult.BUFFER_FULL;
+            }
+            if (timeStamp != Long.MAX_VALUE) {
+                endTimestamp = timeStamp;
+                if (beginTimestamp == Long.MAX_VALUE) {
+                    beginTimestamp = timeStamp;
+                }
+            }
+            appendPosition += byteBuf.remaining();
+            uploadBufferList.add(byteBuf);
+            return AppendResult.SUCCESS;
+        } finally {
+            bufferLock.unlock();
+        }
+    }
+
+    /**
+     * Appends the commit-log coda (size + blank magic code + end timestamp) that marks the
+     * end of a sealed segment. Idempotent: a second call is a no-op.
+     */
+    private void appendCoda() {
+        if (codaBuffer != null) {
+            return;
+        }
+        codaBuffer = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
+        codaBuffer.putInt(TieredCommitLog.CODA_SIZE);
+        codaBuffer.putInt(TieredCommitLog.BLANK_MAGIC_CODE);
+        codaBuffer.putLong(endTimestamp);
+        codaBuffer.flip();
+        // The coda counts toward the logical segment length.
+        appendPosition += TieredCommitLog.CODA_SIZE;
+    }
+
+    /** Synchronous wrapper over {@link #readAsync}; blocks the calling thread until the read completes. */
+    public ByteBuffer read(long position, int length) {
+        return readAsync(position, length).join();
+    }
+
+    /**
+     * Reads {@code length} bytes starting at {@code position} from the committed portion of the
+     * segment. Requests reaching past the commit position are truncated to it.
+     *
+     * @return future completed with the data, or completed exceptionally with a
+     *         TieredStoreException (ILLEGAL_PARAM / NO_NEW_DATA) on invalid requests
+     */
+    public CompletableFuture<ByteBuffer> readAsync(long position, int length) {
+        CompletableFuture<ByteBuffer> future = new CompletableFuture<>();
+        if (position < 0 || length < 0) {
+            future.completeExceptionally(
+                new TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM, "position or length is negative"));
+            return future;
+        }
+        if (length == 0) {
+            future.completeExceptionally(
+                new TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM, "length is zero"));
+            return future;
+        }
+        if (position + length > commitPosition) {
+            // Only committed data is readable from the backend; clamp the request.
+            logger.warn("TieredFileSegment#readAsync request position + length is greater than commit position," +
+                    " correct length using commit position, file: {}, request position: {}, commit position:{}, change length from {} to {}",
+                getPath(), position, commitPosition, length, commitPosition - position);
+            length = (int) (commitPosition - position);
+            if (length == 0) {
+                future.completeExceptionally(
+                    new TieredStoreException(TieredStoreErrorCode.NO_NEW_DATA, "request position is equal to commit position"));
+                return future;
+            }
+            // Consume-queue reads must stay aligned to whole store units after clamping.
+            if (fileType == FileSegmentType.CONSUME_QUEUE && length % TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE != 0) {
+                future.completeExceptionally(
+                    new TieredStoreException(TieredStoreErrorCode.ILLEGAL_PARAM, "position and length is illegal"));
+                return future;
+            }
+        }
+        return read0(position, length);
+    }
+
+    /** @return true when staged data exists that has not yet been uploaded to the backend. */
+    public boolean needCommit() {
+        return appendPosition > commitPosition;
+    }
+
+    /**
+     * Synchronously commits staged data. If another commit is already in flight
+     * (commitAsync returned false), waits for that in-flight request instead.
+     * NOTE(review): assumes inflightCommitRequest is never null here — presumably it is
+     * initialized to a completed future at construction; confirm in the field declaration.
+     */
+    public boolean commit() {
+        if (closed) {
+            return false;
+        }
+        Boolean result = commitAsync().join();
+        if (!result) {
+            result = inflightCommitRequest.join();
+        }
+        return result;
+    }
+
+    /**
+     * Uploads all staged buffers (plus the coda, if sealed) to the backend storage in one
+     * request. At most one commit runs at a time, guarded by draining the commitLock
+     * semaphore; a concurrent caller gets an immediate {@code false}.
+     *
+     * @return future resolving to true on a successful upload, false when skipped or failed
+     */
+    public CompletableFuture<Boolean> commitAsync() {
+        if (closed) {
+            return CompletableFuture.completedFuture(false);
+        }
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        if (!needCommit()) {
+            return CompletableFuture.completedFuture(true);
+        }
+        try {
+            // Drain all permits to take exclusive ownership of the commit; 0 permits means
+            // another commit is already in flight.
+            int permits = commitLock.drainPermits();
+            if (permits <= 0) {
+                return CompletableFuture.completedFuture(false);
+            }
+        } catch (Exception e) {
+            return CompletableFuture.completedFuture(false);
+        }
+        List<ByteBuffer> bufferList = rollingUploadBuffer();
+        int bufferSize = 0;
+        for (ByteBuffer buffer : bufferList) {
+            bufferSize += buffer.remaining();
+        }
+        if (codaBuffer != null) {
+            bufferSize += codaBuffer.remaining();
+        }
+        // NOTE(review): this early return leaves the permits drained above unreleased —
+        // subsequent commitAsync calls would then always see 0 permits. Looks like a lock
+        // leak; confirm whether this path is reachable after the needCommit() check.
+        if (bufferSize == 0) {
+            return CompletableFuture.completedFuture(true);
+        }
+        TieredFileSegmentInputStream inputStream = new TieredFileSegmentInputStream(fileType, baseOffset + commitPosition, bufferList, codaBuffer, bufferSize);
+        int finalBufferSize = bufferSize;
+        try {
+            // Index segments are uploaded whole (append=false); other types append at commitPosition.
+            inflightCommitRequest = commit0(inputStream, commitPosition, bufferSize, fileType != FileSegmentType.INDEX)
+                .thenApply(result -> {
+                    if (result) {
+                        if (fileType == FileSegmentType.COMMIT_LOG && bufferList.size() > 0) {
+                            // Remember the queue offset of the last uploaded message.
+                            commitMsgQueueOffset = MessageBufferUtil.getQueueOffset(bufferList.get(bufferList.size() - 1));
+                        }
+                        commitPosition += finalBufferSize;
+                        return true;
+                    }
+                    // Failed upload: return the staged buffers so a later commit can retry them.
+                    sendBackBuffer(inputStream);
+                    return false;
+                }).exceptionally(e -> handleCommitException(inputStream, e))
+                .whenComplete((result, e) -> {
+                    // Release the exclusive commit ownership taken by drainPermits() above.
+                    if (commitLock.availablePermits() == 0) {
+                        logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize);
+                        commitLock.release();
+                    } else {
+                        logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits());
+                    }
+                });
+            return inflightCommitRequest;
+        } catch (Exception e) {
+            // commit0 threw synchronously: hand buffers back and release the lock here,
+            // since the whenComplete callback above will never run.
+            handleCommitException(inputStream, e);
+            if (commitLock.availablePermits() == 0) {
+                logger.debug("TieredFileSegment#commitAsync: commit cost: {}ms, file: {}, item count: {}, buffer size: {}", stopwatch.elapsed(TimeUnit.MILLISECONDS), getPath(), bufferList.size(), finalBufferSize);
+                commitLock.release();
+            } else {
+                logger.error("[Bug]TieredFileSegment#commitAsync: commit lock is already released: available permits: {}", commitLock.availablePermits());
+            }
+        }
+        return CompletableFuture.completedFuture(false);
+    }
+
+    /**
+     * Recovers from a failed upload: returns the staged buffers to the upload list and tries
+     * to resynchronize commit/append positions with the real size reported by the backend
+     * (or carried in the TieredStoreException). Always returns false so it can be used
+     * directly as an {@code exceptionally} handler.
+     */
+    private boolean handleCommitException(TieredFileSegmentInputStream inputStream, Throwable e) {
+        // CompletionException wraps the real failure; unwrap one level if present.
+        Throwable cause = e.getCause() != null ? e.getCause() : e;
+        sendBackBuffer(inputStream);
+        long realSize = 0;
+        if (cause instanceof TieredStoreException && ((TieredStoreException) cause).getPosition() > 0) {
+            realSize = ((TieredStoreException) cause).getPosition();
+        }
+        if (realSize <= 0) {
+            // Exception carried no position info: ask the backend for the actual file size.
+            realSize = getSize();
+        }
+        if (realSize > 0 && realSize > commitPosition) {
+            logger.error("TieredFileSegment#handleCommitException: commit failed: file: {}, try to fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause);
+            // TODO check if this diff part is uploaded to backend storage
+            // Shift both positions forward by the same gap so un-uploaded data stays pending.
+            long diff = appendPosition - commitPosition;
+            commitPosition = realSize;
+            appendPosition = realSize + diff;
+            // TODO check if appendPosition is large than maxOffset
+        } else if (realSize < commitPosition) {
+            // Backend reports less data than we believed committed — cannot be repaired here.
+            logger.error("[Bug]TieredFileSegment#handleCommitException: commit failed: file: {}, can not fix position: origin: {}, real: {}", getPath(), commitPosition, realSize, cause);
+        }
+        return false;
+    }
+
+    /**
+     * Kind of data a tiered file segment holds. The numeric code is used for
+     * serialization in metadata, so existing values must never be renumbered.
+     */
+    public enum FileSegmentType {
+        COMMIT_LOG(0),
+        CONSUME_QUEUE(1),
+        INDEX(2);
+
+        private int type;
+
+        FileSegmentType(int type) {
+            this.type = type;
+        }
+
+        public int getType() {
+            return type;
+        }
+
+        /** Inverse of {@link #getType()}; throws IllegalStateException for unknown codes. */
+        public static FileSegmentType valueOf(int type) {
+            switch (type) {
+                case 0:
+                    return COMMIT_LOG;
+                case 1:
+                    return CONSUME_QUEUE;
+                case 2:
+                    return INDEX;
+                default:
+                    throw new IllegalStateException("Unexpected value: " + type);
+            }
+        }
+    }
+
+    /**
+     * Adapts a list of staged ByteBuffers (plus an optional coda) into a single InputStream
+     * for upload. For COMMIT_LOG segments it additionally rewrites each message's physical
+     * offset field on the fly so the uploaded bytes carry the tiered-storage offset.
+     * Not thread-safe; intended for a single upload request.
+     */
+    protected static class TieredFileSegmentInputStream extends InputStream {
+
+        private final FileSegmentType fileType;
+        private final List<ByteBuffer> uploadBufferList;
+        // Index of the buffer currently being consumed.
+        private int bufferReadIndex = 0;
+        private int readOffset = 0;
+        // only used in commitLog
+        private long commitLogOffset;
+        // Big-endian encoding of commitLogOffset, served byte-by-byte when the read
+        // position falls inside a message's physical-offset field.
+        private final ByteBuffer commitLogOffsetBuffer = ByteBuffer.allocate(8);
+        private final ByteBuffer codaBuffer;
+        private ByteBuffer curBuffer;
+        private final int contentLength;
+        private int readBytes = 0;
+
+        public TieredFileSegmentInputStream(FileSegmentType fileType, long startOffset,
+            List<ByteBuffer> uploadBufferList, ByteBuffer codaBuffer, int contentLength) {
+            this.fileType = fileType;
+            this.commitLogOffset = startOffset;
+            this.commitLogOffsetBuffer.putLong(0, startOffset);
+            this.uploadBufferList = uploadBufferList;
+            this.codaBuffer = codaBuffer;
+            this.contentLength = contentLength;
+            if (uploadBufferList.size() > 0) {
+                this.curBuffer = uploadBufferList.get(0);
+            }
+            if (fileType == FileSegmentType.INDEX && uploadBufferList.size() != 1) {
+                logger.error("[Bug]TieredFileSegmentInputStream: index file must have only one buffer");
+            }
+        }
+
+        public List<ByteBuffer> getUploadBufferList() {
+            return uploadBufferList;
+        }
+
+        public ByteBuffer getCodaBuffer() {
+            return codaBuffer;
+        }
+
+        @Override
+        public int available() {
+            return contentLength - readBytes;
+        }
+
+        /**
+         * Reads one byte, advancing through the buffer list; returns -1 at end of stream.
+         * NOTE(review): the COMMIT_LOG branch uses absolute indexing and treats
+         * curBuffer.remaining() as the buffer length — this assumes each staged buffer
+         * has position 0; confirm against how buffers are staged in append().
+         */
+        @Override
+        public int read() {
+            if (bufferReadIndex >= uploadBufferList.size()) {
+                return readCoda();
+            }
+
+            int res;
+            switch (fileType) {
+                case COMMIT_LOG:
+                    if (readOffset >= curBuffer.remaining()) {
+                        // Current message exhausted: move to the next buffer and advance
+                        // the running commit-log offset by the consumed length.
+                        bufferReadIndex++;
+                        if (bufferReadIndex >= uploadBufferList.size()) {
+                            return readCoda();
+                        }
+                        curBuffer = uploadBufferList.get(bufferReadIndex);
+                        commitLogOffset += readOffset;
+                        commitLogOffsetBuffer.putLong(0, commitLogOffset);
+                        readOffset = 0;
+                    }
+                    // Substitute the physical-offset field bytes with the tiered offset.
+                    if (readOffset >= MessageBufferUtil.PHYSICAL_OFFSET_POSITION && readOffset < MessageBufferUtil.SYS_FLAG_OFFSET_POSITION) {
+                        res = commitLogOffsetBuffer.get(readOffset - MessageBufferUtil.PHYSICAL_OFFSET_POSITION) & 0xff;
+                        readOffset++;
+                    } else {
+                        res = curBuffer.get(readOffset++) & 0xff;
+                    }
+                    break;
+                case CONSUME_QUEUE:
+                    if (!curBuffer.hasRemaining()) {
+                        bufferReadIndex++;
+                        if (bufferReadIndex >= uploadBufferList.size()) {
+                            return -1;
+                        }
+                        curBuffer = uploadBufferList.get(bufferReadIndex);
+                    }
+                    res = curBuffer.get() & 0xff;
+                    break;
+                case INDEX:
+                    // Index segments have exactly one buffer and no coda.
+                    if (!curBuffer.hasRemaining()) {
+                        return -1;
+                    }
+                    res = curBuffer.get() & 0xff;
+                    break;
+                default:
+                    throw new IllegalStateException("unknown file type");
+            }
+            readBytes++;
+            return res;
+        }
+
+        /** Serves the coda bytes (commit-log only) after all data buffers are exhausted. */
+        private int readCoda() {
+            if (fileType != FileSegmentType.COMMIT_LOG || codaBuffer == null) {
+                return -1;
+            }
+            if (!codaBuffer.hasRemaining()) {
+                return -1;
+            }
+            readBytes++;
+            return codaBuffer.get() & 0xff;
+        }
+    }
+
+    /** Logical path of this segment in the backend storage (used mainly for logging). */
+    public abstract String getPath();
+
+    /** Actual size of this segment as reported by the backend storage. */
+    public abstract long getSize();
+
+    /** Whether the segment already exists in the backend storage. */
+    protected abstract boolean exists();
+
+    /** Creates the segment in the backend storage. */
+    protected abstract void createFile();
+
+    /** Permanently removes the segment from the backend storage. */
+    protected abstract void destroyFile();
+
+    /** Backend read of {@code length} bytes starting at {@code position}. */
+    protected abstract CompletableFuture<ByteBuffer> read0(long position, int length);
+
+    /**
+     * Backend write of {@code length} bytes at {@code position}; {@code append} is false
+     * only for INDEX segments, which are uploaded as a whole.
+     */
+    protected abstract CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position,
+        int length, boolean append);
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredIndexFile.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredIndexFile.java
new file mode 100644
index 000000000..0f423a4a3
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/container/TieredIndexFile.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.index.IndexHeader;
+import org.apache.rocketmq.store.logfile.DefaultMappedFile;
+import org.apache.rocketmq.store.logfile.MappedFile;
+import org.apache.rocketmq.store.tiered.common.AppendResult;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.common.TieredStoreExecutor;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+
+/**
+ * Message-key index for tiered storage. New keys are written into a local mmap'd file
+ * ("0000"); when that file is sealed (full or idle too long) it is rotated to "1111",
+ * compacted into "2222" (dropping empty slots and shrinking index entries), then the
+ * compacted form is appended to a TieredFileQueue for upload. Queries fan out across
+ * all uploaded index segments whose time range overlaps the request.
+ */
+public class TieredIndexFile {
+    private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+    // NOTE(review): '+' binds tighter than '^' in Java, so these evaluate as
+    // 0xCCDDEEFF ^ (1880681586 + N). The two constants still differ, so sealing
+    // detection works, but confirm the grouping is intentional.
+    public static final int INDEX_FILE_BEGIN_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 4;
+    public static final int INDEX_FILE_END_MAGIC_CODE = 0xCCDDEEFF ^ 1880681586 + 8;
+    // Header layout: magic(4) + beginTimestamp(8) + endTimestamp(8) + slotNum(4) + indexNum(4).
+    private static final int INDEX_FILE_HEADER_SIZE = 28;
+    // Hash slot: index position(4) + (after compaction) chain size(4).
+    private static final int INDEX_FILE_HASH_SLOT_SIZE = 8;
+    // Origin entry: hash(4)+topicId(4)+queueId(4)+offset(8)+size(4)+timeDiff(4)+next(4).
+    private static final int INDEX_FILE_HASH_ORIGIN_INDEX_SIZE = 32;
+    // Compact entry drops the 4-byte next pointer (chains become contiguous).
+    public static final int INDEX_FILE_HASH_COMPACT_INDEX_SIZE = 28;
+
+    public static final int INDEX_FILE_HEADER_MAGIC_CODE_POSITION = 0;
+    public static final int INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION = 4;
+    public static final int INDEX_FILE_HEADER_END_TIME_STAMP_POSITION = 12;
+    private static final int INDEX_FILE_HEADER_SLOT_NUM_POSITION = 20;
+    private static final int INDEX_FILE_HEADER_INDEX_NUM_POSITION = 24;
+
+    private static final String INDEX_FILE_DIR_NAME = "tiered_index_file";
+    // Fixed file names: current write file, rotated (pre) file, compaction scratch file.
+    private static final String CUR_INDEX_FILE_NAME = "0000";
+    private static final String PRE_INDEX_FILE_NAME = "1111";
+    private static final String COMPACT_FILE_NAME = "2222";
+
+    private final TieredMessageStoreConfig storeConfig;
+    private final TieredFileQueue fileQueue;
+    private final int maxHashSlotNum;
+    private final int maxIndexNum;
+    private final int fileMaxSize;
+    private final String curFilePath;
+    private final String preFilepath;
+    private MappedFile preMappedFile;
+    private MappedFile curMappedFile;
+
+    // Guards curMappedFile writes/rotation; compaction scheduling is additionally
+    // serialized on the TieredIndexFile.class monitor.
+    private ReentrantLock curFileLock = new ReentrantLock();
+    private Future<Void> inflightCompactFuture = CompletableFuture.completedFuture(null);
+
+    /**
+     * Opens (or creates) the local index files, resumes any pending compaction, and starts
+     * a 10s periodic task that seals + rotates the current file after the configured idle
+     * interval and re-submits compaction of the rotated file when none is in flight.
+     */
+    protected TieredIndexFile(TieredMessageStoreConfig storeConfig)
+        throws ClassNotFoundException, NoSuchMethodException, IOException {
+        this.storeConfig = storeConfig;
+        this.fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.INDEX, new MessageQueue(TieredStoreUtil.RMQ_SYS_TIERED_STORE_INDEX_TOPIC, storeConfig.getBrokerName(), 0), storeConfig);
+        if (fileQueue.getBaseOffset() == -1) {
+            fileQueue.setBaseOffset(0);
+        }
+        this.maxHashSlotNum = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
+        this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum();
+
+        // Trailing 4 bytes hold the end magic code that marks a sealed file.
+        this.fileMaxSize = IndexHeader.INDEX_HEADER_SIZE + (this.maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE) + (this.maxIndexNum * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE) + 4;
+        this.curFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + CUR_INDEX_FILE_NAME;
+        this.preFilepath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + PRE_INDEX_FILE_NAME;
+        initFile();
+        TieredStoreExecutor.COMMON_SCHEDULED_EXECUTOR.scheduleWithFixedDelay(() -> {
+            try {
+                curFileLock.lock();
+                try {
+                    synchronized (TieredIndexFile.class) {
+                        MappedByteBuffer mappedByteBuffer = curMappedFile.getMappedByteBuffer();
+                        int indexNum = mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION);
+                        long lastIndexTime = mappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION);
+                        // Idle rollover: seal a non-empty file that has not been written recently.
+                        if (indexNum > 0 && System.currentTimeMillis() - lastIndexTime > storeConfig.getTieredStoreIndexFileRollingIdleInterval()) {
+                            mappedByteBuffer.putInt(fileMaxSize - 4, INDEX_FILE_END_MAGIC_CODE);
+                            rollingFile();
+                        }
+                        if (inflightCompactFuture.isDone() && preMappedFile != null && preMappedFile.isAvailable()) {
+                            inflightCompactFuture = TieredStoreExecutor.COMPACT_INDEX_FILE_EXECUTOR.submit(new CompactTask(storeConfig, preMappedFile, fileQueue), null);
+                        }
+                    }
+                } finally {
+                    curFileLock.unlock();
+                }
+            } catch (Throwable throwable) {
+                logger.error("TieredIndexFile: submit compact index file task failed:", throwable);
+            }
+        }, 10, 10, TimeUnit.SECONDS);
+    }
+
+    /** A file is sealed when its last 4 bytes carry the end magic code. */
+    private static boolean isFileSealed(MappedFile mappedFile) {
+        return mappedFile.getMappedByteBuffer().getInt(mappedFile.getFileSize() - 4) == INDEX_FILE_END_MAGIC_CODE;
+    }
+
+    /** Writes a fresh header and empties all hash slots, unless the file is already initialized. */
+    private void initIndexFileHeader(MappedFile mappedFile) {
+        MappedByteBuffer mappedByteBuffer = mappedFile.getMappedByteBuffer();
+        if (mappedByteBuffer.getInt(0) != INDEX_FILE_BEGIN_MAGIC_CODE) {
+            mappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_BEGIN_MAGIC_CODE);
+            mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, -1L);
+            mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, -1L);
+            mappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, 0);
+            mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, 0);
+            // -1 marks an empty hash slot.
+            for (int i = 0; i < maxHashSlotNum; i++) {
+                mappedByteBuffer.putInt(INDEX_FILE_HEADER_SIZE + i * INDEX_FILE_HASH_SLOT_SIZE, -1);
+            }
+            mappedByteBuffer.putInt(fileMaxSize - 4, -1);
+        }
+    }
+
+    /**
+     * Recovery on startup: reopen the current and (if present) rotated files; if the current
+     * file was already sealed, rotate it now; re-submit compaction for any rotated file.
+     */
+    private void initFile() throws IOException {
+        curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
+        initIndexFileHeader(curMappedFile);
+        File preFile = new File(preFilepath);
+        boolean preFileExists = preFile.exists();
+        if (preFileExists) {
+            preMappedFile = new DefaultMappedFile(preFilepath, fileMaxSize);
+        }
+
+        if (isFileSealed(curMappedFile)) {
+            if (preFileExists) {
+                // A leftover pre file means its compaction never finished; replace it.
+                preFile.delete();
+            }
+            boolean rename = curMappedFile.renameTo(preFilepath);
+            if (rename) {
+                preMappedFile = curMappedFile;
+                curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
+                preFileExists = true;
+            }
+        }
+        if (preFileExists) {
+            synchronized (TieredIndexFile.class) {
+                if (inflightCompactFuture.isDone()) {
+                    inflightCompactFuture = TieredStoreExecutor.COMPACT_INDEX_FILE_EXECUTOR.submit(new CompactTask(storeConfig, preMappedFile, fileQueue), null);
+                }
+            }
+        }
+    }
+
+    /** Indexes one message key; the hash key is "topic#key". */
+    public AppendResult append(MessageQueue mq, int topicId, String key, long offset, int size, long timeStamp) {
+        return putKey(mq, topicId, indexKeyHashMethod(buildKey(mq.getTopic(), key)), offset, size, timeStamp);
+    }
+
+    /**
+     * Rotates the sealed current file to the pre slot and starts a fresh current file.
+     * Returns false when a previous pre file still exists (its compaction must finish first).
+     */
+    private boolean rollingFile() throws IOException {
+        File preFile = new File(preFilepath);
+        boolean preFileExists = preFile.exists();
+        if (!preFileExists) {
+            boolean rename = curMappedFile.renameTo(preFilepath);
+            if (rename) {
+                preMappedFile = curMappedFile;
+                curMappedFile = new DefaultMappedFile(curFilePath, fileMaxSize);
+                initIndexFileHeader(curMappedFile);
+                tryToCompactPreFile();
+                return true;
+            } else {
+                logger.error("TieredIndexFile#rollingFile: rename current file failed");
+                return false;
+            }
+        }
+        tryToCompactPreFile();
+        return false;
+    }
+
+    /** Submits a compaction task for the pre file unless one is already in flight. */
+    private void tryToCompactPreFile() throws IOException {
+        synchronized (TieredIndexFile.class) {
+            if (inflightCompactFuture.isDone()) {
+                inflightCompactFuture = TieredStoreExecutor.COMPACT_INDEX_FILE_EXECUTOR.submit(new CompactTask(storeConfig, preMappedFile, fileQueue), null);
+            }
+        }
+    }
+
+    /**
+     * Writes one hash-chained index entry into the current file, rolling first if it is
+     * sealed and sealing it when the entry count reaches maxIndexNum.
+     */
+    private AppendResult putKey(MessageQueue mq, int topicId, int hashCode, long offset, int size, long timeStamp) {
+        curFileLock.lock();
+        try {
+            if (isFileSealed(curMappedFile) && !rollingFile()) {
+                return AppendResult.FILE_FULL;
+            }
+
+            MappedByteBuffer mappedByteBuffer = curMappedFile.getMappedByteBuffer();
+
+            // hashCode is guaranteed non-negative by indexKeyHashMethod.
+            int slotPosition = hashCode % maxHashSlotNum;
+            int slotOffset = INDEX_FILE_HEADER_SIZE + slotPosition * INDEX_FILE_HASH_SLOT_SIZE;
+
+            int slotValue = mappedByteBuffer.getInt(slotOffset);
+
+            long beginTimeStamp = mappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION);
+            if (beginTimeStamp == -1) {
+                mappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, timeStamp);
+                beginTimeStamp = timeStamp;
+            }
+
+            int indexCount = mappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION);
+            int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
+                + indexCount * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
+
+            // Entries store a 32-bit delta from the file's begin timestamp.
+            // NOTE(review): deltas beyond ~24 days overflow the int — confirm acceptable.
+            int timeDiff = (int) (timeStamp - beginTimeStamp);
+
+            // put hash index
+            mappedByteBuffer.putInt(indexOffset, hashCode);
+            mappedByteBuffer.putInt(indexOffset + 4, topicId);
+            mappedByteBuffer.putInt(indexOffset + 4 + 4, mq.getQueueId());
+            mappedByteBuffer.putLong(indexOffset + 4 + 4 + 4, offset);
+            mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8, size);
+            mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4, timeDiff);
+            // Chain link: previous head of this slot's list (-1 terminates the chain).
+            mappedByteBuffer.putInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4, slotValue);
+
+            // put hash slot
+            mappedByteBuffer.putInt(slotOffset, indexCount);
+
+            // put header
+            indexCount += 1;
+            mappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, indexCount);
+            mappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, timeStamp);
+            if (indexCount == maxIndexNum) {
+                // File is full: seal it and rotate immediately.
+                mappedByteBuffer.putInt(fileMaxSize - 4, INDEX_FILE_END_MAGIC_CODE);
+                rollingFile();
+            }
+            return AppendResult.SUCCESS;
+        } catch (Exception e) {
+            logger.error("TieredIndexFile#putKey: put key failed:", e);
+            return AppendResult.IO_ERROR;
+        } finally {
+            curFileLock.unlock();
+        }
+    }
+
+    /**
+     * Looks up a key across all uploaded (compacted) index segments overlapping
+     * [beginTime, endTime]. Each hit yields (segment begin timestamp, raw chain bytes);
+     * results are gathered newest-segment-first.
+     */
+    public CompletableFuture<List<Pair<Long, ByteBuffer>>> queryAsync(String topic, String key, long beginTime, long endTime) {
+        int hashCode = indexKeyHashMethod(buildKey(topic, key));
+        int slotPosition = hashCode % maxHashSlotNum;
+        List<TieredFileSegment> fileSegmentList = fileQueue.getFileListByTime(beginTime, endTime);
+        CompletableFuture<List<Pair<Long, ByteBuffer>>> future = null;
+        for (int i = fileSegmentList.size() - 1; i >= 0; i--) {
+            TieredFileSegment fileSegment = fileSegmentList.get(i);
+            // Compacted slot layout: position(4) + total chain size(4).
+            CompletableFuture<ByteBuffer> tmpFuture = fileSegment.readAsync(INDEX_FILE_HEADER_SIZE + slotPosition * INDEX_FILE_HASH_SLOT_SIZE, INDEX_FILE_HASH_SLOT_SIZE)
+                .thenCompose(slotBuffer -> {
+                    int indexPosition = slotBuffer.getInt();
+                    if (indexPosition == -1) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+
+                    int indexSize = slotBuffer.getInt();
+                    if (indexSize <= 0) {
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    return fileSegment.readAsync(indexPosition, indexSize);
+                });
+            if (future == null) {
+                future = tmpFuture.thenApply(indexBuffer -> {
+                    List<Pair<Long, ByteBuffer>> result = new ArrayList<>();
+                    if (indexBuffer != null) {
+                        result.add(Pair.of(fileSegment.getBeginTimestamp(), indexBuffer));
+                    }
+                    return result;
+                });
+            } else {
+                // Chain the per-segment reads so all results land in one list.
+                future = future.thenCombine(tmpFuture, (indexList, indexBuffer) -> {
+                    if (indexBuffer != null) {
+                        indexList.add(Pair.of(fileSegment.getBeginTimestamp(), indexBuffer));
+                    }
+                    return indexList;
+                });
+            }
+        }
+        return future == null ? CompletableFuture.completedFuture(new ArrayList<>()) : future;
+    }
+
+    /** Composite hash key: "topic#key". */
+    public static String buildKey(String topic, String key) {
+        return topic + "#" + key;
+    }
+
+    /**
+     * Non-negative hash for slot selection. The extra check handles
+     * Math.abs(Integer.MIN_VALUE), which is still negative.
+     */
+    public static int indexKeyHashMethod(String key) {
+        int keyHash = key.hashCode();
+        int keyHashPositive = Math.abs(keyHash);
+        if (keyHashPositive < 0)
+            keyHashPositive = 0;
+        return keyHashPositive;
+    }
+
+    /** Flushes the backing file queue; with sync=true also waits for any in-flight compaction. */
+    public void commit(boolean sync) {
+        fileQueue.commit(sync);
+        if (sync) {
+            try {
+                inflightCompactFuture.get();
+            } catch (Exception ignore) {
+            }
+        }
+    }
+
+    /** Marks uploaded index segments older than the given timestamp as expired. */
+    public void cleanExpiredFile(long expireTimestamp) {
+        fileQueue.cleanExpiredFile(expireTimestamp);
+    }
+
+    /** Physically deletes segments previously marked expired. */
+    public void destroyExpiredFile() {
+        fileQueue.destroyExpiredFile();
+    }
+
+    /** Tears down everything: cancels compaction, destroys local files and the file queue. */
+    public void destroy() {
+        inflightCompactFuture.cancel(true);
+        if (preMappedFile != null) {
+            preMappedFile.destroy(-1);
+        }
+        if (curMappedFile != null) {
+            curMappedFile.destroy(-1);
+        }
+        String compactFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME;
+        File compactFile = new File(compactFilePath);
+        if (compactFile.exists()) {
+            compactFile.delete();
+        }
+        fileQueue.destroy();
+    }
+
+    /**
+     * Compacts a sealed origin file into the scratch file: rewrites each slot's linked
+     * chain as a contiguous run of 28-byte entries, appends the result to the file queue
+     * for upload, then destroys both local files.
+     */
+    static class CompactTask implements Runnable {
+        private final TieredMessageStoreConfig storeConfig;
+
+        private final int maxHashSlotNum;
+        private final int maxIndexNum;
+        private final int fileMaxSize;
+        private MappedFile originFile;
+        private TieredFileQueue fileQueue;
+        private final MappedFile compactFile;
+
+        public CompactTask(TieredMessageStoreConfig storeConfig, MappedFile originFile,
+            TieredFileQueue fileQueue) throws IOException {
+            this.storeConfig = storeConfig;
+            this.maxHashSlotNum = storeConfig.getTieredStoreIndexFileMaxHashSlotNum();
+            this.maxIndexNum = storeConfig.getTieredStoreIndexFileMaxIndexNum();
+            this.originFile = originFile;
+            this.fileQueue = fileQueue;
+            String compactFilePath = storeConfig.getStorePathRootDir() + File.separator + INDEX_FILE_DIR_NAME + File.separator + COMPACT_FILE_NAME;
+            fileMaxSize = IndexHeader.INDEX_HEADER_SIZE + (storeConfig.getTieredStoreIndexFileMaxHashSlotNum() * INDEX_FILE_HASH_SLOT_SIZE) + (storeConfig.getTieredStoreIndexFileMaxIndexNum() * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE) + 4;
+            // TODO check magic code, upload immediately when compact complete
+            // Discard any half-written scratch file from an earlier, interrupted compaction.
+            File compactFile = new File(compactFilePath);
+            if (compactFile.exists()) {
+                compactFile.delete();
+            }
+            this.compactFile = new DefaultMappedFile(compactFilePath, fileMaxSize);
+        }
+
+        @Override
+        public void run() {
+            try {
+                compact();
+            } catch (Throwable throwable) {
+                logger.error("TieredIndexFile#compactTask: compact index file failed:", throwable);
+            }
+        }
+
+        public void compact() {
+            if (!isFileSealed(originFile)) {
+                logger.error("[Bug]TieredIndexFile#CompactTask#compact: try to compact unsealed file");
+                originFile.destroy(-1);
+                compactFile.destroy(-1);
+                return;
+            }
+
+            buildCompactFile();
+            // Synchronous upload of the compacted buffer, then drop both local files.
+            fileQueue.append(compactFile.getMappedByteBuffer());
+            fileQueue.commit(true);
+            compactFile.destroy(-1);
+            originFile.destroy(-1);
+        }
+
+        private void buildCompactFile() {
+            MappedByteBuffer originMappedByteBuffer = originFile.getMappedByteBuffer();
+            MappedByteBuffer compactMappedByteBuffer = compactFile.getMappedByteBuffer();
+            compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_BEGIN_MAGIC_CODE);
+            compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION, originMappedByteBuffer.getLong(INDEX_FILE_HEADER_BEGIN_TIME_STAMP_POSITION));
+            compactMappedByteBuffer.putLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION, originMappedByteBuffer.getLong(INDEX_FILE_HEADER_END_TIME_STAMP_POSITION));
+            compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_SLOT_NUM_POSITION, maxHashSlotNum);
+            compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION, originMappedByteBuffer.getInt(INDEX_FILE_HEADER_INDEX_NUM_POSITION));
+
+            // First free byte after the slot table; chains are written contiguously from here.
+            int rePutSlotValue = INDEX_FILE_HEADER_SIZE + (maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE);
+            for (int i = 0; i < maxHashSlotNum; i++) {
+                int slotOffset = INDEX_FILE_HEADER_SIZE + i * INDEX_FILE_HASH_SLOT_SIZE;
+                int slotValue = originMappedByteBuffer.getInt(slotOffset);
+                if (slotValue != -1) {
+                    int indexTotalSize = 0;
+                    // Walk the origin linked chain; each hop follows the 4-byte next pointer.
+                    int indexPosition = slotValue;
+                    while (indexPosition >= 0 && indexPosition < maxIndexNum) {
+                        int indexOffset = INDEX_FILE_HEADER_SIZE + maxHashSlotNum * INDEX_FILE_HASH_SLOT_SIZE
+                            + indexPosition * INDEX_FILE_HASH_ORIGIN_INDEX_SIZE;
+                        int rePutIndexOffset = rePutSlotValue + indexTotalSize;
+
+                        // Copy hash, topicId, queueId, offset, size, timeDiff — drop the next pointer.
+                        compactMappedByteBuffer.putInt(rePutIndexOffset, originMappedByteBuffer.getInt(indexOffset));
+                        compactMappedByteBuffer.putInt(rePutIndexOffset + 4, originMappedByteBuffer.getInt(indexOffset + 4));
+                        compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4));
+                        compactMappedByteBuffer.putLong(rePutIndexOffset + 4 + 4 + 4, originMappedByteBuffer.getLong(indexOffset + 4 + 4 + 4));
+                        compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4 + 4 + 8, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8));
+                        compactMappedByteBuffer.putInt(rePutIndexOffset + 4 + 4 + 4 + 8 + 4, originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4));
+
+                        indexTotalSize += INDEX_FILE_HASH_COMPACT_INDEX_SIZE;
+                        indexPosition = originMappedByteBuffer.getInt(indexOffset + 4 + 4 + 4 + 8 + 4 + 4);
+                    }
+                    // Compacted slot: absolute byte position of the chain + its total size.
+                    compactMappedByteBuffer.putInt(slotOffset, rePutSlotValue);
+                    compactMappedByteBuffer.putInt(slotOffset + 4, indexTotalSize);
+                    rePutSlotValue += indexTotalSize;
+                }
+            }
+            // NOTE(review): END magic goes to header position 0 and BEGIN magic to the tail —
+            // the reverse of the local-file layout. Presumably intentional for the uploaded
+            // format; confirm against the segment reader.
+            compactMappedByteBuffer.putInt(INDEX_FILE_HEADER_MAGIC_CODE_POSITION, INDEX_FILE_END_MAGIC_CODE);
+            compactMappedByteBuffer.putInt(rePutSlotValue, INDEX_FILE_BEGIN_MAGIC_CODE);
+            compactMappedByteBuffer.limit(rePutSlotValue + 4);
+        }
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreErrorCode.java
similarity index 66%
copy from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
copy to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreErrorCode.java
index c85317177..691d869fa 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreErrorCode.java
@@ -14,18 +14,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.store.tiered.common;
+package org.apache.rocketmq.store.tiered.exception;
 
-import java.io.File;
-
-public class TieredMessageStoreConfig {
-    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
-
-    public String getStorePathRootDir() {
-        return storePathRootDir;
-    }
-
-    public void setStorePathRootDir(String storePathRootDir) {
-        this.storePathRootDir = storePathRootDir;
-    }
+public enum TieredStoreErrorCode {
+    // The requested offset is outside the legal range.
+    ILLEGAL_OFFSET,
+    // A supplied argument failed validation.
+    ILLEGAL_PARAM,
+    // A download returned a different number of bytes than expected.
+    DOWNLOAD_LENGTH_NOT_CORRECT,
+    // There is no new data available for the operation.
+    NO_NEW_DATA,
+    // The backing storage provider reported an error.
+    STORAGE_PROVIDER_ERROR,
+    // An I/O operation failed.
+    IO_ERROR,
+    // Fallback for errors that fit no other category.
+    UNKNOWN
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreException.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreException.java
new file mode 100644
index 000000000..f29840228
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/exception/TieredStoreException.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.exception;
+
+public class TieredStoreException extends RuntimeException {
+    private TieredStoreErrorCode errorCode;
+    private int position = -1;
+
+    private String requestId;
+
+    public TieredStoreException(TieredStoreErrorCode errorCode, String errorMessage) {
+        super(errorMessage);
+        this.errorCode = errorCode;
+    }
+
+    public TieredStoreException(TieredStoreErrorCode errorCode, String errorMessage, String requestId) {
+        super(errorMessage);
+        this.errorCode = errorCode;
+        this.requestId = requestId;
+    }
+
+    public TieredStoreErrorCode getErrorCode() {
+        return errorCode;
+    }
+
+    public void setErrorCode(TieredStoreErrorCode errorCode) {
+        this.errorCode = errorCode;
+    }
+
+    public int getPosition() {
+        return position;
+    }
+
+    public void setPosition(int position) {
+        this.position = position;
+    }
+
+    @Override
+    public String toString() {
+        String errStr = super.toString();
+        if (requestId != null) {
+            errStr += " requestId: " + requestId;
+        }
+        if (position != -1) {
+            errStr += ", position: " + position;
+        }
+        return errStr;
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataManager.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataManager.java
new file mode 100644
index 000000000..da9bb5ce2
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataManager.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.metadata;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import javax.annotation.Nullable;
+import org.apache.rocketmq.common.ConfigManager;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.container.TieredFileSegment;
+
+public class TieredMetadataManager extends ConfigManager implements TieredMetadataStore {
+    private final AtomicInteger maxTopicId = new AtomicInteger(0);
+    private final ConcurrentMap<String /*topic*/, TopicMetadata> topicMetadataTable = new ConcurrentHashMap<>(1024);
+    private final ConcurrentMap<String /*topic*/, ConcurrentMap<Integer /*queueId*/, QueueMetadata>> queueMetadataTable = new ConcurrentHashMap<>(1024);
+    private final ConcurrentMap<MessageQueue, ConcurrentMap<Long /*baseOffset*/, FileSegmentMetadata>> commitLogFileSegmentTable = new ConcurrentHashMap<>(1024);
+    private final ConcurrentMap<MessageQueue, ConcurrentMap<Long /*baseOffset*/, FileSegmentMetadata>> consumeQueueFileSegmentTable = new ConcurrentHashMap<>(1024);
+    private final ConcurrentMap<MessageQueue, ConcurrentMap<Long /*baseOffset*/, FileSegmentMetadata>> indexFileSegmentTable = new ConcurrentHashMap<>(1024);
+    private final TieredMessageStoreConfig storeConfig;
+
+    public TieredMetadataManager(TieredMessageStoreConfig storeConfig) {
+        this.storeConfig = storeConfig;
+    }
+
+    @Override
+    public String encode() {
+        return encode(false);
+    }
+
+    @Override
+    public String encode(boolean prettyFormat) {
+        TieredMetadataSerializeWrapper dataWrapper = new TieredMetadataSerializeWrapper();
+        dataWrapper.setMaxTopicId(maxTopicId);
+        dataWrapper.setTopicMetadataTable(topicMetadataTable);
+        dataWrapper.setQueueMetadataTable(new HashMap<>(queueMetadataTable));
+        return dataWrapper.toJson(false);
+    }
+
+    @Override
+    public String configFilePath() {
+        return storeConfig.getStorePathRootDir() + File.separator + "config" + File.separator + "tieredStoreMetadata.json";
+    }
+
+    @Override
+    public void decode(String jsonString) {
+        if (jsonString != null) {
+            TieredMetadataSerializeWrapper dataWrapper =
+                TieredMetadataSerializeWrapper.fromJson(jsonString, TieredMetadataSerializeWrapper.class);
+            if (dataWrapper != null) {
+                maxTopicId.set(dataWrapper.getMaxTopicId().get());
+                topicMetadataTable.putAll(dataWrapper.getTopicMetadataTable());
+                dataWrapper.getQueueMetadataTable()
+                    .forEach((topic, map) -> queueMetadataTable.put(topic, new ConcurrentHashMap<>(map)));
+            }
+        }
+    }
+
+    @Override
+    public void setMaxTopicId(int maxTopicId) {
+        this.maxTopicId.set(maxTopicId);
+    }
+
+    @Nullable
+    @Override
+    public TopicMetadata getTopic(String topic) {
+        return topicMetadataTable.get(topic);
+    }
+
+    @Override
+    public void iterateTopic(Consumer<TopicMetadata> callback) {
+        topicMetadataTable.values().forEach(callback);
+    }
+
+    @Override
+    public TopicMetadata addTopic(String topic, long reserveTime) {
+        TopicMetadata old = getTopic(topic);
+        if (old != null) {
+            return old;
+        }
+        TopicMetadata metadata = new TopicMetadata(maxTopicId.getAndIncrement(), topic, reserveTime);
+        topicMetadataTable.put(topic, metadata);
+        return metadata;
+    }
+
+    @Override
+    public void updateTopicReserveTime(String topic, long reserveTime) {
+        TopicMetadata metadata = getTopic(topic);
+        if (metadata == null) {
+            return;
+        }
+        metadata.setReserveTime(reserveTime);
+        metadata.setUpdateTimestamp(System.currentTimeMillis());
+    }
+
+    @Override
+    public void updateTopicStatus(String topic, int status) {
+        TopicMetadata metadata = getTopic(topic);
+        if (metadata == null) {
+            return;
+        }
+        metadata.setStatus(status);
+        metadata.setUpdateTimestamp(System.currentTimeMillis());
+    }
+
+    @Override
+    public void deleteTopic(String topic) {
+        topicMetadataTable.remove(topic);
+    }
+
+    @Nullable
+    @Override
+    public QueueMetadata getQueue(MessageQueue queue) {
+        if (!queueMetadataTable.containsKey(queue.getTopic())) {
+            return null;
+        }
+        return queueMetadataTable.get(queue.getTopic())
+            .get(queue.getQueueId());
+    }
+
+    @Override
+    public void iterateQueue(String topic, Consumer<QueueMetadata> callback) {
+        queueMetadataTable.get(topic)
+            .values()
+            .forEach(callback);
+    }
+
+    @Override
+    public QueueMetadata addQueue(MessageQueue queue, long baseOffset) {
+        QueueMetadata old = getQueue(queue);
+        if (old != null) {
+            return old;
+        }
+        QueueMetadata metadata = new QueueMetadata(queue, baseOffset, baseOffset);
+        queueMetadataTable.computeIfAbsent(queue.getTopic(), topic -> new ConcurrentHashMap<>())
+            .put(queue.getQueueId(), metadata);
+        return metadata;
+    }
+
+    @Override
+    public void updateQueue(QueueMetadata metadata) {
+        MessageQueue queue = metadata.getQueue();
+        if (queueMetadataTable.containsKey(queue.getTopic())) {
+            ConcurrentMap<Integer, QueueMetadata> metadataMap = queueMetadataTable.get(queue.getTopic());
+            if (metadataMap.containsKey(queue.getQueueId())) {
+                metadata.setUpdateTimestamp(System.currentTimeMillis());
+                metadataMap.put(queue.getQueueId(), metadata);
+            }
+        }
+    }
+
+    @Override
+    public void deleteQueue(MessageQueue queue) {
+        if (queueMetadataTable.containsKey(queue.getTopic())) {
+            queueMetadataTable.get(queue.getTopic())
+                .remove(queue.getQueueId());
+        }
+    }
+
+    @Nullable
+    @Override
+    public FileSegmentMetadata getFileSegment(TieredFileSegment fileSegment) {
+        switch (fileSegment.getFileType()) {
+            case COMMIT_LOG:
+                if (commitLogFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+                    return commitLogFileSegmentTable.get(fileSegment.getMessageQueue())
+                        .get(fileSegment.getBaseOffset());
+                }
+                break;
+            case CONSUME_QUEUE:
+                if (consumeQueueFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+                    return consumeQueueFileSegmentTable.get(fileSegment.getMessageQueue())
+                        .get(fileSegment.getBaseOffset());
+                }
+                break;
+            case INDEX:
+                if (indexFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+                    return indexFileSegmentTable.get(fileSegment.getMessageQueue())
+                        .get(fileSegment.getBaseOffset());
+                }
+                break;
+        }
+        return null;
+    }
+
+    @Override
+    public void iterateFileSegment(Consumer<FileSegmentMetadata> callback) {
+        commitLogFileSegmentTable.forEach((mq, map) -> map.forEach((offset, metadata) -> callback.accept(metadata)));
+        consumeQueueFileSegmentTable.forEach((mq, map) -> map.forEach((offset, metadata) -> callback.accept(metadata)));
+        indexFileSegmentTable.forEach((mq, map) -> map.forEach((offset, metadata) -> callback.accept(metadata)));
+    }
+
+    @Override
+    public void iterateFileSegment(TieredFileSegment.FileSegmentType type, String topic, int queueId,
+        Consumer<FileSegmentMetadata> callback) {
+        MessageQueue messageQueue = new MessageQueue(topic, storeConfig.getBrokerName(), queueId);
+        switch (type) {
+            case COMMIT_LOG:
+                if (commitLogFileSegmentTable.containsKey(messageQueue)) {
+                    commitLogFileSegmentTable.get(messageQueue)
+                        .forEach((offset, metadata) -> callback.accept(metadata));
+                }
+                break;
+            case CONSUME_QUEUE:
+                if (consumeQueueFileSegmentTable.containsKey(messageQueue)) {
+                    consumeQueueFileSegmentTable.get(messageQueue)
+                        .forEach((offset, metadata) -> callback.accept(metadata));
+                }
+                break;
+            case INDEX:
+                if (indexFileSegmentTable.containsKey(messageQueue)) {
+                    indexFileSegmentTable.get(messageQueue)
+                        .forEach((offset, metadata) -> callback.accept(metadata));
+                }
+                break;
+        }
+    }
+
+    @Override
+    public FileSegmentMetadata updateFileSegment(TieredFileSegment fileSegment) {
+        FileSegmentMetadata old = getFileSegment(fileSegment);
+
+        if (old == null) {
+            FileSegmentMetadata metadata = new FileSegmentMetadata(fileSegment.getMessageQueue(),
+                fileSegment.getFileType().getType(),
+                fileSegment.getBaseOffset(),
+                fileSegment.getPath());
+            if (fileSegment.isClosed()) {
+                metadata.setStatus(FileSegmentMetadata.STATUS_DELETED);
+            }
+            metadata.setBeginTimestamp(fileSegment.getBeginTimestamp());
+            metadata.setEndTimestamp(fileSegment.getEndTimestamp());
+            switch (fileSegment.getFileType()) {
+                case COMMIT_LOG:
+                    commitLogFileSegmentTable.computeIfAbsent(fileSegment.getMessageQueue(), mq -> new ConcurrentHashMap<>())
+                        .put(fileSegment.getBaseOffset(), metadata);
+                    break;
+                case CONSUME_QUEUE:
+                    consumeQueueFileSegmentTable.computeIfAbsent(fileSegment.getMessageQueue(), mq -> new ConcurrentHashMap<>())
+                        .put(fileSegment.getBaseOffset(), metadata);
+                    break;
+                case INDEX:
+                    indexFileSegmentTable.computeIfAbsent(fileSegment.getMessageQueue(), mq -> new ConcurrentHashMap<>())
+                        .put(fileSegment.getBaseOffset(), metadata);
+                    break;
+            }
+            return metadata;
+        }
+
+        if (old.getStatus() == FileSegmentMetadata.STATUS_NEW && fileSegment.isFull() && !fileSegment.needCommit()) {
+            old.setStatus(FileSegmentMetadata.STATUS_SEALED);
+            old.setSealTimestamp(System.currentTimeMillis());
+        }
+        if (fileSegment.isClosed()) {
+            old.setStatus(FileSegmentMetadata.STATUS_DELETED);
+        }
+        old.setSize(fileSegment.getCommitPosition());
+        old.setBeginTimestamp(fileSegment.getBeginTimestamp());
+        old.setEndTimestamp(fileSegment.getEndTimestamp());
+        return old;
+    }
+
+    @Override
+    public void deleteFileSegment(MessageQueue mq) {
+        commitLogFileSegmentTable.remove(mq);
+        consumeQueueFileSegmentTable.remove(mq);
+        indexFileSegmentTable.remove(mq);
+    }
+
+    @Override
+    public void deleteFileSegment(TieredFileSegment fileSegment) {
+        switch (fileSegment.getFileType()) {
+            case COMMIT_LOG:
+                if (commitLogFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+                    commitLogFileSegmentTable.get(fileSegment.getMessageQueue())
+                        .remove(fileSegment.getBaseOffset());
+                }
+                break;
+            case CONSUME_QUEUE:
+                if (consumeQueueFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+                    consumeQueueFileSegmentTable.get(fileSegment.getMessageQueue())
+                        .remove(fileSegment.getBaseOffset());
+                }
+                break;
+            case INDEX:
+                if (indexFileSegmentTable.containsKey(fileSegment.getMessageQueue())) {
+                    indexFileSegmentTable.get(fileSegment.getMessageQueue())
+                        .remove(fileSegment.getBaseOffset());
+                }
+                break;
+        }
+    }
+
+    @Override
+    public void destroy() {
+        maxTopicId.set(0);
+        topicMetadataTable.clear();
+        queueMetadataTable.clear();
+        commitLogFileSegmentTable.clear();
+        consumeQueueFileSegmentTable.clear();
+        indexFileSegmentTable.clear();
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataSerializeWrapper.java
similarity index 96%
rename from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java
rename to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataSerializeWrapper.java
index e4e068aff..82f969e24 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataSerializeWrapper.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataSerializeWrapper.java
@@ -20,7 +20,7 @@ import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.rocketmq.remoting.protocol.RemotingSerializable;
 
-public class TieredStoreMetadataSerializeWrapper extends RemotingSerializable {
+public class TieredMetadataSerializeWrapper extends RemotingSerializable {
     private AtomicInteger maxTopicId;
     private Map<String /*topic*/, TopicMetadata> topicMetadataTable;
     private Map<String /*topic*/, Map<Integer /*queueId*/, QueueMetadata>> queueMetadataTable;
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataStore.java
similarity index 59%
rename from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java
rename to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataStore.java
index 7701258af..be746c7ea 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataStore.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredMetadataStore.java
@@ -19,20 +19,66 @@ package org.apache.rocketmq.store.tiered.metadata;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.container.TieredFileSegment;
+
+public interface TieredMetadataStore {
+    /**
+     * Topic metadata operation
+     *
+     * @see org.apache.rocketmq.store.tiered.metadata.TopicMetadata
+     */
+    void setMaxTopicId(int maxTopicId);
 
-public interface TieredStoreMetadataStore {
+    // Returns the metadata of the given topic, or null if the topic is unknown.
     @Nullable
     TopicMetadata getTopic(String topic);
+
     void iterateTopic(Consumer<TopicMetadata> callback);
+
+    // Registers a new topic; if it is already registered, implementations return
+    // the existing metadata unchanged (see TieredMetadataManager).
     TopicMetadata addTopic(String topic, long reserveTime);
+
     void updateTopicReserveTime(String topic, long reserveTime);
+
     void updateTopicStatus(String topic, int status);
+
     void deleteTopic(String topic);
 
+    /**
+     * Queue metadata operation
+     *
+     * @see org.apache.rocketmq.store.tiered.metadata.QueueMetadata
+     */
     @Nullable
     QueueMetadata getQueue(MessageQueue queue);
+
     void iterateQueue(String topic, Consumer<QueueMetadata> callback);
+
+    // Registers a new queue with its initial offsets; if it is already registered,
+    // implementations return the existing metadata unchanged (see TieredMetadataManager).
     QueueMetadata addQueue(MessageQueue queue, long baseOffset);
+
     void updateQueue(QueueMetadata metadata);
+
     void deleteQueue(MessageQueue queue);
+
+    /**
+     * File segment metadata operation
+     *
+     * @see org.apache.rocketmq.store.tiered.metadata.FileSegmentMetadata
+     */
+    @Nullable
+    FileSegmentMetadata getFileSegment(TieredFileSegment fileSegment);
+
+    void iterateFileSegment(Consumer<FileSegmentMetadata> callback);
+
+    void iterateFileSegment(TieredFileSegment.FileSegmentType type, String topic, int queueId,
+        Consumer<FileSegmentMetadata> callback);
+
+    // Creates metadata for an untracked segment or refreshes a tracked one,
+    // returning the stored metadata (see TieredMetadataManager).
+    FileSegmentMetadata updateFileSegment(TieredFileSegment fileSegment);
+
+    void deleteFileSegment(MessageQueue mq);
+
+    void deleteFileSegment(TieredFileSegment fileSegment);
+
+    /**
+     * Clean all metadata
+     */
+    void destroy();
 }
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java
deleted file mode 100644
index e4f241af1..000000000
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/metadata/TieredStoreMetadataManager.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.store.tiered.metadata;
-
-import java.io.File;
-import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Consumer;
-import javax.annotation.Nullable;
-import org.apache.rocketmq.common.ConfigManager;
-import org.apache.rocketmq.common.message.MessageQueue;
-import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
-
-public class TieredStoreMetadataManager extends ConfigManager implements TieredStoreMetadataStore {
-    private final AtomicInteger maxTopicId = new AtomicInteger(0);
-    private final ConcurrentMap<String /*topic*/, TopicMetadata> topicMetadataTable = new ConcurrentHashMap<>(1024);
-    private final ConcurrentMap<String /*topic*/, ConcurrentMap<Integer /*queueId*/, QueueMetadata>> queueMetadataTable = new ConcurrentHashMap<>(1024);
-    private final TieredMessageStoreConfig storeConfig;
-
-    public TieredStoreMetadataManager(TieredMessageStoreConfig storeConfig) {
-        this.storeConfig = storeConfig;
-    }
-    @Override
-    public String encode() {
-        return encode(false);
-    }
-
-    @Override
-    public String encode(boolean prettyFormat) {
-        TieredStoreMetadataSerializeWrapper dataWrapper = new TieredStoreMetadataSerializeWrapper();
-        dataWrapper.setMaxTopicId(maxTopicId);
-        dataWrapper.setTopicMetadataTable(topicMetadataTable);
-        dataWrapper.setQueueMetadataTable(new HashMap<>(queueMetadataTable));
-        return dataWrapper.toJson(false);
-    }
-
-    @Override
-    public String configFilePath() {
-        return storeConfig.getStorePathRootDir() + File.separator + "config" + File.separator + "tieredStoreMetadata.json";
-    }
-
-    @Override
-    public void decode(String jsonString) {
-        if (jsonString != null) {
-            TieredStoreMetadataSerializeWrapper dataWrapper =
-                TieredStoreMetadataSerializeWrapper.fromJson(jsonString, TieredStoreMetadataSerializeWrapper.class);
-            if (dataWrapper != null) {
-                maxTopicId.set(dataWrapper.getMaxTopicId().get());
-                topicMetadataTable.putAll(dataWrapper.getTopicMetadataTable());
-                dataWrapper.getQueueMetadataTable()
-                    .forEach((topic, map) -> queueMetadataTable.put(topic, new ConcurrentHashMap<>(map)));
-            }
-        }
-    }
-
-    @Override
-    @Nullable
-    public TopicMetadata getTopic(String topic) {
-        return topicMetadataTable.get(topic);
-    }
-
-    @Override
-    public void iterateTopic(Consumer<TopicMetadata> callback) {
-        topicMetadataTable.values().forEach(callback);
-    }
-
-    @Override
-    public TopicMetadata addTopic(String topic, long reserveTime) {
-        TopicMetadata old = getTopic(topic);
-        if (old != null) {
-            return old;
-        }
-        TopicMetadata metadata = new TopicMetadata(maxTopicId.getAndIncrement(), topic, reserveTime);
-        topicMetadataTable.put(topic, metadata);
-        return metadata;
-    }
-
-    @Override
-    public void updateTopicReserveTime(String topic, long reserveTime) {
-        TopicMetadata metadata = getTopic(topic);
-        if (metadata == null) {
-            return;
-        }
-        metadata.setReserveTime(reserveTime);
-        metadata.setUpdateTimestamp(System.currentTimeMillis());
-    }
-
-    @Override
-    public void updateTopicStatus(String topic, int status) {
-        TopicMetadata metadata = getTopic(topic);
-        if (metadata == null) {
-            return;
-        }
-        metadata.setStatus(status);
-        metadata.setUpdateTimestamp(System.currentTimeMillis());
-    }
-
-    @Override
-    public void deleteTopic(String topic) {
-        topicMetadataTable.remove(topic);
-    }
-
-    @Override
-    @Nullable
-    public QueueMetadata getQueue(MessageQueue queue) {
-        if (!queueMetadataTable.containsKey(queue.getTopic())) {
-            return null;
-        }
-        return queueMetadataTable.get(queue.getTopic())
-            .get(queue.getQueueId());
-    }
-
-    @Override
-    public void iterateQueue(String topic, Consumer<QueueMetadata> callback) {
-        queueMetadataTable.get(topic)
-            .values()
-            .forEach(callback);
-    }
-
-    @Override
-    public QueueMetadata addQueue(MessageQueue queue, long baseOffset) {
-        QueueMetadata old = getQueue(queue);
-        if (old != null) {
-            return old;
-        }
-        QueueMetadata metadata = new QueueMetadata(queue, baseOffset, baseOffset);
-        queueMetadataTable.computeIfAbsent(queue.getTopic(), topic -> new ConcurrentHashMap<>())
-            .put(queue.getQueueId(), metadata);
-        return metadata;
-    }
-
-    @Override
-    public void updateQueue(QueueMetadata metadata) {
-        MessageQueue queue = metadata.getQueue();
-        if (queueMetadataTable.containsKey(queue.getTopic())) {
-            ConcurrentMap<Integer, QueueMetadata> metadataMap = queueMetadataTable.get(queue.getTopic());
-            if (metadataMap.containsKey(queue.getQueueId())) {
-                metadata.setUpdateTimestamp(System.currentTimeMillis());
-                metadataMap.put(queue.getQueueId(), metadata);
-            }
-        }
-    }
-
-    @Override
-    public void deleteQueue(MessageQueue queue) {
-        if (queueMetadataTable.containsKey(queue.getTopic())) {
-            queueMetadataTable.get(queue.getTopic())
-                .remove(queue.getQueueId());
-        }
-    }
-}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtil.java
similarity index 65%
copy from tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
copy to tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtil.java
index c85317177..9a17fcf72 100644
--- a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/common/TieredMessageStoreConfig.java
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtil.java
@@ -14,18 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.rocketmq.store.tiered.common;
+package org.apache.rocketmq.store.tiered.util;
 
-import java.io.File;
+import java.nio.ByteBuffer;
 
-public class TieredMessageStoreConfig {
-    private String storePathRootDir = System.getProperty("user.home") + File.separator + "store";
/**
 * Read-only accessors for a consume queue item stored in a {@link ByteBuffer}.
 *
 * <p>An item is laid out as: commit log offset (8 bytes) + message size (4 bytes)
 * + tag hash code (8 bytes). All reads are absolute, relative to the buffer's
 * current position, so the buffer's position is never modified.
 */
public class CQItemBufferUtil {
    // Field offsets inside one consume queue item (replaces the former magic numbers).
    private static final int SIZE_OFFSET = 8;      // after the 8-byte commit log offset
    private static final int TAG_CODE_OFFSET = 12; // after commit log offset (8) + size (4)

    private CQItemBufferUtil() {
        // utility class, no instances
    }

    /** Returns the commit log offset stored at the start of the item. */
    public static long getCommitLogOffset(ByteBuffer cqItem) {
        return cqItem.getLong(cqItem.position());
    }

    /** Returns the stored message size in bytes. */
    public static int getSize(ByteBuffer cqItem) {
        return cqItem.getInt(cqItem.position() + SIZE_OFFSET);
    }

    /** Returns the tag hash code of the message. */
    public static long getTagCode(ByteBuffer cqItem) {
        return cqItem.getLong(cqItem.position() + TAG_CODE_OFFSET);
    }
}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtil.java
new file mode 100644
index 000000000..7306a12b3
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtil.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.util;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.tiered.container.TieredCommitLog;
+import org.apache.rocketmq.store.tiered.container.TieredConsumeQueue;
+
+public class MessageBufferUtil {
+    private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+    public static final int QUEUE_OFFSET_POSITION = 4 /* total size */
+        + 4 /* magic code */
+        + 4 /* body CRC */
+        + 4 /* queue id */
+        + 4; /* flag */
+
+    public static final int PHYSICAL_OFFSET_POSITION = 4 /* total size */
+        + 4 /* magic code */
+        + 4 /* body CRC */
+        + 4 /* queue id */
+        + 4 /* flag */
+        + 8; /* queue offset */
+
+    public static final int SYS_FLAG_OFFSET_POSITION = 4 /* total size */
+        + 4 /* magic code */
+        + 4 /* body CRC */
+        + 4 /* queue id */
+        + 4 /* flag */
+        + 8 /* queue offset */
+        + 8; /* physical offset */
+
+    public static final int STORE_TIMESTAMP_POSITION = 4 /* total size */
+        + 4 /* magic code */
+        + 4 /* body CRC */
+        + 4 /* queue id */
+        + 4 /* flag */
+        + 8 /* queue offset */
+        + 8 /* physical offset */
+        + 4 /* sys flag */
+        + 8 /* born timestamp */
+        + 8; /* born host */
+
+    public static final int STORE_HOST_POSITION = 4 /* total size */
+        + 4 /* magic code */
+        + 4 /* body CRC */
+        + 4 /* queue id */
+        + 4 /* flag */
+        + 8 /* queue offset */
+        + 8 /* physical offset */
+        + 4 /* sys flag */
+        + 8 /* born timestamp */
+        + 8 /* born host */
+        + 8; /* store timestamp */
+
+    public static int getTotalSize(ByteBuffer message) {
+        return message.getInt(message.position());
+    }
+
+    public static int getMagicCode(ByteBuffer message) {
+        return message.getInt(message.position() + 4);
+    }
+
+    public static long getQueueOffset(ByteBuffer message) {
+        return message.getLong(message.position() + QUEUE_OFFSET_POSITION);
+    }
+
+    public static long getCommitLogOffset(ByteBuffer message) {
+        return message.getLong(message.position() + PHYSICAL_OFFSET_POSITION);
+    }
+
+    public static long getStoreTimeStamp(ByteBuffer message) {
+        return message.getLong(message.position() + STORE_TIMESTAMP_POSITION);
+    }
+
+    public static ByteBuffer getOffsetIdBuffer(ByteBuffer message) {
+        ByteBuffer idBuffer = ByteBuffer.allocate(TieredStoreUtil.MSG_ID_LENGTH);
+        idBuffer.limit(TieredStoreUtil.MSG_ID_LENGTH);
+        idBuffer.putLong(message.getLong(message.position() + STORE_HOST_POSITION));
+        idBuffer.putLong(getCommitLogOffset(message));
+        idBuffer.flip();
+        return idBuffer;
+    }
+
+    public static String getOffsetId(ByteBuffer message) {
+        return UtilAll.bytes2string(getOffsetIdBuffer(message).array());
+    }
+
+    public static Map<String, String> getProperties(ByteBuffer message) {
+        ByteBuffer slice = message.slice();
+        return MessageDecoder.decodeProperties(slice);
+    }
+
+    public static List<Pair<Integer/* offset of msgBuffer */, Integer/* msg size */>> splitMessageBuffer(
+        ByteBuffer cqBuffer, ByteBuffer msgBuffer) {
+        cqBuffer.rewind();
+        msgBuffer.rewind();
+        List<Pair<Integer, Integer>> messageList = new ArrayList<>(cqBuffer.remaining() / TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        if (cqBuffer.remaining() % TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE != 0) {
+            logger.warn("MessageBufferUtil#splitMessage: consume queue buffer size {} is not an integer multiple of CONSUME_QUEUE_STORE_UNIT_SIZE {}",
+                cqBuffer.remaining(), TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+            return messageList;
+        }
+        try {
+            long startCommitLogOffset = CQItemBufferUtil.getCommitLogOffset(cqBuffer);
+            for (int pos = cqBuffer.position(); pos < cqBuffer.limit(); pos += TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE) {
+                cqBuffer.position(pos);
+                int diff = (int) (CQItemBufferUtil.getCommitLogOffset(cqBuffer) - startCommitLogOffset);
+                int size = CQItemBufferUtil.getSize(cqBuffer);
+                if (diff + size > msgBuffer.limit()) {
+                    logger.error("MessageBufferUtil#splitMessage: message buffer size is incorrect: record in consume queue: {}, actual: {}", diff + size, msgBuffer.remaining());
+                    return messageList;
+                }
+                msgBuffer.position(diff);
+
+                int magicCode = getMagicCode(msgBuffer);
+                if (magicCode == TieredCommitLog.BLANK_MAGIC_CODE) {
+                    logger.warn("MessageBufferUtil#splitMessage: message decode error: blank magic code, this message may be coda, try to fix offset");
+                    diff = diff + TieredCommitLog.CODA_SIZE;
+                    msgBuffer.position(diff);
+                    magicCode = getMagicCode(msgBuffer);
+                }
+                if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
+                    logger.warn("MessageBufferUtil#splitMessage: message decode error: unknown magic code");
+                    continue;
+                }
+
+                if (getTotalSize(msgBuffer) != size) {
+                    logger.warn("MessageBufferUtil#splitMessage: message size is not right: expected: {}, actual: {}", size, getTotalSize(msgBuffer));
+                    continue;
+                }
+
+                messageList.add(Pair.of(diff, size));
+            }
+        } catch (Exception e) {
+            logger.error("MessageBufferUtil#splitMessage: split message failed, maybe decode consume queue item failed", e);
+        } finally {
+            cqBuffer.rewind();
+            msgBuffer.rewind();
+        }
+        return messageList;
+    }
+}
diff --git a/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtil.java b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtil.java
new file mode 100644
index 000000000..17597fb08
--- /dev/null
+++ b/tieredstore/src/main/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtil.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.util;
+
+import java.lang.reflect.Constructor;
+import java.math.BigInteger;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.text.DecimalFormat;
+import java.text.NumberFormat;
+import java.util.LinkedList;
+import java.util.List;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.topic.TopicValidator;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.metadata.TieredMetadataStore;
+
+public class TieredStoreUtil {
+    private static final Logger logger = LoggerFactory.getLogger(TieredStoreUtil.TIERED_STORE_LOGGER_NAME);
+
+    private static final long BYTE = 1L;
+    private static final long KB = BYTE << 10;
+    private static final long MB = KB << 10;
+    private static final long GB = MB << 10;
+    private static final long TB = GB << 10;
+    private static final long PB = TB << 10;
+    private static final long EB = PB << 10;
+
+    public static final String TIERED_STORE_LOGGER_NAME = "RocketmqTieredStore";
+    public static final String RMQ_SYS_TIERED_STORE_INDEX_TOPIC = "rmq_sys_INDEX";
+    public static final String PROXY_HOUSEKEEPING_TOPIC_PREFIX = "rocketmq-proxy-";
+    public final static int MSG_ID_LENGTH = 8 + 8;
+
+    private static final DecimalFormat DEC_FORMAT = new DecimalFormat("#.##");
+
+    private final static List<String> SYSTEM_TOPIC_LIST = new LinkedList<String>() {
+        {
+            add(RMQ_SYS_TIERED_STORE_INDEX_TOPIC);
+        }
+    };
+
+    private final static List<String> SYSTEM_TOPIC_WHITE_LIST = new LinkedList<String>() {
+        {
+        }
+    };
+
+    private volatile static TieredMetadataStore metadataStoreInstance;
+
+    private static String formatSize(long size, long divider, String unitName) {
+        return DEC_FORMAT.format((double) size / divider) + unitName;
+    }
+
+    public static String toHumanReadable(long size) {
+        if (size < 0)
+            return String.valueOf(size);
+        if (size >= EB)
+            return formatSize(size, EB, "EB");
+        if (size >= PB)
+            return formatSize(size, PB, "PB");
+        if (size >= TB)
+            return formatSize(size, TB, "TB");
+        if (size >= GB)
+            return formatSize(size, GB, "GB");
+        if (size >= MB)
+            return formatSize(size, MB, "MB");
+        if (size >= KB)
+            return formatSize(size, KB, "KB");
+        return formatSize(size, BYTE, "Bytes");
+    }
+
+    public static String getHash(String str) {
+        try {
+            MessageDigest md = MessageDigest.getInstance("MD5");
+            md.update(str.getBytes(StandardCharsets.UTF_8));
+            byte[] digest = md.digest();
+            return String.format("%032x", new BigInteger(1, digest)).substring(0, 8);
+        } catch (Exception ignore) {
+            return "";
+        }
+    }
+
+    public static String offset2FileName(final long offset) {
+        final NumberFormat numberFormat = NumberFormat.getInstance();
+
+        numberFormat.setMinimumIntegerDigits(20);
+        numberFormat.setMaximumFractionDigits(0);
+        numberFormat.setGroupingUsed(false);
+
+        try {
+            MessageDigest md = MessageDigest.getInstance("MD5");
+
+            md.update(Long.toString(offset).getBytes(StandardCharsets.UTF_8));
+
+            byte[] digest = md.digest();
+            String hash = String.format("%032x", new BigInteger(1, digest)).substring(0, 8);
+            return hash + numberFormat.format(offset);
+        } catch (Exception ignore) {
+            return numberFormat.format(offset);
+        }
+    }
+
+    public static long fileName2Offset(final String fileName) {
+        return Long.parseLong(fileName.substring(fileName.length() - 20));
+    }
+
+    public static void addSystemTopic(final String topic) {
+        SYSTEM_TOPIC_LIST.add(topic);
+    }
+
+    public static boolean isSystemTopic(final String topic) {
+        if (StringUtils.isBlank(topic)) {
+            return false;
+        }
+
+        if (SYSTEM_TOPIC_WHITE_LIST.contains(topic)) {
+            return false;
+        }
+
+        if (SYSTEM_TOPIC_LIST.contains(topic)) {
+            return true;
+        }
+        return TopicValidator.isSystemTopic(topic) || topic.toLowerCase().startsWith(PROXY_HOUSEKEEPING_TOPIC_PREFIX);
+    }
+
+    public static TieredMetadataStore getMetadataStore(TieredMessageStoreConfig storeConfig) {
+        if (metadataStoreInstance == null) {
+            synchronized (TieredMetadataStore.class) {
+                if (metadataStoreInstance == null) {
+                    try {
+                        Class<? extends TieredMetadataStore> clazz = Class.forName(storeConfig.getTieredMetadataServiceProvider()).asSubclass(TieredMetadataStore.class);
+                        Constructor<? extends TieredMetadataStore> constructor = clazz.getConstructor(TieredMessageStoreConfig.class);
+                        metadataStoreInstance = constructor.newInstance(storeConfig);
+                    } catch (Exception e) {
+                        logger.error("TieredMetadataStore#getInstance: build metadata store failed, provider class: {}", storeConfig.getTieredMetadataServiceProvider(), e);
+                    }
+                }
+            }
+        }
+        return metadataStoreInstance;
+    }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileQueueTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileQueueTest.java
new file mode 100644
index 000000000..a6bf09f6f
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileQueueTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.FileUtils;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.metadata.TieredMetadataStore;
+import org.apache.rocketmq.store.tiered.mock.MemoryFileSegment;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TieredFileQueueTest {
+    TieredMessageStoreConfig storeConfig;
+    MessageQueue queue;
+
+    @Before
+    public void setUp() {
+        storeConfig = new TieredMessageStoreConfig();
+        storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "rmqut");
+        storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.store.tiered.mock.MemoryFileSegment");
+        queue = new MessageQueue("TieredFileQueueTest", storeConfig.getBrokerName(), 0);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        FileUtils.deleteDirectory(new File(FileUtils.getTempDirectory() + File.separator + "rmqut"));
+        TieredStoreUtil.getMetadataStore(storeConfig).destroy();
+    }
+
+    @Test
+    public void testGetFileSegment() throws ClassNotFoundException, NoSuchMethodException {
+        TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+            queue, storeConfig);
+        fileQueue.setBaseOffset(0);
+        TieredFileSegment segment1 = fileQueue.getFileToWrite();
+        segment1.initPosition(1000);
+        segment1.append(ByteBuffer.allocate(100), 0);
+        segment1.setFull();
+        segment1.commit();
+
+        TieredFileSegment segment2 = fileQueue.getFileToWrite();
+        Assert.assertNotSame(segment1, segment2);
+        Assert.assertEquals(1000 + 100 + TieredCommitLog.CODA_SIZE, segment1.getMaxOffset());
+        Assert.assertEquals(1000 + 100 + TieredCommitLog.CODA_SIZE, segment2.getBaseOffset());
+
+        Assert.assertSame(fileQueue.getSegmentIndexByOffset(1000), 0);
+        Assert.assertSame(fileQueue.getSegmentIndexByOffset(1050), 0);
+        Assert.assertSame(fileQueue.getSegmentIndexByOffset(1100 + TieredCommitLog.CODA_SIZE), 1);
+        Assert.assertSame(fileQueue.getSegmentIndexByOffset(1150), -1);
+    }
+
+    @Test
+    public void testAppendAndRead() throws ClassNotFoundException, NoSuchMethodException {
+        TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+            queue, storeConfig);
+        fileQueue.setBaseOffset(0);
+        Assert.assertEquals(0, fileQueue.getMinOffset());
+        Assert.assertEquals(0, fileQueue.getCommitMsgQueueOffset());
+
+        TieredFileSegment segment1 = fileQueue.getFileToWrite();
+        segment1.initPosition(segment1.getSize());
+        Assert.assertEquals(0, segment1.getBaseOffset());
+        Assert.assertEquals(1000, fileQueue.getCommitOffset());
+        Assert.assertEquals(1000, fileQueue.getMaxOffset());
+
+        ByteBuffer buffer = ByteBuffer.allocate(100);
+        long currentTimeMillis = System.currentTimeMillis();
+        buffer.putLong(currentTimeMillis);
+        buffer.rewind();
+        fileQueue.append(buffer);
+        Assert.assertEquals(1100, segment1.getMaxOffset());
+
+        segment1.setFull();
+        fileQueue.commit(true);
+        Assert.assertEquals(1100, segment1.getCommitOffset());
+
+        ByteBuffer readBuffer = fileQueue.readAsync(1000, 8).join();
+        Assert.assertEquals(currentTimeMillis, readBuffer.getLong());
+
+        TieredFileSegment segment2 = fileQueue.getFileToWrite();
+        Assert.assertNotEquals(segment1, segment2);
+        segment2.initPosition(segment2.getSize());
+        buffer.rewind();
+        fileQueue.append(buffer);
+        fileQueue.commit(true);
+        readBuffer = fileQueue.readAsync(1000, 1200).join();
+        Assert.assertEquals(currentTimeMillis, readBuffer.getLong(1100));
+    }
+
+    @Test
+    public void testLoadFromMetadata() throws ClassNotFoundException, NoSuchMethodException {
+        TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+
+        MemoryFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+            queue, 100, storeConfig);
+        fileSegment1.initPosition(fileSegment1.getSize());
+        fileSegment1.setFull();
+        metadataStore.updateFileSegment(fileSegment1);
+        metadataStore.updateFileSegment(fileSegment1);
+
+        MemoryFileSegment fileSegment2 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+            queue, 1100, storeConfig);
+        metadataStore.updateFileSegment(fileSegment2);
+
+        TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.COMMIT_LOG,
+            queue, storeConfig);
+        Assert.assertEquals(2, fileQueue.needCommitFileSegmentList.size());
+        TieredFileSegment file1 = fileQueue.getFileByIndex(0);
+        Assert.assertNotNull(file1);
+        Assert.assertEquals(100, file1.getBaseOffset());
+        Assert.assertFalse(file1.isFull());
+
+        TieredFileSegment file2 = fileQueue.getFileByIndex(1);
+        Assert.assertNotNull(file2);
+        Assert.assertEquals(1100, file2.getBaseOffset());
+        Assert.assertFalse(file2.isFull());
+
+        TieredFileSegment file3 = fileQueue.getFileByIndex(2);
+        Assert.assertNull(file3);
+    }
+
+    @Test
+    public void testCheckFileSize() throws ClassNotFoundException, NoSuchMethodException {
+        TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+
+        TieredFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+            queue, 100, storeConfig);
+        fileSegment1.initPosition(fileSegment1.getSize() - 100);
+        fileSegment1.setFull(false);
+        metadataStore.updateFileSegment(fileSegment1);
+        metadataStore.updateFileSegment(fileSegment1);
+
+        TieredFileSegment fileSegment2 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+            queue, 1100, storeConfig);
+        fileSegment2.initPosition(fileSegment2.getSize() - 100);
+        metadataStore.updateFileSegment(fileSegment2);
+        metadataStore.updateFileSegment(fileSegment2);
+
+        TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+            queue, storeConfig);
+        Assert.assertEquals(1, fileQueue.needCommitFileSegmentList.size());
+
+        fileSegment1 = fileQueue.getFileByIndex(0);
+        Assert.assertTrue(fileSegment1.isFull());
+        Assert.assertEquals(fileSegment1.getSize() + 100, fileSegment1.getCommitOffset());
+
+        fileSegment2 = fileQueue.getFileByIndex(1);
+        Assert.assertEquals(1000, fileSegment2.getCommitPosition());
+
+        fileSegment2.setFull();
+        fileQueue.commit(true);
+        Assert.assertEquals(0, fileQueue.needCommitFileSegmentList.size());
+
+        fileQueue.getFileToWrite();
+        Assert.assertEquals(1, fileQueue.needCommitFileSegmentList.size());
+    }
+
+    @Test
+    public void testCleanExpiredFile() throws ClassNotFoundException, NoSuchMethodException {
+        TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+
+        TieredFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+            queue, 100, storeConfig);
+        fileSegment1.initPosition(fileSegment1.getSize() - 100);
+        fileSegment1.setFull(false);
+        fileSegment1.setEndTimestamp(System.currentTimeMillis() - 1);
+        metadataStore.updateFileSegment(fileSegment1);
+        metadataStore.updateFileSegment(fileSegment1);
+
+        long file1CreateTimeStamp = System.currentTimeMillis();
+
+        TieredFileSegment fileSegment2 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+            queue, 1100, storeConfig);
+        fileSegment2.initPosition(fileSegment2.getSize());
+        fileSegment2.setEndTimestamp(System.currentTimeMillis() + 1);
+        metadataStore.updateFileSegment(fileSegment2);
+        metadataStore.updateFileSegment(fileSegment2);
+
+        TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+            queue, storeConfig);
+        Assert.assertEquals(2, fileQueue.getFileSegmentCount());
+
+        fileQueue.cleanExpiredFile(file1CreateTimeStamp);
+        fileQueue.destroyExpiredFile();
+        Assert.assertEquals(1, fileQueue.getFileSegmentCount());
+        Assert.assertNull(metadataStore.getFileSegment(fileSegment1));
+        Assert.assertNotNull(metadataStore.getFileSegment(fileSegment2));
+
+        fileQueue.cleanExpiredFile(Long.MAX_VALUE);
+        fileQueue.destroyExpiredFile();
+        Assert.assertEquals(0, fileQueue.getFileSegmentCount());
+        Assert.assertNull(metadataStore.getFileSegment(fileSegment1));
+        Assert.assertNull(metadataStore.getFileSegment(fileSegment2));
+    }
+
+    @Test
+    public void testRollingNewFile() throws ClassNotFoundException, NoSuchMethodException {
+        TieredMetadataStore metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+
+        TieredFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+            queue, 100, storeConfig);
+        fileSegment1.initPosition(fileSegment1.getSize() - 100);
+        metadataStore.updateFileSegment(fileSegment1);
+
+        TieredFileQueue fileQueue = new TieredFileQueue(TieredFileSegment.FileSegmentType.CONSUME_QUEUE,
+            queue, storeConfig);
+        Assert.assertEquals(1, fileQueue.getFileSegmentCount());
+
+        fileQueue.rollingNewFile();
+        Assert.assertEquals(2, fileQueue.getFileSegmentCount());
+    }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileSegmentTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileSegmentTest.java
new file mode 100644
index 000000000..d189aadd9
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredFileSegmentTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.mock.MemoryFileSegment;
+import org.apache.rocketmq.store.tiered.util.MessageBufferUtil;
+import org.apache.rocketmq.store.tiered.util.MessageBufferUtilTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TieredFileSegmentTest {
+    public int baseOffset = 1000;
+
+    public TieredFileSegment createFileSegment(TieredFileSegment.FileSegmentType fileType) {
+        return new MemoryFileSegment(fileType, new MessageQueue("TieredFileSegmentTest", "broker", 0),
+            baseOffset, new TieredMessageStoreConfig());
+    }
+
+    @Test
+    public void testCommitLog() {
+        TieredFileSegment segment = createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
+        segment.initPosition(segment.getSize());
+        long lastSize = segment.getSize();
+        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+        Assert.assertTrue(segment.needCommit());
+
+        ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+        long msg3StoreTime = System.currentTimeMillis();
+        buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, msg3StoreTime);
+        long queueOffset = baseOffset * 1000L;
+        buffer.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, queueOffset);
+        segment.append(buffer, msg3StoreTime);
+
+        Assert.assertEquals(baseOffset, segment.getBaseOffset());
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+        Assert.assertEquals(0, segment.getBeginTimestamp());
+        Assert.assertEquals(msg3StoreTime, segment.getEndTimestamp());
+
+        segment.setFull();
+        segment.commit();
+        Assert.assertFalse(segment.needCommit());
+        Assert.assertEquals(segment.getMaxOffset(), segment.getCommitOffset());
+        Assert.assertEquals(queueOffset, segment.getCommitMsgQueueOffset());
+
+        ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN);
+        Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1));
+
+        ByteBuffer msg2 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN);
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2));
+
+        ByteBuffer msg3 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN);
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3));
+
+        ByteBuffer coda = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 3, TieredCommitLog.CODA_SIZE);
+        Assert.assertEquals(msg3StoreTime, coda.getLong(4 + 4));
+    }
+
+    private ByteBuffer buildConsumeQueue(long commitLogOffset) {
+        ByteBuffer cqItem = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        cqItem.putLong(commitLogOffset);
+        cqItem.putInt(2);
+        cqItem.putLong(3);
+        cqItem.flip();
+        return cqItem;
+    }
+
+    @Test
+    public void testConsumeQueue() {
+        TieredFileSegment segment = createFileSegment(TieredFileSegment.FileSegmentType.CONSUME_QUEUE);
+        segment.initPosition(segment.getSize());
+        long lastSize = segment.getSize();
+        segment.append(buildConsumeQueue(baseOffset), 0);
+        segment.append(buildConsumeQueue(baseOffset + MessageBufferUtilTest.MSG_LEN), 0);
+        long cqItem3Timestamp = System.currentTimeMillis();
+        segment.append(buildConsumeQueue(baseOffset + MessageBufferUtilTest.MSG_LEN * 2), cqItem3Timestamp);
+
+        Assert.assertEquals(baseOffset + lastSize + TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 3, segment.getMaxOffset());
+        Assert.assertEquals(0, segment.getBeginTimestamp());
+        Assert.assertEquals(cqItem3Timestamp, segment.getEndTimestamp());
+
+        segment.commit();
+        Assert.assertEquals(segment.getMaxOffset(), segment.getCommitOffset());
+
+        ByteBuffer cqItem1 = segment.read(lastSize, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        Assert.assertEquals(baseOffset, cqItem1.getLong());
+
+        ByteBuffer cqItem2 = segment.read(lastSize + TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        Assert.assertEquals(baseOffset + MessageBufferUtilTest.MSG_LEN, cqItem2.getLong());
+
+        ByteBuffer cqItem3 = segment.read(lastSize + TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2, TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        Assert.assertEquals(baseOffset + MessageBufferUtilTest.MSG_LEN * 2, cqItem3.getLong());
+    }
+
+    @Test
+    public void testCommitFailed() {
+        long startTime = System.currentTimeMillis();
+        MemoryFileSegment segment = (MemoryFileSegment) createFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG);
+        long lastSize = segment.getSize();
+        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+        segment.append(MessageBufferUtilTest.buildMessageBuffer(), 0);
+
+        segment.blocker = new CompletableFuture<>();
+        new Thread(() -> {
+            try {
+                Thread.sleep(1000);
+            } catch (InterruptedException e) {
+                Assert.fail(e.getMessage());
+            }
+            ByteBuffer buffer = MessageBufferUtilTest.buildMessageBuffer();
+            buffer.putLong(MessageBufferUtil.STORE_TIMESTAMP_POSITION, startTime);
+            segment.append(buffer, 0);
+            segment.blocker.complete(false);
+        }).start();
+
+        segment.commit();
+        segment.blocker.join();
+
+        segment.blocker = new CompletableFuture<>();
+        segment.blocker.complete(true);
+        segment.commit();
+
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getMaxOffset());
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 3, segment.getCommitOffset());
+
+        ByteBuffer msg1 = segment.read(lastSize, MessageBufferUtilTest.MSG_LEN);
+        Assert.assertEquals(baseOffset + lastSize, MessageBufferUtil.getCommitLogOffset(msg1));
+
+        ByteBuffer msg2 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtilTest.MSG_LEN);
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN, MessageBufferUtil.getCommitLogOffset(msg2));
+
+        ByteBuffer msg3 = segment.read(lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtilTest.MSG_LEN);
+        Assert.assertEquals(baseOffset + lastSize + MessageBufferUtilTest.MSG_LEN * 2, MessageBufferUtil.getCommitLogOffset(msg3));
+    }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredIndexFileTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredIndexFileTest.java
new file mode 100644
index 000000000..f4f517f65
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/container/TieredIndexFileTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.container;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.metadata.TieredMetadataStore;
+import org.apache.rocketmq.store.tiered.mock.MemoryFileSegment;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TieredIndexFileTest {
+    MessageQueue mq;
+    TieredMessageStoreConfig storeConfig;
+    TieredMetadataStore metadataStore;
+
+    @Before
+    public void setUp() {
+        MemoryFileSegment.checkSize = false;
+        mq = new MessageQueue("TieredIndexFileTest", "broker", 1);
+        storeConfig = new TieredMessageStoreConfig();
+        storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "rmqut");
+        storeConfig.setTieredBackendServiceProvider("org.apache.rocketmq.store.tiered.mock.MemoryFileSegment");
+        storeConfig.setTieredStoreIndexFileMaxHashSlotNum(2);
+        storeConfig.setTieredStoreIndexFileMaxIndexNum(3);
+        metadataStore = TieredStoreUtil.getMetadataStore(storeConfig);
+    }
+
+    @After
+    public void tearDown() throws IOException {
+        MemoryFileSegment.checkSize = true;
+        FileUtils.deleteDirectory(new File("/tmp/rmqut"));
+//        metadataStore.reLoadStore();
+    }
+
+    @Test
+    public void testAppendAndQuery() throws IOException, ClassNotFoundException, NoSuchMethodException {
+        // skip this test on windows
+        Assume.assumeFalse(SystemUtils.IS_OS_WINDOWS);
+
+        TieredIndexFile indexFile = new TieredIndexFile(storeConfig);
+        indexFile.append(mq, 0, "key3", 3, 300, 1000);
+        indexFile.append(mq, 0, "key2", 2, 200, 1100);
+        indexFile.append(mq, 0, "key1", 1, 100, 1200);
+
+        Awaitility.waitAtMost(5, TimeUnit.SECONDS)
+            .until(() -> {
+                List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1000, 1200).join();
+                if (indexList.size() != 1) {
+                    return false;
+                }
+
+                ByteBuffer indexBuffer = indexList.get(0).getValue();
+                Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 2, indexBuffer.remaining());
+
+                Assert.assertEquals(1, indexBuffer.getLong(4 + 4 + 4));
+                Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8));
+                Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+
+                Assert.assertEquals(3, indexBuffer.getLong(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4));
+                Assert.assertEquals(300, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8));
+                Assert.assertEquals(0, indexBuffer.getInt(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE + 4 + 4 + 4 + 8 + 4));
+                return true;
+            });
+
+        indexFile.append(mq, 0, "key4", 4, 400, 1300);
+        indexFile.append(mq, 0, "key4", 4, 400, 1300);
+        indexFile.append(mq, 0, "key4", 4, 400, 1300);
+
+        Awaitility.waitAtMost(5, TimeUnit.SECONDS)
+            .until(() -> {
+                List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1300, 1300).join();
+                if (indexList.size() != 1) {
+                    return false;
+                }
+
+                ByteBuffer indexBuffer = indexList.get(0).getValue();
+                Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining());
+                Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4));
+                Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8));
+                Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+                return true;
+            });
+
+        List<Pair<Long, ByteBuffer>> indexList = indexFile.queryAsync(mq.getTopic(), "key1", 1300, 1300).join();
+        Assert.assertEquals(0, indexList.size());
+
+        indexList = indexFile.queryAsync(mq.getTopic(), "key4", 1200, 1300).join();
+        Assert.assertEquals(2, indexList.size());
+
+        ByteBuffer indexBuffer = indexList.get(0).getValue();
+        Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE * 3, indexBuffer.remaining());
+        Assert.assertEquals(4, indexBuffer.getLong(4 + 4 + 4));
+        Assert.assertEquals(400, indexBuffer.getInt(4 + 4 + 4 + 8));
+        Assert.assertEquals(0, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+
+        indexBuffer = indexList.get(1).getValue();
+        Assert.assertEquals(TieredIndexFile.INDEX_FILE_HASH_COMPACT_INDEX_SIZE, indexBuffer.remaining());
+        Assert.assertEquals(2, indexBuffer.getLong(4 + 4 + 4));
+        Assert.assertEquals(200, indexBuffer.getInt(4 + 4 + 4 + 8));
+        Assert.assertEquals(100, indexBuffer.getInt(4 + 4 + 4 + 8 + 4));
+    }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java
index ff73c173a..a1c5b861a 100644
--- a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/metadata/MetadataStoreTest.java
@@ -18,10 +18,16 @@ package org.apache.rocketmq.store.tiered.metadata;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.io.FileUtils;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.container.TieredCommitLog;
+import org.apache.rocketmq.store.tiered.container.TieredFileSegment;
+import org.apache.rocketmq.store.tiered.mock.MemoryFileSegment;
+import org.apache.rocketmq.store.tiered.util.TieredStoreUtil;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -30,19 +36,20 @@ import org.junit.Test;
 public class MetadataStoreTest {
     MessageQueue mq;
     TieredMessageStoreConfig storeConfig;
-    TieredStoreMetadataStore metadataStore;
+    TieredMetadataStore metadataStore;
 
     @Before
     public void setUp() {
-        mq = new MessageQueue("MetadataStoreTest", "broker", 1);
         storeConfig = new TieredMessageStoreConfig();
-        storeConfig.setStorePathRootDir("/tmp/rmqut");
-        metadataStore = new TieredStoreMetadataManager(storeConfig);
+        storeConfig.setStorePathRootDir(FileUtils.getTempDirectory() + File.separator + "rmqut");
+        mq = new MessageQueue("MetadataStoreTest", storeConfig.getBrokerName(), 1);
+        metadataStore = new TieredMetadataManager(storeConfig);
     }
 
     @After
     public void tearDown() throws IOException {
         FileUtils.deleteDirectory(new File("/tmp/rmqut"));
+        TieredStoreUtil.getMetadataStore(storeConfig).destroy();
     }
 
     @Test
@@ -120,15 +127,54 @@ public class MetadataStoreTest {
         Assert.assertNotNull(metadataStore.getTopic(mq.getTopic() + "1"));
     }
 
    @Test
    public void testFileSegment() {
        // Create a commit-log segment starting at base offset 100; with
        // checkSize disabled in setUp() the mock reports size 0, so seed the
        // write position to the (mock) size before registering metadata.
        MemoryFileSegment fileSegment1 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
            mq,
            100,
            storeConfig);
        fileSegment1.initPosition(fileSegment1.getSize());
        FileSegmentMetadata metadata1 = metadataStore.updateFileSegment(fileSegment1);
        Assert.assertEquals(mq, metadata1.getQueue());
        Assert.assertEquals(TieredFileSegment.FileSegmentType.COMMIT_LOG, TieredFileSegment.FileSegmentType.valueOf(metadata1.getType()));
        Assert.assertEquals(100, metadata1.getBaseOffset());
        // A freshly registered segment has not been sealed yet.
        Assert.assertEquals(0, metadata1.getSealTimestamp());

        // Marking the segment full updates its size but does not seal it.
        fileSegment1.setFull();
        metadata1 = metadataStore.updateFileSegment(fileSegment1);
        Assert.assertEquals(1000, metadata1.getSize());
        Assert.assertEquals(0, metadata1.getSealTimestamp());

        // Committing a full segment appends the commit-log coda and seals it
        // (seal timestamp becomes non-zero).
        fileSegment1.commit();
        metadata1 = metadataStore.updateFileSegment(fileSegment1);
        Assert.assertEquals(1000 + TieredCommitLog.CODA_SIZE, metadata1.getSize());
        Assert.assertTrue(metadata1.getSealTimestamp() > 0);

        // Register a second segment and verify iteration returns both,
        // ordered by base offset.
        MemoryFileSegment fileSegment2 = new MemoryFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG,
            mq,
            1100,
            storeConfig);
        metadataStore.updateFileSegment(fileSegment2);
        List<FileSegmentMetadata> list = new ArrayList<>();
        metadataStore.iterateFileSegment(TieredFileSegment.FileSegmentType.COMMIT_LOG, "MetadataStoreTest", 1, list::add);
        Assert.assertEquals(2, list.size());
        Assert.assertEquals(100, list.get(0).getBaseOffset());
        Assert.assertEquals(1100, list.get(1).getBaseOffset());

        // Deleting a segment's metadata makes it unresolvable.
        Assert.assertNotNull(metadataStore.getFileSegment(fileSegment1));
        metadataStore.deleteFileSegment(fileSegment1);
        Assert.assertNull(metadataStore.getFileSegment(fileSegment1));
    }
+
     @Test
     public void testReload() {
-        TieredStoreMetadataManager metadataManager = (TieredStoreMetadataManager) metadataStore;
+        TieredMetadataManager metadataManager = (TieredMetadataManager) metadataStore;
         metadataManager.addTopic(mq.getTopic(), 1);
         metadataManager.addQueue(mq, 2);
         metadataManager.persist();
         Assert.assertTrue(new File(metadataManager.configFilePath()).exists());
 
-        metadataManager = new TieredStoreMetadataManager(storeConfig);
+        metadataManager = new TieredMetadataManager(storeConfig);
         metadataManager.load();
 
         TopicMetadata topicMetadata = metadataManager.getTopic(mq.getTopic());
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/mock/MemoryFileSegment.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/mock/MemoryFileSegment.java
new file mode 100644
index 000000000..0071963cf
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/mock/MemoryFileSegment.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.mock;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import org.apache.rocketmq.store.tiered.common.TieredMessageStoreConfig;
+import org.apache.rocketmq.store.tiered.container.TieredFileSegment;
+import org.junit.Assert;
+
+public class MemoryFileSegment extends TieredFileSegment {
+    private final ByteBuffer memStore;
+
+    public CompletableFuture<Boolean> blocker;
+
+    public static boolean checkSize = true;
+
+    public MemoryFileSegment(TieredFileSegment.FileSegmentType fileType, MessageQueue messageQueue, long baseOffset,
+        TieredMessageStoreConfig storeConfig) {
+        super(fileType, messageQueue, baseOffset, storeConfig);
+        switch (fileType) {
+            case COMMIT_LOG:
+                memStore = ByteBuffer.allocate(10000);
+                break;
+            case CONSUME_QUEUE:
+                memStore = ByteBuffer.allocate(10000);
+                break;
+            case INDEX:
+                memStore = ByteBuffer.allocate(10000);
+                break;
+            default:
+                memStore = null;
+                break;
+        }
+        memStore.position((int) getSize());
+    }
+
+    @Override public String getPath() {
+        return "/tiered/" + fileType + File.separator + baseOffset;
+    }
+
+    @Override public long getSize() {
+        if (checkSize) {
+            return 1000;
+        }
+        return 0;
+    }
+
+    @Override protected void createFile() {
+
+    }
+
+    @Override protected CompletableFuture<ByteBuffer> read0(long position, int length) {
+        ByteBuffer buffer = memStore.duplicate();
+        buffer.position((int) position);
+        ByteBuffer slice = buffer.slice();
+        slice.limit(length);
+        return CompletableFuture.completedFuture(slice);
+    }
+
+    @Override
+    protected CompletableFuture<Boolean> commit0(TieredFileSegmentInputStream inputStream, long position, int length,
+        boolean append) {
+        try {
+            if (blocker != null && !blocker.get()) {
+                throw new IllegalStateException();
+            }
+        } catch (InterruptedException | ExecutionException e) {
+            Assert.fail(e.getMessage());
+        }
+
+        Assert.assertTrue(!checkSize || position >= getSize());
+
+        byte[] buffer = new byte[1024];
+
+        int startPos = memStore.position();
+        try {
+            int len;
+            while ((len = inputStream.read(buffer)) > 0) {
+                memStore.put(buffer, 0, len);
+            }
+            Assert.assertEquals(length, memStore.position() - startPos);
+        } catch (Exception e) {
+            Assert.fail(e.getMessage());
+            return CompletableFuture.completedFuture(false);
+        }
+        return CompletableFuture.completedFuture(true);
+    }
+
+    @Override protected boolean exists() {
+        return false;
+    }
+
+    @Override protected void destroyFile() {
+
+    }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtilTest.java
new file mode 100644
index 000000000..d0e4932c9
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/CQItemBufferUtilTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.util;
+
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.store.ConsumeQueue;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class CQItemBufferUtilTest {
+    private static ByteBuffer cqItem;
+
+    @BeforeClass
+    public static void setUp() {
+        cqItem = ByteBuffer.allocate(ConsumeQueue.CQ_STORE_UNIT_SIZE);
+        cqItem.putLong(1);
+        cqItem.putInt(2);
+        cqItem.putLong(3);
+        cqItem.flip();
+    }
+
+    @Test
+    public void testGetCommitLogOffset() {
+        Assert.assertEquals(1, CQItemBufferUtil.getCommitLogOffset(cqItem));
+    }
+
+    @Test
+    public void testGetSize() {
+        Assert.assertEquals(2, CQItemBufferUtil.getSize(cqItem));
+    }
+
+    @Test
+    public void testGetTagCode() {
+        Assert.assertEquals(3, CQItemBufferUtil.getTagCode(cqItem));
+    }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtilTest.java
new file mode 100644
index 000000000..739629a56
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/MessageBufferUtilTest.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.util;
+
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.store.tiered.container.TieredCommitLog;
+import org.apache.rocketmq.store.tiered.container.TieredConsumeQueue;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class MessageBufferUtilTest {
+    public static final int MSG_LEN = 4 //TOTALSIZE
+        + 4 //MAGICCODE
+        + 4 //BODYCRC
+        + 4 //QUEUEID
+        + 4 //FLAG
+        + 8 //QUEUEOFFSET
+        + 8 //PHYSICALOFFSET
+        + 4 //SYSFLAG
+        + 8 //BORNTIMESTAMP
+        + 8 //BORNHOST
+        + 8 //STORETIMESTAMP
+        + 8 //STOREHOSTADDRESS
+        + 4 //RECONSUMETIMES
+        + 8 //Prepared Transaction Offset
+        + 4 + 0 //BODY
+        + 2 + 0 //TOPIC
+        + 2 + 30 //properties
+        + 0;
+
+    public static ByteBuffer buildMessageBuffer() {
+        // Initialization of storage space
+        ByteBuffer buffer = ByteBuffer.allocate(MSG_LEN);
+        // 1 TOTALSIZE
+        buffer.putInt(MSG_LEN);
+        // 2 MAGICCODE
+        buffer.putInt(MessageDecoder.MESSAGE_MAGIC_CODE_V2);
+        // 3 BODYCRC
+        buffer.putInt(3);
+        // 4 QUEUEID
+        buffer.putInt(4);
+        // 5 FLAG
+        buffer.putInt(5);
+        // 6 QUEUEOFFSET
+        buffer.putLong(6);
+        // 7 PHYSICALOFFSET
+        buffer.putLong(7);
+        // 8 SYSFLAG
+        buffer.putInt(8);
+        // 9 BORNTIMESTAMP
+        buffer.putLong(9);
+        // 10 BORNHOST
+        buffer.putLong(10);
+        // 11 STORETIMESTAMP
+        buffer.putLong(11);
+        // 12 STOREHOSTADDRESS
+        buffer.putLong(10);
+        // 13 RECONSUMETIMES
+        buffer.putInt(13);
+        // 14 Prepared Transaction Offset
+        buffer.putLong(14);
+        // 15 BODY
+        buffer.putInt(0);
+        // 16 TOPIC
+        buffer.putShort((short) 0);
+        // 17 PROPERTIES
+        Map<String, String> map = new HashMap<>();
+        map.put(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, "uk");
+        map.put("userkey", "uservalue0");
+        String properties = MessageDecoder.messageProperties2String(map);
+        byte[] propertiesBytes = properties.getBytes(StandardCharsets.UTF_8);
+        buffer.putShort((short) propertiesBytes.length);
+        buffer.put(propertiesBytes);
+        buffer.flip();
+
+        Assert.assertEquals(MSG_LEN, buffer.remaining());
+        return buffer;
+    }
+
+    @Test
+    public void testGetTotalSize() {
+        ByteBuffer buffer = buildMessageBuffer();
+        int totalSize = MessageBufferUtil.getTotalSize(buffer);
+        Assert.assertEquals(MSG_LEN, totalSize);
+    }
+
+    @Test
+    public void testGetMagicCode() {
+        ByteBuffer buffer = buildMessageBuffer();
+        int magicCode = MessageBufferUtil.getMagicCode(buffer);
+        Assert.assertEquals(MessageDecoder.MESSAGE_MAGIC_CODE_V2, magicCode);
+    }
+
+    @Test
+    public void testSplitMessages() {
+        ByteBuffer msgBuffer1 = buildMessageBuffer();
+        msgBuffer1.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 10);
+        ByteBuffer msgBuffer2 = ByteBuffer.allocate(TieredCommitLog.CODA_SIZE);
+
+        msgBuffer2.putInt(TieredCommitLog.CODA_SIZE);
+        msgBuffer2.putInt(TieredCommitLog.BLANK_MAGIC_CODE);
+        msgBuffer2.putLong(System.currentTimeMillis());
+        msgBuffer2.flip();
+
+        ByteBuffer msgBuffer3 = buildMessageBuffer();
+        msgBuffer3.putLong(MessageBufferUtil.QUEUE_OFFSET_POSITION, 11);
+
+        ByteBuffer msgBuffer = ByteBuffer.allocate(msgBuffer1.remaining() + msgBuffer2.remaining() + msgBuffer3.remaining());
+        msgBuffer.put(msgBuffer1);
+        msgBuffer.put(msgBuffer2);
+        msgBuffer.put(msgBuffer3);
+        msgBuffer.flip();
+
+        ByteBuffer cqBuffer1 = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        cqBuffer1.putLong(1000);
+        cqBuffer1.putInt(MSG_LEN);
+        cqBuffer1.putLong(0);
+        cqBuffer1.flip();
+
+        ByteBuffer cqBuffer2 = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        cqBuffer2.putLong(1000 + TieredCommitLog.CODA_SIZE + MSG_LEN);
+        cqBuffer2.putInt(MSG_LEN);
+        cqBuffer2.putLong(0);
+        cqBuffer2.flip();
+
+        ByteBuffer cqBuffer3 = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        cqBuffer3.putLong(1000 + MSG_LEN);
+        cqBuffer3.putInt(MSG_LEN);
+        cqBuffer3.putLong(0);
+        cqBuffer3.flip();
+
+        ByteBuffer cqBuffer4 = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        cqBuffer4.putLong(1000 + TieredCommitLog.CODA_SIZE + MSG_LEN);
+        cqBuffer4.putInt(MSG_LEN - 10);
+        cqBuffer4.putLong(0);
+        cqBuffer4.flip();
+
+        ByteBuffer cqBuffer5 = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        cqBuffer5.putLong(1000 + TieredCommitLog.CODA_SIZE + MSG_LEN);
+        cqBuffer5.putInt(MSG_LEN * 10);
+        cqBuffer5.putLong(0);
+        cqBuffer5.flip();
+
+        ByteBuffer cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2);
+        cqBuffer.put(cqBuffer1);
+        cqBuffer.put(cqBuffer2);
+        cqBuffer.flip();
+        cqBuffer1.rewind();
+        cqBuffer2.rewind();
+        List<Pair<Integer, Integer>> msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+        Assert.assertEquals(2, msgList.size());
+        Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
+        Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1));
+
+        cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 2);
+        cqBuffer.put(cqBuffer1);
+        cqBuffer.put(cqBuffer4);
+        cqBuffer.flip();
+        cqBuffer1.rewind();
+        cqBuffer4.rewind();
+        msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+        Assert.assertEquals(1, msgList.size());
+        Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
+
+        cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE * 3);
+        cqBuffer.put(cqBuffer1);
+        cqBuffer.put(cqBuffer3);
+        cqBuffer.flip();
+        msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+        Assert.assertEquals(2, msgList.size());
+        Assert.assertEquals(Pair.of(0, MSG_LEN), msgList.get(0));
+        Assert.assertEquals(Pair.of(MSG_LEN + TieredCommitLog.CODA_SIZE, MSG_LEN), msgList.get(1));
+
+        cqBuffer = ByteBuffer.allocate(TieredConsumeQueue.CONSUME_QUEUE_STORE_UNIT_SIZE);
+        cqBuffer.put(cqBuffer5);
+        cqBuffer.flip();
+        msgList = MessageBufferUtil.splitMessageBuffer(cqBuffer, msgBuffer);
+        Assert.assertEquals(0, msgList.size());
+    }
+
+    @Test
+    public void testGetQueueOffset() {
+        ByteBuffer buffer = buildMessageBuffer();
+        long queueOffset = MessageBufferUtil.getQueueOffset(buffer);
+        Assert.assertEquals(6, queueOffset);
+    }
+
+    @Test
+    public void testGetStoreTimeStamp() {
+        ByteBuffer buffer = buildMessageBuffer();
+        long storeTimeStamp = MessageBufferUtil.getStoreTimeStamp(buffer);
+        Assert.assertEquals(11, storeTimeStamp);
+    }
+
+    @Test
+    public void testGetOffsetId() {
+        ByteBuffer buffer = buildMessageBuffer();
+        InetSocketAddress inetSocketAddress = new InetSocketAddress("255.255.255.255", 65535);
+        ByteBuffer addr = ByteBuffer.allocate(Long.BYTES);
+        addr.put(inetSocketAddress.getAddress().getAddress(), 0, 4);
+        addr.putInt(inetSocketAddress.getPort());
+        addr.flip();
+        for (int i = 0; i < addr.remaining(); i++) {
+            buffer.put(MessageBufferUtil.STORE_HOST_POSITION + i, addr.get(i));
+        }
+        String excepted = MessageDecoder.createMessageId(ByteBuffer.allocate(TieredStoreUtil.MSG_ID_LENGTH), addr, 7);
+        String offsetId = MessageBufferUtil.getOffsetId(buffer);
+        Assert.assertEquals(excepted, offsetId);
+    }
+
+    @Test
+    public void testGetProperties() {
+        ByteBuffer buffer = buildMessageBuffer();
+        Map<String, String> properties = MessageBufferUtil.getProperties(buffer);
+        Assert.assertEquals(2, properties.size());
+        Assert.assertTrue(properties.containsKey(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+        Assert.assertEquals("uk", properties.get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+        Assert.assertTrue(properties.containsKey("userkey"));
+        Assert.assertEquals("uservalue0", properties.get("userkey"));
+    }
+}
diff --git a/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtilTest.java b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtilTest.java
new file mode 100644
index 000000000..1bb462d8b
--- /dev/null
+++ b/tieredstore/src/test/java/org/apache/rocketmq/store/tiered/util/TieredStoreUtilTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.tiered.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TieredStoreUtilTest {
+
+    private static final Map<Long, String> DATA_MAP = new HashMap<Long, String>() {
+        {
+            put(0L, "0Bytes");
+            put(1023L, "1023Bytes");
+            put(1024L, "1KB");
+            put(12_345L, "12.06KB");
+            put(10_123_456L, "9.65MB");
+            put(10_123_456_798L, "9.43GB");
+            put(1_777_777_777_777_777_777L, "1.54EB");
+        }
+    };
+
+    @Test
+    public void getHash() {
+        Assert.assertEquals("161c08ff", TieredStoreUtil.getHash("TieredStorageDailyTest"));
+    }
+
+    @Test
+    public void testOffset2FileName() {
+        Assert.assertEquals("cfcd208400000000000000000000", TieredStoreUtil.offset2FileName(0));
+        Assert.assertEquals("b10da56800000000004294937144", TieredStoreUtil.offset2FileName(4294937144L));
+    }
+
+    @Test
+    public void testFileName2Offset() {
+        Assert.assertEquals(0, TieredStoreUtil.fileName2Offset("cfcd208400000000000000000000"));
+        Assert.assertEquals(4294937144L, TieredStoreUtil.fileName2Offset("b10da56800000000004294937144"));
+    }
+
+    @Test
+    public void testToHumanReadable() {
+        DATA_MAP.forEach((in, expected) -> Assert.assertEquals(expected, TieredStoreUtil.toHumanReadable(in)));
+    }
+}
diff --git a/tieredstore/src/test/resources/logback.xml b/tieredstore/src/test/resources/logback.xml
new file mode 100644
index 000000000..b70b42046
--- /dev/null
+++ b/tieredstore/src/test/resources/logback.xml
@@ -0,0 +1,29 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<configuration>
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <!-- encoders are assigned the type
+             ch.qos.logback.classic.encoder.PatternLayoutEncoder by default -->
+        <encoder>
+            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <root level="debug">
+        <appender-ref ref="STDOUT" />
+    </root>
+</configuration>