You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/30 06:44:02 UTC

[iotdb] branch master updated: Add TieredManager for data directory control (#9972)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bcddb6eb15a Add TieredManager for data directory control (#9972)
bcddb6eb15a is described below

commit bcddb6eb15a2436fd0a7f33c0e336479af327d61
Author: Zhang.Jinrui <xi...@gmail.com>
AuthorDate: Tue May 30 14:43:56 2023 +0800

    Add TieredManager for data directory control (#9972)
---
 .../iotdb/hadoop/tsfile/TsFileWriteToHDFS.java     |   2 +-
 .../apache/iotdb/db/integration/IoTDBTtlIT.java    | 354 +++++++++++++++++++++
 .../util/TSFileConfigUtilCompletenessTest.java     |   8 +-
 .../iotdb/hadoop/fileSystem/HDFSConfUtil.java      |  11 +-
 .../apache/iotdb/hadoop/fileSystem/HDFSFile.java   |  18 ++
 .../apache/iotdb/hadoop/fileSystem/HDFSInput.java  |  32 --
 .../apache/iotdb/hadoop/tsfile/TSFHadoopTest.java  |   6 +-
 .../apache/iotdb/hive/TSFHiveInputFormatTest.java  |   4 +-
 .../apache/iotdb/hive/TSFHiveRecordReaderTest.java |   4 +-
 .../resources/conf/iotdb-common.properties         |   7 +-
 .../apache/iotdb/commons/conf/CommonConfig.java    |  20 +-
 .../iotdb/commons/conf/CommonDescriptor.java       |  22 +-
 .../apache/iotdb/commons/conf/IoTDBConstant.java   |   4 +
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 129 +++++---
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  46 ++-
 .../org/apache/iotdb/db/conf/IoTDBStartCheck.java  |   4 +-
 .../db/conf/directories/DirectoryChecker.java      |   5 +
 .../db/conf/directories/DirectoryManager.java      | 199 ------------
 .../iotdb/db/conf/directories/FolderManager.java   |   4 +-
 .../iotdb/db/conf/directories/TierManager.java     | 313 ++++++++++++++++++
 .../org/apache/iotdb/db/engine/StorageEngine.java  |   2 +-
 .../execute/recover/CompactionRecoverManager.java  |   6 +-
 .../execute/recover/CompactionRecoverTask.java     |   2 +-
 .../execute/utils/log/TsFileIdentifier.java        |   3 +-
 .../iotdb/db/engine/snapshot/SnapshotLoader.java   |   8 +-
 .../iotdb/db/engine/snapshot/SnapshotTaker.java    |   2 +-
 .../iotdb/db/engine/storagegroup/DataRegion.java   |  99 +++---
 .../engine/storagegroup/TsFileNameGenerator.java   |   9 +-
 .../db/engine/storagegroup/TsFileProcessor.java    |   4 +-
 .../db/engine/storagegroup/TsFileResource.java     |  39 ++-
 .../iotdb/db/engine/upgrade/UpgradeTask.java       |   6 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |   5 +-
 .../iotdb/db/service/metrics/SystemMetrics.java    |   2 +-
 .../org/apache/iotdb/db/utils/OpenFileNumUtil.java |   8 +-
 .../file/AbstractTsFileRecoverPerformer.java       |   9 +-
 .../db/engine/compaction/TsFileIdentifierUT.java   |  16 +-
 .../recover/SizeTieredCompactionRecoverTest.java   |  20 +-
 .../db/engine/snapshot/IoTDBSnapshotTest.java      |  42 +--
 .../iotdb/db/engine/storagegroup/TTLTest.java      |   6 +-
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  17 +-
 .../iotdb/tsfile/common/conf/TSFileConfig.java     |  68 +++-
 .../iotdb/tsfile/fileSystem/FSFactoryProducer.java |  35 +-
 .../tsfile/fileSystem/{FSType.java => FSPath.java} |  22 +-
 .../org/apache/iotdb/tsfile/fileSystem/FSType.java |   3 +-
 .../fileInputFactory/HDFSInputFactory.java         |   8 +-
 ...putFactory.java => HybridFileInputFactory.java} |  57 ++--
 ...FSInputFactory.java => OSFileInputFactory.java} |  18 +-
 .../fileOutputFactory/HDFSOutputFactory.java       |   8 +-
 ...utFactory.java => HybridFileOutputFactory.java} |  57 ++--
 ...OutputFactory.java => OSFileOutputFactory.java} |  18 +-
 .../tsfile/fileSystem/fsFactory/FSFactory.java     |  13 +-
 .../tsfile/fileSystem/fsFactory/HDFSFactory.java   |  70 +++-
 .../fileSystem/fsFactory/HybridFSFactory.java      | 165 ++++++++++
 .../fileSystem/fsFactory/LocalFSFactory.java       |  22 +-
 .../{HDFSFactory.java => OSFSFactory.java}         | 133 +++++---
 .../iotdb/tsfile/read/reader/LocalTsFileInput.java |  48 ---
 .../iotdb/tsfile/read/reader/TsFileInput.java      |  39 +--
 .../org/apache/iotdb/tsfile/utils/FSUtils.java     | 153 +++++++++
 .../apache/iotdb/tsfile/utils/FilePathUtils.java   |  23 +-
 59 files changed, 1750 insertions(+), 707 deletions(-)

diff --git a/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java b/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java
index 52e7fc670d7..aaf1619a400 100644
--- a/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java
+++ b/example/hadoop/src/main/java/org/apache/iotdb/hadoop/tsfile/TsFileWriteToHDFS.java
@@ -43,7 +43,7 @@ public class TsFileWriteToHDFS {
   private static final Logger logger = LoggerFactory.getLogger(TsFileWriteToHDFS.class);
 
   public static void main(String[] args) {
-    config.setTSFileStorageFs(FSType.HDFS);
+    config.setTSFileStorageFs(new FSType[] {FSType.HDFS});
 
     String path = "hdfs://localhost:9000/test.tsfile";
     File f = FSFactoryProducer.getFSFactory().getFile(path);
diff --git a/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
new file mode 100644
index 00000000000..7c30e59b045
--- /dev/null
+++ b/integration/src/test/java/org/apache/iotdb/db/integration/IoTDBTtlIT.java
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.iotdb.db.integration;
+
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.integration.env.EnvFactory;
+import org.apache.iotdb.itbase.category.ClusterTest;
+import org.apache.iotdb.itbase.category.LocalStandaloneTest;
+import org.apache.iotdb.itbase.category.RemoteTest;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@Category({LocalStandaloneTest.class, ClusterTest.class, RemoteTest.class})
+public class IoTDBTtlIT {
+
+  @Before
+  public void setUp() throws Exception {
+    EnvFactory.getEnv().initBeforeTest();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanAfterTest();
+  }
+
+  @Test
+  @Category({ClusterTest.class})
+  public void testTTL() throws SQLException, InterruptedException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      try {
+        statement.execute("SET TTL TO root.TTL_SG1 1000");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.PATH_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+      try {
+        statement.execute("UNSET TTL TO root.TTL_SG1");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.PATH_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+
+      statement.execute("CREATE DATABASE root.TTL_SG1");
+      statement.execute("CREATE TIMESERIES root.TTL_SG1.s1 WITH DATATYPE=INT64,ENCODING=PLAIN");
+      try {
+        statement.execute("SET TTL TO root.TTL_SG1.s1 1000");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+
+      statement.execute("CREATE DATABASE root.TTL_SG2");
+      statement.execute("CREATE TIMESERIES root.TTL_SG2.s1 WITH DATATYPE=INT64,ENCODING=PLAIN");
+      try {
+        statement.execute("SET TTL TO root.TTL_SG2.s1 1000");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+
+      try {
+        statement.execute("SET TTL TO root.** 1000");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.PATH_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+      try {
+        statement.execute("UNSET TTL TO root.**");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.PATH_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+
+      try {
+        statement.execute("SET TTL TO root.**.s1 1000");
+      } catch (SQLException e) {
+        assertEquals(TSStatusCode.DATABASE_NOT_EXIST.getStatusCode(), e.getErrorCode());
+      }
+
+      long now = System.currentTimeMillis();
+      for (int i = 0; i < 100; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)", now - 100 + i, i));
+      }
+      for (int i = 0; i < 100; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)", now - 100000 + i, i));
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.TTL_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(200, cnt);
+      }
+
+      statement.execute("SET TTL TO root.TTL_SG1 10000");
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.TTL_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(100, cnt);
+      }
+      for (int i = 0; i < 100; i++) {
+        boolean caught = false;
+        try {
+          statement.execute(
+              String.format(
+                  "INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)", now - 500000 + i, i));
+        } catch (SQLException e) {
+          if (TSStatusCode.OUT_OF_TTL.getStatusCode() == e.getErrorCode()) {
+            caught = true;
+          }
+        }
+        assertTrue(caught);
+      }
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.TTL_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(100, cnt);
+      }
+
+      statement.execute("UNSET TTL TO root.TTL_SG1");
+      // make sure other nodes have applied UNSET TTL
+      Thread.sleep(1000);
+      for (int i = 0; i < 100; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.TTL_SG1(timestamp, s1) VALUES (%d, %d)", now - 30000 + i, i));
+      }
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.TTL_SG1")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertTrue(cnt >= 200);
+      }
+      statement.execute("CREATE DATABASE root.sg.TTL_SG3");
+      statement.execute("CREATE DATABASE root.sg.TTL_SG4");
+      // TTL_SG3
+      for (int i = 0; i < 100; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.sg.TTL_SG3(timestamp, s1) VALUES (%d, %d)", now - 100 + i, i));
+      }
+      for (int i = 100; i < 200; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.sg.TTL_SG3(timestamp, s1) VALUES (%d, %d)", now - 100000 + i, i));
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg.TTL_SG3")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(200, cnt);
+      }
+      // TTL_SG4
+      for (int i = 200; i < 300; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.sg.TTL_SG4(timestamp, s1) VALUES (%d, %d)", now - 100 + i, i));
+      }
+      for (int i = 300; i < 400; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.sg.TTL_SG4(timestamp, s1) VALUES (%d, %d)", now - 100000 + i, i));
+      }
+
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg.TTL_SG4")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(200, cnt);
+      }
+
+      statement.execute("SET TTL TO root.sg.** 10000");
+      try (ResultSet resultSet = statement.executeQuery("SELECT * FROM root.sg.**")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(200, cnt);
+      }
+      for (int i = 0; i < 100; i++) {
+        boolean caught = false;
+        try {
+          statement.execute(
+              String.format(
+                  "INSERT INTO root.sg.TTL_SG3(timestamp, s1) VALUES (%d, %d)",
+                  now - 500000 + i, i));
+        } catch (SQLException e) {
+          if (TSStatusCode.OUT_OF_TTL.getStatusCode() == e.getErrorCode()) {
+            caught = true;
+          }
+        }
+        assertTrue(caught);
+      }
+      for (int i = 100; i < 200; i++) {
+        boolean caught = false;
+        try {
+          statement.execute(
+              String.format(
+                  "INSERT INTO root.sg.TTL_SG4(timestamp, s1) VALUES (%d, %d)",
+                  now - 500000 + i, i));
+        } catch (SQLException e) {
+          if (TSStatusCode.OUT_OF_TTL.getStatusCode() == e.getErrorCode()) {
+            caught = true;
+          }
+        }
+        assertTrue(caught);
+      }
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg.**")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertEquals(200, cnt);
+      }
+
+      statement.execute("UNSET TTL TO root.sg.**");
+      // make sure other nodes have applied UNSET TTL
+      Thread.sleep(1000);
+      for (int i = 0; i < 100; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.sg.TTL_SG3(timestamp, s1) VALUES (%d, %d)", now - 30000 + i, i));
+      }
+      for (int i = 100; i < 200; i++) {
+        statement.execute(
+            String.format(
+                "INSERT INTO root.sg.TTL_SG4(timestamp, s1) VALUES (%d, %d)", now - 30000 + i, i));
+      }
+      try (ResultSet resultSet = statement.executeQuery("SELECT s1 FROM root.sg.**")) {
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt++;
+        }
+        assertTrue(cnt >= 400);
+      }
+    }
+  }
+
+  @Test
+  @Category({ClusterTest.class})
+  public void testShowTTL() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE root.group1");
+      statement.execute("CREATE DATABASE root.group2");
+      String result = doQuery(statement, "SHOW ALL TTL");
+      assertTrue(
+          result.equals("root.group1,null\n" + "root.group2,null\n")
+              || result.equals("root.group2,null\n" + "root.group1,null\n"));
+      result = doQuery(statement, "SHOW TTL ON root.group1");
+      assertEquals("root.group1,null\n", result);
+
+      statement.execute("SET TTL TO root.group1 10000");
+      result = doQuery(statement, "SHOW ALL TTL");
+      assertTrue(
+          result.equals("root.group1,10000\n" + "root.group2,null\n")
+              || result.equals("root.group2,null\n" + "root.group1,10000\n"));
+      result = doQuery(statement, "SHOW TTL ON root.group1");
+      assertEquals("root.group1,10000\n", result);
+
+      statement.execute("UNSET TTL TO root.group1");
+      result = doQuery(statement, "SHOW ALL TTL");
+      assertTrue(
+          result.equals("root.group1,null\n" + "root.group2,null\n")
+              || result.equals("root.group2,null\n" + "root.group1,null\n"));
+      result = doQuery(statement, "SHOW TTL ON root.group1");
+      assertEquals("root.group1,null\n", result);
+    }
+  }
+
+  private String doQuery(Statement statement, String query) throws SQLException {
+    StringBuilder ret;
+    try (ResultSet resultSet = statement.executeQuery(query)) {
+      ret = new StringBuilder();
+      while (resultSet.next()) {
+        ret.append(resultSet.getString(1));
+        ret.append(",");
+        ret.append(resultSet.getString(2));
+        ret.append("\n");
+      }
+    }
+    return ret.toString();
+  }
+
+  @Test
+  public void testDefaultTTL() throws SQLException {
+    CommonDescriptor.getInstance().getConfig().setTierTTLInMs(new long[] {10000});
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE root.group1");
+      statement.execute("CREATE DATABASE root.group2");
+
+      String result = doQuery(statement, "SHOW ALL TTL");
+      assertTrue(
+          result.equals("root.group1,10000\n" + "root.group2,10000\n")
+              || result.equals("root.group2,10000\n" + "root.group1,10000\n"));
+    } finally {
+      CommonDescriptor.getInstance().getConfig().setTierTTLInMs(new long[] {Long.MAX_VALUE});
+    }
+  }
+
+  @Test
+  @Category({ClusterTest.class})
+  public void testTTLOnAnyPath() throws SQLException {
+    try (Connection connection = EnvFactory.getEnv().getConnection();
+        Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE root.group1");
+      statement.execute("CREATE DATABASE root.group2.sgroup1");
+      statement.execute("SET TTL TO root.group2.** 10000");
+      String result = doQuery(statement, "SHOW ALL TTL");
+      assertTrue(
+          result.equals("root.group1,null\n" + "root.group2.sgroup1,10000\n")
+              || result.equals("root.group2.sgroup1,10000\n" + "root.group1,null\n"));
+    }
+  }
+}
diff --git a/iotdb-connector/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java b/iotdb-connector/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
index c46c700f2c2..c209271f82c 100644
--- a/iotdb-connector/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
+++ b/iotdb-connector/flink-tsfile-connector/src/test/java/org/apache/iotdb/flink/util/TSFileConfigUtilCompletenessTest.java
@@ -78,7 +78,13 @@ public class TSFileConfigUtilCompletenessTest {
       "setMaxTsBlockLineNumber",
       "setMaxTsBlockSizeInBytes",
       "setPatternMatchingThreshold",
-      "setCustomizedProperties"
+      "setCustomizedProperties",
+      "setObjectStorageTsFileInput",
+      "setObjectStorageTsFileOutput",
+      "setHdfsFile",
+      "setHdfsTsFileInput",
+      "setHdfsTsFileOutput",
+      "setObjectStorageFile"
     };
     Set<String> addedSetters = new HashSet<>();
     Collections.addAll(addedSetters, setters);
diff --git a/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSConfUtil.java b/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSConfUtil.java
index 40cefcf83be..d18c123ad5c 100644
--- a/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSConfUtil.java
+++ b/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSConfUtil.java
@@ -39,9 +39,18 @@ class HDFSConfUtil {
   private static final Logger logger = LoggerFactory.getLogger(HDFSConfUtil.class);
 
   static Configuration setConf(Configuration conf) {
-    if (!tsFileConfig.getTSFileStorageFs().equals(FSType.HDFS)) {
+    boolean enableHDFS = false;
+    for (FSType type : tsFileConfig.getTSFileStorageFs()) {
+      if (type.equals(FSType.HDFS)) {
+        enableHDFS = true;
+        break;
+      }
+    }
+
+    if (!enableHDFS) {
       return conf;
     }
+
     try {
       conf.addResource(new File(tsFileConfig.getCoreSitePath()).toURI().toURL());
       conf.addResource(new File(tsFileConfig.getHdfsSitePath()).toURI().toURL());
diff --git a/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java b/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
index afa0ecc0607..3fb545e9318 100644
--- a/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
+++ b/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSFile.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,7 +36,9 @@ import java.io.File;
 import java.io.FileFilter;
 import java.io.FilenameFilter;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.net.URI;
 import java.net.URL;
@@ -283,6 +286,21 @@ public class HDFSFile extends File {
     return files;
   }
 
+  private void copyToLocal(File destFile) throws IOException {
+    fs.copyToLocalFile(hdfsPath, new Path(destFile.getPath()));
+  }
+
+  private void copyFromLocal(File srcFile) throws IOException {
+    fs.copyFromLocalFile(new Path(srcFile.getPath()), hdfsPath);
+  }
+
+  private void copyTo(File destFile) throws IOException {
+    try (InputStream in = fs.open(hdfsPath);
+        OutputStream out = fs.create(((HDFSFile) destFile).hdfsPath, true)) {
+      IOUtils.copyBytes(in, out, 4096);
+    }
+  }
+
   @Override
   public File getAbsoluteFile() {
     return new HDFSFile(getAbsolutePath());
diff --git a/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java b/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
index 242087bb378..5c0940d495f 100644
--- a/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
+++ b/iotdb-connector/hadoop/src/main/java/org/apache/iotdb/hadoop/fileSystem/HDFSInput.java
@@ -19,7 +19,6 @@
 package org.apache.iotdb.hadoop.fileSystem;
 
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
-import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumFileSystem;
@@ -30,7 +29,6 @@ import org.apache.hadoop.fs.Path;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
 
 public class HDFSInput implements TsFileInput {
 
@@ -100,21 +98,6 @@ public class HDFSInput implements TsFileInput {
     return res;
   }
 
-  @Override
-  public int read() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public FileChannel wrapAsFileChannel() {
-    throw new UnsupportedOperationException();
-  }
-
   @Override
   public InputStream wrapAsInputStream() {
     return fsDataInputStream;
@@ -125,21 +108,6 @@ public class HDFSInput implements TsFileInput {
     fsDataInputStream.close();
   }
 
-  @Override
-  public int readInt() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public synchronized String readVarIntString(long position) throws IOException {
-    long srcPosition = fsDataInputStream.getPos();
-
-    fsDataInputStream.seek(position);
-    String res = ReadWriteIOUtils.readVarIntString(fsDataInputStream);
-    fsDataInputStream.seek(srcPosition);
-    return res;
-  }
-
   @Override
   public String getFilePath() {
     return path.toString();
diff --git a/iotdb-connector/hadoop/src/test/java/org/apache/iotdb/hadoop/tsfile/TSFHadoopTest.java b/iotdb-connector/hadoop/src/test/java/org/apache/iotdb/hadoop/tsfile/TSFHadoopTest.java
index b86e8123b74..3307ec01bc9 100644
--- a/iotdb-connector/hadoop/src/test/java/org/apache/iotdb/hadoop/tsfile/TSFHadoopTest.java
+++ b/iotdb-connector/hadoop/src/test/java/org/apache/iotdb/hadoop/tsfile/TSFHadoopTest.java
@@ -71,7 +71,7 @@ public class TSFHadoopTest {
           .concat(File.separator)
           .concat("1-0-0-0.tsfile");
 
-  private FSType beforeFSType;
+  private FSType[] beforeFSType;
 
   @Before
   public void setUp() {
@@ -152,7 +152,7 @@ public class TSFHadoopTest {
       TSFInputFormat.setInputPaths(job, tsfilePath);
       List<InputSplit> inputSplits = inputFormat.getSplits(job);
       beforeFSType = TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs();
-      TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(FSType.HDFS);
+      TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(new FSType[] {FSType.HDFS});
       TsFileSequenceReader reader =
           new TsFileSequenceReader(new HDFSInput(tsfilePath, job.getConfiguration()));
       System.out.println(reader.readFileMetadata());
@@ -184,7 +184,7 @@ public class TSFHadoopTest {
       TSFInputFormat.setReadTime(job, false);
       List<InputSplit> inputSplits = inputFormat.getSplits(job);
       beforeFSType = TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs();
-      TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(FSType.HDFS);
+      TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(new FSType[] {FSType.HDFS});
       TsFileSequenceReader reader =
           new TsFileSequenceReader(new HDFSInput(tsfilePath, job.getConfiguration()));
 
diff --git a/iotdb-connector/hive-connector/src/test/java/org/apache/iotdb/hive/TSFHiveInputFormatTest.java b/iotdb-connector/hive-connector/src/test/java/org/apache/iotdb/hive/TSFHiveInputFormatTest.java
index 57d1c4ffc51..031a1b5b8a1 100644
--- a/iotdb-connector/hive-connector/src/test/java/org/apache/iotdb/hive/TSFHiveInputFormatTest.java
+++ b/iotdb-connector/hive-connector/src/test/java/org/apache/iotdb/hive/TSFHiveInputFormatTest.java
@@ -46,7 +46,7 @@ public class TSFHiveInputFormatTest {
   private TSFInputSplit inputSplit;
   private TSFHiveInputFormat inputFormat;
   private JobConf job;
-  private FSType beforeFSType;
+  private FSType[] beforeFSType;
   private final String filePath =
       TestConstant.BASE_OUTPUT_PATH
           .concat("data")
@@ -75,7 +75,7 @@ public class TSFHiveInputFormatTest {
     String[] hosts = {"127.0.0.1"};
     inputSplit = new TSFInputSplit(path, hosts, 0, 3727688L);
     beforeFSType = TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs();
-    TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(FSType.HDFS);
+    TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(new FSType[] {FSType.HDFS});
   }
 
   @After
diff --git a/iotdb-connector/hive-connector/src/test/java/org/apache/iotdb/hive/TSFHiveRecordReaderTest.java b/iotdb-connector/hive-connector/src/test/java/org/apache/iotdb/hive/TSFHiveRecordReaderTest.java
index 8480dcd0ae1..92c20e2c62e 100644
--- a/iotdb-connector/hive-connector/src/test/java/org/apache/iotdb/hive/TSFHiveRecordReaderTest.java
+++ b/iotdb-connector/hive-connector/src/test/java/org/apache/iotdb/hive/TSFHiveRecordReaderTest.java
@@ -61,7 +61,7 @@ public class TSFHiveRecordReaderTest {
           .concat("0")
           .concat(File.separator)
           .concat("1-0-0-0.tsfile");
-  private FSType beforeFSType;
+  private FSType[] beforeFSType;
 
   @Before
   public void setUp() throws IOException {
@@ -70,7 +70,7 @@ public class TSFHiveRecordReaderTest {
     Path path = new Path(filePath);
     String[] hosts = {"127.0.0.1"};
     beforeFSType = TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs();
-    TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(FSType.HDFS);
+    TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(new FSType[] {FSType.HDFS});
     TSFInputSplit inputSplit = new TSFInputSplit(path, hosts, 0, 3727528L);
     String[] deviceIds = {"device_1"}; // configure reading which deviceIds
     job.set(READ_DELTAOBJECTS, String.join(",", deviceIds));
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index 04fd6c87254..b50efbe2726 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -457,13 +457,12 @@ cluster_name=defaultCluster
 # Datatype: String
 # timestamp_precision=ms
 
-# Default TTL for databases that are not set TTL by statements, If not set (default),
-# the TTL will be unlimited.
+# Default TTL for databases whose TTL is not set by statements. If not set (default), the TTL is unlimited.
+# A negative value means the TTL is unlimited.
 # Notice: if this property is changed, previous created database which are not set TTL will also be affected.
-# data.
 # Datatype: long
 # Unit: ms
-# default_ttl_in_ms=36000000
+# default_ttl_in_ms=-1
 
 # When the waiting time (in ms) of an inserting exceeds this, throw an exception. 10000 by default.
 # If the insertion has been rejected and the read load is low, it can be set larger
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b3a65b9152d..a44810612f2 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -87,12 +87,12 @@ public class CommonConfig {
   private FSType systemFileStorageFs = FSType.LOCAL;
 
   /**
-   * default TTL for databases that are not set TTL by statements, in ms.
-   *
-   * <p>Notice: if this property is changed, previous created database which are not set TTL will
-   * also be affected. Unit: millisecond
+   * Default TTL for databases whose TTL is not set by statements. If tiered storage is enabled,
+   * data that matches the last TTL will be deleted and other data will be migrated to the next
+   * tier. Notice: if this property is changed, previously created databases whose TTL is not set
+   * will also be affected. Unit: millisecond
    */
-  private long defaultTTLInMs = Long.MAX_VALUE;
+  private long[] tierTTLInMs = {Long.MAX_VALUE};
 
   /** Thrift socket and connection timeout between data node and config node. */
   private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
@@ -250,11 +250,15 @@ public class CommonConfig {
   }
 
   public long getDefaultTTLInMs() {
-    return defaultTTLInMs;
+    return tierTTLInMs[tierTTLInMs.length - 1];
   }
 
-  public void setDefaultTTLInMs(long defaultTTLInMs) {
-    this.defaultTTLInMs = defaultTTLInMs;
+  public long[] getTierTTLInMs() {
+    return tierTTLInMs;
+  }
+
+  public void setTierTTLInMs(long[] tierTTLInMs) {
+    this.tierTTLInMs = tierTTLInMs;
   }
 
   public int getConnectionTimeoutInMS() {
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
index c860af5d593..ef947180567 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonDescriptor.java
@@ -82,11 +82,23 @@ public class CommonDescriptor {
             "iotdb_server_encrypt_decrypt_provider_parameter",
             config.getEncryptDecryptProviderParameter()));
 
-    config.setDefaultTTLInMs(
-        Long.parseLong(
-            properties
-                .getProperty("default_ttl_in_ms", String.valueOf(config.getDefaultTTLInMs()))
-                .trim()));
+    String[] tierTTLStr = new String[config.getTierTTLInMs().length];
+    for (int i = 0; i < tierTTLStr.length; ++i) {
+      tierTTLStr[i] = String.valueOf(config.getTierTTLInMs()[i]);
+    }
+    tierTTLStr =
+        properties
+            .getProperty("default_ttl_in_ms", String.join(IoTDBConstant.TIER_SEPARATOR, tierTTLStr))
+            .split(IoTDBConstant.TIER_SEPARATOR);
+    long[] tierTTL = new long[tierTTLStr.length];
+    for (int i = 0; i < tierTTL.length; ++i) {
+      tierTTL[i] = Long.parseLong(tierTTLStr[i].trim()); // tolerate spaces around each value
+      if (tierTTL[i] < 0) {
+        tierTTL[i] = Long.MAX_VALUE;
+      }
+    }
+    config.setTierTTLInMs(tierTTL);
+
     config.setSyncDir(properties.getProperty("dn_sync_dir", config.getSyncDir()).trim());
 
     config.setWalDirs(
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
index 344bf05893e..4763c6f08e6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/IoTDBConstant.java
@@ -328,4 +328,8 @@ public class IoTDBConstant {
   public static final String DOUBLE_COLONS = "::";
 
   public static final int MAX_DATABASE_NAME_LENGTH = 64;
+
+  public static final String TIER_SEPARATOR = ";";
+
+  public static final String OBJECT_STORAGE_DIR = "object_storage";
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 6654a38e6db..48c3e0f853d 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.audit.AuditLogOperation;
 import org.apache.iotdb.db.audit.AuditLogStorage;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
 import org.apache.iotdb.db.engine.compaction.execute.performer.constant.CrossCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
@@ -46,6 +45,7 @@ import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.utils.FSUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static org.apache.iotdb.commons.conf.IoTDBConstant.OBJECT_STORAGE_DIR;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 public class IoTDBConfig {
@@ -294,13 +295,13 @@ public class IoTDBConfig {
   private String mqttDir =
       IoTDBConstant.EXT_FOLDER_NAME + File.separator + IoTDBConstant.MQTT_FOLDER_NAME;
 
-  /** Data directories. It can be settled as dataDirs = {"data1", "data2", "data3"}; */
-  private String[] dataDirs = {
-    IoTDBConstant.DEFAULT_BASE_DIR + File.separator + IoTDBConstant.DATA_FOLDER_NAME
+  /** Tiered data directories. It can be set as tierDataDirs = {{"data1"}, {"data2", "data3"}}; */
+  private String[][] tierDataDirs = {
+    {IoTDBConstant.DEFAULT_BASE_DIR + File.separator + IoTDBConstant.DATA_FOLDER_NAME}
   };
 
   private String loadTsFileDir =
-      dataDirs[0] + File.separator + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME;
+      tierDataDirs[0][0] + File.separator + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME;
 
   /** Strategy of multiple directories. */
   private String multiDirStrategyClassName = null;
@@ -725,6 +726,9 @@ public class IoTDBConfig {
   /** Default TSfile storage is in local file system */
   private FSType tsFileStorageFs = FSType.LOCAL;
 
+  /** Enable hdfs or not */
+  private boolean enableHDFS = false;
+
   /** Default core-site.xml file path is /etc/hadoop/conf/core-site.xml */
   private String coreSitePath = "/etc/hadoop/conf/core-site.xml";
 
@@ -1236,7 +1240,7 @@ public class IoTDBConfig {
     this.timeIndexLevel = TimeIndexLevel.valueOf(timeIndexLevel);
   }
 
-  void updatePath() {
+  public void updatePath() {
     formulateFolders();
     confirmMultiDirStrategy();
   }
@@ -1259,53 +1263,53 @@ public class IoTDBConfig {
     pipeDir = addDataHomeDir(pipeDir);
     pipeTemporaryLibDir = addDataHomeDir(pipeTemporaryLibDir);
     mqttDir = addDataHomeDir(mqttDir);
-
     extPipeDir = addDataHomeDir(extPipeDir);
+    queryDir = addDataHomeDir(queryDir);
+    formulateDataDirs(tierDataDirs);
+  }
 
-    if (TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs().equals(FSType.HDFS)) {
-      String hdfsDir = getHdfsDir();
-      queryDir = hdfsDir + File.separatorChar + queryDir;
-      for (int i = 0; i < dataDirs.length; i++) {
-        dataDirs[i] = hdfsDir + File.separatorChar + dataDirs[i];
-      }
-    } else {
-      queryDir = addDataHomeDir(queryDir);
-      for (int i = 0; i < dataDirs.length; i++) {
-        dataDirs[i] = addDataHomeDir(dataDirs[i]);
+  private void formulateDataDirs(String[][] tierDataDirs) {
+    for (int i = 0; i < tierDataDirs.length; i++) {
+      for (int j = 0; j < tierDataDirs[i].length; j++) {
+        if (tierDataDirs[i][j].equals(OBJECT_STORAGE_DIR)) {
+          // Notice: dataNodeId hasn't been initialized
+          tierDataDirs[i][j] = FSUtils.getOSDefaultPath(getObjectStorageBucket(), dataNodeId);
+        }
+        switch (FSUtils.getFSType(tierDataDirs[i][j])) {
+          case HDFS:
+            tierDataDirs[i][j] = getHdfsDir() + File.separatorChar + tierDataDirs[i][j];
+            break;
+          case LOCAL:
+            tierDataDirs[i][j] = addDataHomeDir(tierDataDirs[i][j]);
+            break;
+          case OBJECT_STORAGE:
+            tierDataDirs[i][j] = FSUtils.getOSDefaultPath(getObjectStorageBucket(), dataNodeId);
+            break;
+          default:
+            break;
+        }
       }
     }
   }
 
-  void reloadDataDirs(String[] dataDirs) throws LoadConfigurationException {
+  void reloadDataDirs(String[][] tierDataDirs) throws LoadConfigurationException {
     // format data directories
-    if (TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs().equals(FSType.HDFS)) {
-      String hdfsDir = getHdfsDir();
-      for (int i = 0; i < dataDirs.length; i++) {
-        dataDirs[i] = hdfsDir + File.separatorChar + dataDirs[i];
-      }
-    } else {
-      for (int i = 0; i < dataDirs.length; i++) {
-        dataDirs[i] = addDataHomeDir(dataDirs[i]);
-      }
-    }
+    formulateDataDirs(tierDataDirs);
     // make sure old data directories not removed
-    HashSet<String> newDirs = new HashSet<>(Arrays.asList(dataDirs));
-    for (String oldDir : this.dataDirs) {
-      if (!newDirs.contains(oldDir)) {
-        String msg =
-            String.format("%s is removed from data_dirs parameter, please add it back.", oldDir);
-        logger.error(msg);
-        throw new LoadConfigurationException(msg);
+    for (int i = 0; i < this.tierDataDirs.length; ++i) {
+      String[] newDirs = i < tierDataDirs.length ? tierDataDirs[i] : new String[0];
+      for (String oldDir : this.tierDataDirs[i]) {
+        if (!Arrays.asList(newDirs).contains(oldDir)) {
+          String msg =
+              String.format("%s is removed from data_dirs parameter, please add it back.", oldDir);
+          logger.error(msg);
+          throw new LoadConfigurationException(msg);
+        }
       }
     }
-    this.dataDirs = dataDirs;
-    DirectoryManager.getInstance().updateFileFolders();
+    this.tierDataDirs = tierDataDirs;
   }
 
-  //  private String addHomeDir(String dir) {
-  //    return addDirPrefix(System.getProperty(IoTDBConstant.IOTDB_HOME, null), dir);
-  //  }
-
   // if IOTDB_DATA_HOME is not set, then we keep dataHomeDir prefix being the same with IOTDB_HOME
   // In this way, we can keep consistent with v0.13.0~2.
   private String addDataHomeDir(String dir) {
@@ -1358,15 +1362,27 @@ public class IoTDBConfig {
   }
 
   public String[] getDataDirs() {
-    return dataDirs;
+    return Arrays.stream(tierDataDirs).flatMap(Arrays::stream).toArray(String[]::new);
+  }
+
+  public String[] getLocalDataDirs() {
+    return Arrays.stream(tierDataDirs)
+        .flatMap(Arrays::stream)
+        .filter(FSUtils::isLocal)
+        .toArray(String[]::new);
   }
 
-  public void setDataDirs(String[] dataDirs) {
-    this.dataDirs = dataDirs;
+  public String[][] getTierDataDirs() {
+    return tierDataDirs;
+  }
+
+  public void setTierDataDirs(String[][] tierDataDirs) {
+    formulateDataDirs(tierDataDirs);
+    this.tierDataDirs = tierDataDirs;
     // TODO(szywilliam): rewrite the logic here when ratis supports complete snapshot semantic
     setRatisDataRegionSnapshotDir(
-        dataDirs[0] + File.separator + IoTDBConstant.SNAPSHOT_FOLDER_NAME);
-    setLoadTsFileDir(dataDirs[0] + File.separator + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME);
+        tierDataDirs[0][0] + File.separator + IoTDBConstant.SNAPSHOT_FOLDER_NAME);
+    setLoadTsFileDir(tierDataDirs[0][0] + File.separator + IoTDBConstant.LOAD_TSFILE_FOLDER_NAME);
   }
 
   public String getRpcAddress() {
@@ -2433,6 +2449,14 @@ public class IoTDBConfig {
     this.tsFileStorageFs = FSType.valueOf(tsFileStorageFs);
   }
 
+  public boolean isEnableHDFS() {
+    return enableHDFS;
+  }
+
+  public void setEnableHDFS(boolean enableHDFS) {
+    this.enableHDFS = enableHDFS;
+  }
+
   String getCoreSitePath() {
     return coreSitePath;
   }
@@ -3500,7 +3524,14 @@ public class IoTDBConfig {
           continue;
         }
         String configType = configField.getGenericType().getTypeName();
-        if (configType.contains("java.lang.String[]")) {
+        if (configType.contains("java.lang.String[][]")) {
+          String[][] configList = (String[][]) configField.get(this);
+          StringBuilder builder = new StringBuilder();
+          for (String[] strings : configList) {
+            builder.append(Arrays.asList(strings)).append(";");
+          }
+          configContent = builder.toString();
+        } else if (configType.contains("java.lang.String[]")) {
           String[] configList = (String[]) configField.get(this);
           configContent = Arrays.asList(configList).toString();
         } else {
@@ -3910,4 +3941,8 @@ public class IoTDBConfig {
   public void setClusterSchemaLimitThreshold(long clusterSchemaLimitThreshold) {
     this.clusterSchemaLimitThreshold = clusterSchemaLimitThreshold;
   }
+
+  public String getObjectStorageBucket() {
+    throw new UnsupportedOperationException("object storage is not supported yet");
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index e0bd3c536f3..74b59fff731 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
 import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.compaction.execute.performer.constant.CrossCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
@@ -67,6 +67,8 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.net.MalformedURLException;
 import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 import java.util.ServiceLoader;
 
@@ -369,9 +371,14 @@ public class IoTDBDescriptor {
 
     conf.setQueryDir(
         FilePathUtils.regularizePath(conf.getSystemDir() + IoTDBConstant.QUERY_FOLDER_NAME));
-
-    conf.setDataDirs(
-        properties.getProperty("dn_data_dirs", String.join(",", conf.getDataDirs())).split(","));
+    String[] defaultTierDirs = new String[conf.getTierDataDirs().length];
+    for (int i = 0; i < defaultTierDirs.length; ++i) {
+      defaultTierDirs[i] = String.join(",", conf.getTierDataDirs()[i]);
+    }
+    conf.setTierDataDirs(
+        parseDataDirs(
+            properties.getProperty(
+                "dn_data_dirs", String.join(IoTDBConstant.TIER_SEPARATOR, defaultTierDirs))));
 
     conf.setConsensusDir(properties.getProperty("dn_consensus_dir", conf.getConsensusDir()));
 
@@ -768,6 +775,9 @@ public class IoTDBDescriptor {
 
     conf.setTsFileStorageFs(
         properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().toString()));
+    conf.setEnableHDFS(
+        Boolean.parseBoolean(
+            properties.getProperty("enable_hdfs", String.valueOf(conf.isEnableHDFS()))));
     conf.setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
     conf.setHdfsSitePath(properties.getProperty("hdfs_site_path", conf.getHdfsSitePath()));
     conf.setHdfsIp(properties.getProperty("hdfs_ip", conf.getRawHDFSIp()).split(","));
@@ -963,11 +973,13 @@ public class IoTDBDescriptor {
     conf.setExtPipeDir(properties.getProperty("ext_pipe_dir", conf.getExtPipeDir()).trim());
 
     // At the same time, set TSFileConfig
-    TSFileDescriptor.getInstance()
-        .getConfig()
-        .setTSFileStorageFs(
-            FSType.valueOf(
-                properties.getProperty("tsfile_storage_fs", conf.getTsFileStorageFs().name())));
+    List<FSType> fsTypes = new ArrayList<>();
+    fsTypes.add(FSType.LOCAL);
+    if (Boolean.parseBoolean(
+        properties.getProperty("enable_hdfs", String.valueOf(conf.isEnableHDFS())))) {
+      fsTypes.add(FSType.HDFS);
+    }
+    TSFileDescriptor.getInstance().getConfig().setTSFileStorageFs(fsTypes.toArray(new FSType[0]));
     TSFileDescriptor.getInstance()
         .getConfig()
         .setCoreSitePath(properties.getProperty("core_site_path", conf.getCoreSitePath()));
@@ -1502,23 +1514,33 @@ public class IoTDBDescriptor {
     }
   }
 
+  private String[][] parseDataDirs(String dataDirs) {
+    String[] tiers = dataDirs.split(IoTDBConstant.TIER_SEPARATOR);
+    String[][] tierDataDirs = new String[tiers.length][];
+    for (int i = 0; i < tiers.length; ++i) {
+      tierDataDirs[i] = tiers[i].split(",");
+    }
+    return tierDataDirs;
+  }
+
   public void loadHotModifiedProps(Properties properties) throws QueryProcessException {
     try {
       // update data dirs
       String dataDirs = properties.getProperty("dn_data_dirs", null);
       if (dataDirs != null) {
-        conf.reloadDataDirs(dataDirs.split(","));
+        conf.reloadDataDirs(parseDataDirs(dataDirs));
       }
 
-      // update dir strategy, must update after data dirs
+      // update dir strategy
       String multiDirStrategyClassName = properties.getProperty("dn_multi_dir_strategy", null);
       if (multiDirStrategyClassName != null
           && !multiDirStrategyClassName.equals(conf.getMultiDirStrategyClassName())) {
         conf.setMultiDirStrategyClassName(multiDirStrategyClassName);
         conf.confirmMultiDirStrategy();
-        DirectoryManager.getInstance().updateDirectoryStrategy();
       }
 
+      TierManager.getInstance().resetFolders();
+
       // update timed flush & close conf
       loadTimedService(properties);
       StorageEngine.getInstance().rebootTimedService();
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
index 88f27c912d4..20680724bae 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java
@@ -255,8 +255,8 @@ public class IoTDBStartCheck {
    * accessing same director.
    */
   public void checkDirectory() throws ConfigurationException, IOException {
-    // check data dirs
-    for (String dataDir : config.getDataDirs()) {
+    // check data dirs TODO(zhm) only check local directories
+    for (String dataDir : config.getLocalDataDirs()) {
       DirectoryChecker.getInstance().registerDirectory(new File(dataDir));
     }
     if (config.isClusterMode()
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryChecker.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryChecker.java
index 71c021dfa2e..411ebe98c13 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryChecker.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryChecker.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.conf.directories;
 
 import org.apache.iotdb.commons.exception.ConfigurationException;
 import org.apache.iotdb.commons.utils.ProcessIdUtils;
+import org.apache.iotdb.tsfile.utils.FSUtils;
 
 import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
@@ -93,6 +94,10 @@ public class DirectoryChecker {
     }
     Path root = mountOf(new File(dirs[0]).toPath());
     for (int i = 1; i < dirs.length; i++) {
+      // cross storage media
+      if (!FSUtils.isLocal(dirs[i])) {
+        return true;
+      }
       Path path = mountOf(new File(dirs[i]).toPath());
       if (!path.equals(root)) {
         return true;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
deleted file mode 100644
index 4b0d1471a79..00000000000
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/DirectoryManager.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.conf.directories;
-
-import org.apache.iotdb.commons.cluster.NodeStatus;
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategy;
-import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
-import org.apache.iotdb.db.exception.LoadConfigurationException;
-import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-/** The main class of multiple directories. Used to allocate folders to data files. */
-public class DirectoryManager {
-
-  private static final Logger logger = LoggerFactory.getLogger(DirectoryManager.class);
-
-  private List<String> sequenceFileFolders;
-  private List<String> unsequenceFileFolders;
-  private DirectoryStrategy sequenceStrategy;
-  private DirectoryStrategy unsequenceStrategy;
-
-  private DirectoryManager() {
-    sequenceFileFolders =
-        new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
-    for (int i = 0; i < sequenceFileFolders.size(); i++) {
-      sequenceFileFolders.set(
-          i, sequenceFileFolders.get(i) + File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME);
-    }
-    mkDataDirs(sequenceFileFolders);
-
-    unsequenceFileFolders =
-        new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
-    for (int i = 0; i < unsequenceFileFolders.size(); i++) {
-      unsequenceFileFolders.set(
-          i, unsequenceFileFolders.get(i) + File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME);
-    }
-    mkDataDirs(unsequenceFileFolders);
-
-    String strategyName = "";
-    try {
-      strategyName = IoTDBDescriptor.getInstance().getConfig().getMultiDirStrategyClassName();
-      Class<?> clazz = Class.forName(strategyName);
-      sequenceStrategy = (DirectoryStrategy) clazz.newInstance();
-      sequenceStrategy.setFolders(sequenceFileFolders);
-      unsequenceStrategy = (DirectoryStrategy) clazz.newInstance();
-      unsequenceStrategy.setFolders(unsequenceFileFolders);
-    } catch (DiskSpaceInsufficientException e) {
-      logger.error("All disks of folders are full.", e);
-    } catch (Exception e) {
-      logger.error("Can't find strategy {} for mult-directories.", strategyName, e);
-    }
-  }
-
-  public void updateFileFolders() throws LoadConfigurationException {
-    try {
-      List<String> sequenceFileFolders =
-          new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
-      for (int i = 0; i < sequenceFileFolders.size(); i++) {
-        sequenceFileFolders.set(
-            i, sequenceFileFolders.get(i) + File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME);
-      }
-      mkDataDirs(sequenceFileFolders);
-
-      List<String> unsequenceFileFolders =
-          new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
-      for (int i = 0; i < unsequenceFileFolders.size(); i++) {
-        unsequenceFileFolders.set(
-            i,
-            unsequenceFileFolders.get(i) + File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME);
-      }
-      mkDataDirs(unsequenceFileFolders);
-      sequenceStrategy.setFolders(sequenceFileFolders);
-      unsequenceStrategy.setFolders(unsequenceFileFolders);
-      this.sequenceFileFolders = sequenceFileFolders;
-      this.unsequenceFileFolders = unsequenceFileFolders;
-      logger.info("Success to update file folders.");
-    } catch (DiskSpaceInsufficientException e) {
-      logger.error("Fail to update file folders, use previous folders.", e);
-      throw new LoadConfigurationException(
-          "Fail to update file folders because all disks of folders are full, use previous folders.");
-    }
-  }
-
-  public void updateDirectoryStrategy() throws LoadConfigurationException {
-    String strategyName = "";
-    try {
-      strategyName = IoTDBDescriptor.getInstance().getConfig().getMultiDirStrategyClassName();
-      Class<?> clazz = Class.forName(strategyName);
-      sequenceStrategy = (DirectoryStrategy) clazz.newInstance();
-      sequenceStrategy.setFolders(sequenceFileFolders);
-      unsequenceStrategy = (DirectoryStrategy) clazz.newInstance();
-      unsequenceStrategy.setFolders(unsequenceFileFolders);
-      logger.info("Success to update directory strategy.");
-    } catch (Exception e) {
-      logger.error("Fail to update directory strategy {}, use previous strategy", strategyName, e);
-      throw new LoadConfigurationException(
-          String.format(
-              "Fail to update directory strategy because can't find strategy %s for mult-directories, use previous strategy",
-              strategyName));
-    }
-  }
-
-  public static DirectoryManager getInstance() {
-    return DirectoriesHolder.INSTANCE;
-  }
-
-  private void mkDataDirs(List<String> folders) {
-    for (String folder : folders) {
-      File file = FSFactoryProducer.getFSFactory().getFile(folder);
-      if (file.mkdirs()) {
-        logger.info("folder {} doesn't exist, create it", file.getPath());
-      } else {
-        logger.info(
-            "create folder {} failed. Is the folder existed: {}", file.getPath(), file.exists());
-      }
-    }
-  }
-
-  public String getNextFolderForSequenceFile() throws DiskSpaceInsufficientException {
-    try {
-      return sequenceFileFolders.get(sequenceStrategy.nextFolderIndex());
-    } catch (DiskSpaceInsufficientException e) {
-      logger.error("All disks of wal folders are full, change system mode to read-only.", e);
-      CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
-      throw e;
-    }
-  }
-
-  public List<String> getAllSequenceFileFolders() {
-    return new ArrayList<>(sequenceFileFolders);
-  }
-
-  public String getNextFolderForUnSequenceFile() throws DiskSpaceInsufficientException {
-    try {
-      return unsequenceFileFolders.get(unsequenceStrategy.nextFolderIndex());
-    } catch (DiskSpaceInsufficientException e) {
-      logger.error("All disks of wal folders are full, change system mode to read-only.", e);
-      CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
-      throw e;
-    }
-  }
-
-  public List<String> getAllUnSequenceFileFolders() {
-    return new ArrayList<>(unsequenceFileFolders);
-  }
-
-  public List<String> getAllFilesFolders() {
-    List<String> folders = new ArrayList<>(sequenceFileFolders);
-    folders.addAll(unsequenceFileFolders);
-    return folders;
-  }
-
-  @TestOnly
-  public void resetFolders() {
-    sequenceFileFolders =
-        new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
-    for (int i = 0; i < sequenceFileFolders.size(); i++) {
-      sequenceFileFolders.set(
-          i, sequenceFileFolders.get(i) + File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME);
-    }
-    unsequenceFileFolders =
-        new ArrayList<>(Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()));
-    for (int i = 0; i < unsequenceFileFolders.size(); i++) {
-      unsequenceFileFolders.set(
-          i, unsequenceFileFolders.get(i) + File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME);
-    }
-  }
-
-  private static class DirectoriesHolder {
-    private static final DirectoryManager INSTANCE = new DirectoryManager();
-  }
-}
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java
index 571a07a9ad4..9fb73749bc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java
@@ -61,7 +61,7 @@ public class FolderManager {
     try {
       this.selectStrategy.setFolders(folders);
     } catch (DiskSpaceInsufficientException e) {
-      logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+      logger.error("All folders are full, change system mode to read-only.", e);
       CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
       throw e;
     }
@@ -71,7 +71,7 @@ public class FolderManager {
     try {
       return folders.get(selectStrategy.nextFolderIndex());
     } catch (DiskSpaceInsufficientException e) {
-      logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+      logger.error("All folders are full, change system mode to read-only.", e);
       CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
       throw e;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
new file mode 100644
index 00000000000..e4dafa2df77
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.conf.directories;
+
+import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.strategy.DirectoryStrategyType;
+import org.apache.iotdb.db.conf.directories.strategy.MaxDiskUsableSpaceFirstStrategy;
+import org.apache.iotdb.db.conf.directories.strategy.MinFolderOccupiedSpaceFirstStrategy;
+import org.apache.iotdb.db.conf.directories.strategy.RandomOnDiskUsableSpaceStrategy;
+import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.utils.FSUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** The main class of multiple directories. Used to allocate folders to data files. */
+public class TierManager {
+  private static final Logger logger = LoggerFactory.getLogger(TierManager.class);
+  private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
+  private DirectoryStrategyType directoryStrategyType = DirectoryStrategyType.SEQUENCE_STRATEGY;
+  /**
+   * seq folder manager of each storage tier, managing both data directories and multi-dir strategy
+   */
+  private final List<FolderManager> seqTiers = new ArrayList<>();
+  /**
+   * unSeq folder manager of each storage tier, managing both data directories and multi-dir
+   * strategy
+   */
+  private final List<FolderManager> unSeqTiers = new ArrayList<>();
+  /** seq file folder's rawFsPath path -> tier level */
+  private final Map<String, Integer> seqDir2TierLevel = new HashMap<>();
+  /** unSeq file folder's rawFsPath path -> tier level */
+  private final Map<String, Integer> unSeqDir2TierLevel = new HashMap<>();
+  /** total space of each tier, Long.MAX_VALUE when one tier contains remote storage */
+  private long[] tierDiskTotalSpace;
+
+  private TierManager() {
+    initFolders();
+  }
+
+  public synchronized void initFolders() {
+    try {
+      String strategyName = Class.forName(config.getMultiDirStrategyClassName()).getSimpleName();
+      if (strategyName.equals(MaxDiskUsableSpaceFirstStrategy.class.getSimpleName())) {
+        directoryStrategyType = DirectoryStrategyType.MAX_DISK_USABLE_SPACE_FIRST_STRATEGY;
+      } else if (strategyName.equals(MinFolderOccupiedSpaceFirstStrategy.class.getSimpleName())) {
+        directoryStrategyType = DirectoryStrategyType.MIN_FOLDER_OCCUPIED_SPACE_FIRST_STRATEGY;
+      } else if (strategyName.equals(RandomOnDiskUsableSpaceStrategy.class.getSimpleName())) {
+        directoryStrategyType = DirectoryStrategyType.RANDOM_ON_DISK_USABLE_SPACE_STRATEGY;
+      }
+    } catch (Exception e) {
+      logger.error(
+          "Can't find strategy {} for mult-directories.", config.getMultiDirStrategyClassName(), e);
+    }
+
+    config.updatePath();
+    String[][] tierDirs = config.getTierDataDirs();
+    for (int i = 0; i < tierDirs.length; ++i) {
+      for (int j = 0; j < tierDirs[i].length; ++j) {
+        switch (FSUtils.getFSType(tierDirs[i][j])) {
+          case LOCAL:
+            try {
+              tierDirs[i][j] = new File(tierDirs[i][j]).getCanonicalPath();
+            } catch (IOException e) {
+              logger.error("Fail to get canonical path of data dir {}", tierDirs[i][j], e);
+            }
+            break;
+          case OBJECT_STORAGE:
+          case HDFS:
+          default:
+            break;
+        }
+      }
+    }
+
+    for (int tierLevel = 0; tierLevel < tierDirs.length; ++tierLevel) {
+      List<String> seqDirs =
+          Arrays.stream(tierDirs[tierLevel])
+              .filter(Objects::nonNull)
+              .map(
+                  v ->
+                      FSFactoryProducer.getFSFactory()
+                          .getFile(v, IoTDBConstant.SEQUENCE_FLODER_NAME)
+                          .getPath())
+              .collect(Collectors.toList());
+      mkDataDirs(seqDirs);
+      try {
+        seqTiers.add(new FolderManager(seqDirs, directoryStrategyType));
+      } catch (DiskSpaceInsufficientException e) {
+        logger.error("All disks of tier {} are full.", tierLevel, e);
+      }
+      for (String dir : seqDirs) {
+        seqDir2TierLevel.put(dir, tierLevel);
+      }
+
+      List<String> unSeqDirs =
+          Arrays.stream(tierDirs[tierLevel])
+              .filter(Objects::nonNull)
+              .map(
+                  v ->
+                      FSFactoryProducer.getFSFactory()
+                          .getFile(v, IoTDBConstant.UNSEQUENCE_FLODER_NAME)
+                          .getPath())
+              .collect(Collectors.toList());
+      mkDataDirs(unSeqDirs);
+      try {
+        unSeqTiers.add(new FolderManager(unSeqDirs, directoryStrategyType));
+      } catch (DiskSpaceInsufficientException e) {
+        logger.error("All disks of tier {} are full.", tierLevel, e);
+      }
+      for (String dir : unSeqDirs) {
+        unSeqDir2TierLevel.put(dir, tierLevel);
+      }
+    }
+
+    tierDiskTotalSpace = getTierDiskSpace(DiskSpaceType.TOTAL);
+  }
+
+  public synchronized void resetFolders() {
+    seqTiers.clear();
+    unSeqTiers.clear();
+    seqDir2TierLevel.clear();
+    unSeqDir2TierLevel.clear();
+
+    initFolders();
+  }
+
+  private void mkDataDirs(List<String> folders) {
+    for (String folder : folders) {
+      File file = FSFactoryProducer.getFSFactory().getFile(folder);
+      if (FSUtils.getFSType(file) == FSType.OBJECT_STORAGE) {
+        continue;
+      }
+      if (file.mkdirs()) {
+        logger.info("folder {} doesn't exist, create it", file.getPath());
+      } else {
+        logger.info(
+            "create folder {} failed. Is the folder existed: {}", file.getPath(), file.exists());
+      }
+    }
+  }
+
+  public String getNextFolderForTsFile(int tierLevel, boolean sequence)
+      throws DiskSpaceInsufficientException {
+    return sequence
+        ? seqTiers.get(tierLevel).getNextFolder()
+        : unSeqTiers.get(tierLevel).getNextFolder();
+  }
+
+  public List<String> getAllFilesFolders() {
+    List<String> folders = new ArrayList<>(seqDir2TierLevel.keySet());
+    folders.addAll(unSeqDir2TierLevel.keySet());
+    return folders;
+  }
+
+  public List<String> getAllLocalFilesFolders() {
+    return getAllFilesFolders().stream().filter(FSUtils::isLocal).collect(Collectors.toList());
+  }
+
+  public List<String> getAllSequenceFileFolders() {
+    return new ArrayList<>(seqDir2TierLevel.keySet());
+  }
+
+  public List<String> getAllLocalSequenceFileFolders() {
+    return seqDir2TierLevel.keySet().stream().filter(FSUtils::isLocal).collect(Collectors.toList());
+  }
+
+  public List<String> getAllUnSequenceFileFolders() {
+    return new ArrayList<>(unSeqDir2TierLevel.keySet());
+  }
+
+  public List<String> getAllLocalUnSequenceFileFolders() {
+    return unSeqDir2TierLevel.keySet().stream()
+        .filter(FSUtils::isLocal)
+        .collect(Collectors.toList());
+  }
+
+  public int getTiersNum() {
+    return seqTiers.size();
+  }
+
+  public int getFileTierLevel(File file) {
+    // If the file does not exist on local disk, it is assumed to be on remote object storage
+    if (!file.exists()) {
+      return getTiersNum() - 1;
+    }
+
+    String filePath;
+    try {
+      filePath = file.getCanonicalPath();
+    } catch (IOException e) {
+      logger.error("Fail to get canonical path of data dir {}", file, e);
+      filePath = file.getPath();
+    }
+
+    for (Map.Entry<String, Integer> entry : seqDir2TierLevel.entrySet()) {
+      if (filePath.startsWith(entry.getKey())) {
+        return entry.getValue();
+      }
+    }
+    for (Map.Entry<String, Integer> entry : unSeqDir2TierLevel.entrySet()) {
+      if (filePath.startsWith(entry.getKey())) {
+        return entry.getValue();
+      }
+    }
+    return 0;
+  }
+
+  public long[] getTierDiskTotalSpace() {
+    return Arrays.copyOf(tierDiskTotalSpace, tierDiskTotalSpace.length);
+  }
+
+  public long[] getTierDiskUsableSpace() {
+    return getTierDiskSpace(DiskSpaceType.USABLE);
+  }
+
+  private long[] getTierDiskSpace(DiskSpaceType type) {
+    String[][] tierDirs = config.getTierDataDirs();
+    long[] tierDiskSpace = new long[tierDirs.length];
+    for (int tierLevel = 0; tierLevel < tierDirs.length; ++tierLevel) {
+      Set<FileStore> tierFileStores = new HashSet<>();
+      for (String dir : tierDirs[tierLevel]) {
+        if (!FSUtils.isLocal(dir)) {
+          tierDiskSpace[tierLevel] = Long.MAX_VALUE;
+          break;
+        }
+        // get the FileStore of each local dir
+        Path path = Paths.get(dir);
+        FileStore fileStore = null;
+        try {
+          fileStore = Files.getFileStore(path);
+        } catch (IOException e) {
+          // fall back to the parent directory if the path does not exist
+          path = path.getParent();
+          try {
+            fileStore = Files.getFileStore(path);
+          } catch (IOException innerException) {
+            logger.error("Failed to get storage path of {}, because", dir, innerException);
+          }
+        }
+        // update space info
+        if (fileStore != null && !tierFileStores.contains(fileStore)) {
+          tierFileStores.add(fileStore);
+          try {
+            switch (type) {
+              case TOTAL:
+                tierDiskSpace[tierLevel] += fileStore.getTotalSpace();
+                break;
+              case USABLE:
+                tierDiskSpace[tierLevel] += fileStore.getUsableSpace();
+                break;
+              default:
+                break;
+            }
+          } catch (IOException e) {
+            logger.error("Failed to statistic the size of {}, because", fileStore, e);
+          }
+        }
+      }
+    }
+    return tierDiskSpace;
+  }
+
+  private enum DiskSpaceType {
+    TOTAL,
+    USABLE,
+  }
+
+  public static TierManager getInstance() {
+    return TierManagerHolder.INSTANCE;
+  }
+
+  private static class TierManagerHolder {
+
+    private static final TierManager INSTANCE = new TierManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
index 61dc6a418cd..875188e8638 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/StorageEngine.java
@@ -674,7 +674,7 @@ public class StorageEngine implements IService {
               .deleteWALNode(
                   region.getDatabaseName() + FILE_NAME_SEPARATOR + region.getDataRegionId());
           // delete snapshot
-          for (String dataDir : config.getDataDirs()) {
+          for (String dataDir : config.getLocalDataDirs()) {
             File regionSnapshotDir =
                 new File(
                     dataDir + File.separator + IoTDBConstant.SNAPSHOT_FOLDER_NAME,
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverManager.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverManager.java
index b51ff61268d..390ddf2b512 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverManager.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.engine.compaction.execute.recover;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.engine.compaction.execute.utils.log.CompactionLogger;
 import org.apache.iotdb.db.engine.storagegroup.TsFileManager;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -68,9 +68,9 @@ public class CompactionRecoverManager {
   private void recoverCompaction(boolean isInnerSpace, boolean isLogSequence) {
     List<String> dirs;
     if (isLogSequence) {
-      dirs = DirectoryManager.getInstance().getAllSequenceFileFolders();
+      dirs = TierManager.getInstance().getAllLocalSequenceFileFolders();
     } else {
-      dirs = DirectoryManager.getInstance().getAllUnSequenceFileFolders();
+      dirs = TierManager.getInstance().getAllLocalUnSequenceFileFolders();
     }
     for (String dir : dirs) {
       File storageGroupDir =
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverTask.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverTask.java
index 6f45ba2a7b6..b75940a7970 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/recover/CompactionRecoverTask.java
@@ -248,7 +248,7 @@ public class CompactionRecoverTask {
    * the file is not found, it will return null.
    */
   private File getFileFromDataDirs(String filePath) {
-    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs();
     for (String dataDir : dataDirs) {
       File f = new File(dataDir, filePath);
       if (f.exists()) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/TsFileIdentifier.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/TsFileIdentifier.java
index 8cac1223993..7842d115fea 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/TsFileIdentifier.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/execute/utils/log/TsFileIdentifier.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.engine.compaction.execute.utils.log;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 
 import java.io.File;
 
@@ -192,7 +193,7 @@ public class TsFileIdentifier {
             + File.separator
             + filename;
     for (String dataDir : dataDirs) {
-      File file = new File(dataDir, partialFileString);
+      File file = FSFactoryProducer.getFSFactory().getFile(dataDir, partialFileString);
       if (file.exists()) {
         return file;
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
index a5922082d3f..6fdd3bacabf 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotLoader.java
@@ -171,7 +171,7 @@ public class SnapshotLoader {
   }
 
   private void deleteAllFilesInDataDirs() throws IOException {
-    String[] dataDirPaths = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    String[] dataDirPaths = IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs();
 
     // delete
     List<File> timePartitions = new ArrayList<>();
@@ -250,7 +250,7 @@ public class SnapshotLoader {
     }
     FolderManager folderManager =
         new FolderManager(
-            Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getDataDirs()),
+            Arrays.asList(IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs()),
             DirectoryStrategyType.SEQUENCE_STRATEGY);
     File[] timePartitionFolders = seqFileDir.listFiles();
     if (timePartitionFolders != null) {
@@ -318,7 +318,7 @@ public class SnapshotLoader {
     String snapshotId = logAnalyzer.getSnapshotId();
     int loggedFileNum = logAnalyzer.getTotalFileCountInSnapshot();
     Set<String> fileInfoSet = logAnalyzer.getFileInfoSet();
-    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs();
     int fileCnt = 0;
     for (String dataDir : dataDirs) {
       String snapshotDir =
@@ -454,7 +454,7 @@ public class SnapshotLoader {
     SnapshotLogAnalyzer analyzer = new SnapshotLogAnalyzer(logFile);
     try {
       String snapshotId = analyzer.getSnapshotId();
-      String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+      String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs();
       List<File> fileList = new LinkedList<>();
       for (String dataDir : dataDirs) {
         String snapshotDir =
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
index c801e40ddd7..4b47d151978 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/snapshot/SnapshotTaker.java
@@ -249,7 +249,7 @@ public class SnapshotTaker {
 
   private void cleanUpWhenFail(String snapshotId) {
     LOGGER.info("Cleaning up snapshot dir for {}", snapshotId);
-    for (String dataDir : IoTDBDescriptor.getInstance().getConfig().getDataDirs()) {
+    for (String dataDir : IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs()) {
       File dataDirForThisSnapshot =
           new File(
               dataDir
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index e3ee6e73df9..fe7185a4772 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -37,7 +37,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.TsFileMetricManager;
 import org.apache.iotdb.db.engine.cache.BloomFilterCache;
@@ -103,8 +103,10 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
 import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.read.filter.basic.Filter;
+import org.apache.iotdb.tsfile.utils.FSUtils;
 import org.apache.iotdb.tsfile.utils.Pair;
 import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
 import org.apache.iotdb.tsfile.write.writer.RestorableTsFileIOWriter;
@@ -326,25 +328,20 @@ public class DataRegion implements IDataRegionForQuery {
           "Skip recovering data region {}[{}] when consensus protocol is ratis and storage engine is not ready.",
           databaseName,
           dataRegionId);
-      for (String fileFolder : DirectoryManager.getInstance().getAllFilesFolders()) {
+      for (String fileFolder : TierManager.getInstance().getAllFilesFolders()) {
         File dataRegionFolder =
             fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionId);
-        if (dataRegionFolder.exists()) {
-          File[] timePartitions = dataRegionFolder.listFiles();
-          if (timePartitions != null) {
-            for (File timePartition : timePartitions) {
-              try {
-                FileUtils.forceDelete(timePartition);
-              } catch (IOException e) {
-                logger.error(
-                    "Exception occurs when deleting time partition directory {} for {}-{}",
-                    timePartitions,
-                    databaseName,
-                    dataRegionId,
-                    e);
-              }
-            }
-          }
+        try {
+          fsFactory.deleteDirectory(dataRegionFolder.getPath());
+        } catch (IOException e) {
+          logger.error(
+              "Exception occurs when deleting data region folder for {}-{}",
+              databaseName,
+              dataRegionId,
+              e);
+        }
+        if (FSUtils.getFSType(dataRegionFolder) == FSType.LOCAL) {
+          dataRegionFolder.mkdirs();
         }
       }
     } else {
@@ -443,12 +440,12 @@ public class DataRegion implements IDataRegionForQuery {
     try {
       // collect candidate TsFiles from sequential and unsequential data directory
       Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair =
-          getAllFiles(DirectoryManager.getInstance().getAllSequenceFileFolders());
+          getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
       List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
       List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
       upgradeSeqFileList.addAll(oldSeqTsFiles);
       Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair =
-          getAllFiles(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+          getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
       List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
       List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
       upgradeUnseqFileList.addAll(oldUnseqTsFiles);
@@ -683,7 +680,8 @@ public class DataRegion implements IDataRegionForQuery {
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity warning
   private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders)
       throws IOException, DataRegionException {
-    List<File> tsFiles = new ArrayList<>();
+    // "{partition id}/{tsfile name}" -> tsfile file, remove duplicate files in one time partition
+    Map<String, File> tsFilePartitionPath2File = new HashMap<>();
     List<File> upgradeFiles = new ArrayList<>();
     for (String baseDir : folders) {
       File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId);
@@ -707,9 +705,14 @@ public class DataRegion implements IDataRegionForQuery {
             // resources
             continueFailedRenames(partitionFolder, TEMP_SUFFIX);
 
-            Collections.addAll(
-                tsFiles,
-                fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX));
+            String partitionName = partitionFolder.getName();
+            File[] tsFilesInThisFolder =
+                fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), TSFILE_SUFFIX);
+            for (File f : tsFilesInThisFolder) {
+              String tsFilePartitionPath = partitionName + File.separator + f.getName();
+              tsFilePartitionPath2File.put(tsFilePartitionPath, f);
+            }
+
           } else {
             // collect old TsFiles for upgrading
             Collections.addAll(
@@ -720,19 +723,19 @@ public class DataRegion implements IDataRegionForQuery {
       }
     }
 
-    tsFiles.sort(this::compareFileName);
-    if (!tsFiles.isEmpty()) {
-      checkTsFileTime(tsFiles.get(tsFiles.size() - 1));
-    }
-    List<TsFileResource> ret = new ArrayList<>();
-    tsFiles.forEach(f -> ret.add(new TsFileResource(f)));
+    List<File> sortedFiles = new ArrayList<>(tsFilePartitionPath2File.values());
+    sortedFiles.sort(this::compareFileName);
 
-    upgradeFiles.sort(this::compareFileName);
-    if (!upgradeFiles.isEmpty()) {
-      checkTsFileTime(upgradeFiles.get(upgradeFiles.size() - 1));
+    long currentTime = System.currentTimeMillis();
+    List<TsFileResource> ret = new ArrayList<>();
+    for (File f : sortedFiles) {
+      checkTsFileTime(f, currentTime);
+      ret.add(new TsFileResource(f));
     }
+    upgradeFiles.sort(this::compareFileName);
     List<TsFileResource> upgradeRet = new ArrayList<>();
     for (File f : upgradeFiles) {
+      checkTsFileTime(f, currentTime);
       TsFileResource fileResource = new TsFileResource(f);
       fileResource.setStatus(TsFileResourceStatus.NORMAL);
       // make sure the flush command is called before IoTDB is down.
@@ -757,10 +760,9 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   /** check if the tsfile's time is smaller than system current time. */
-  private void checkTsFileTime(File tsFile) throws DataRegionException {
+  private void checkTsFileTime(File tsFile, long currentTime) throws DataRegionException {
     String[] items = tsFile.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
     long fileTime = Long.parseLong(items[0]);
-    long currentTime = System.currentTimeMillis();
     if (fileTime > currentTime) {
       throw new DataRegionException(
           String.format(
@@ -1554,7 +1556,7 @@ public class DataRegion implements IDataRegionForQuery {
               TsFileMetricManager.getInstance().decreaseModFileSize(x.getModFile().getSize());
             }
           });
-      deleteAllSGFolders(DirectoryManager.getInstance().getAllFilesFolders());
+      deleteAllSGFolders(TierManager.getInstance().getAllFilesFolders());
 
       this.workSequenceTsFileProcessors.clear();
       this.workUnsequenceTsFileProcessors.clear();
@@ -1570,9 +1572,17 @@ public class DataRegion implements IDataRegionForQuery {
     for (String tsfilePath : folder) {
       File dataRegionDataFolder =
           fsFactory.getFile(tsfilePath, databaseName + File.separator + dataRegionId);
-      if (dataRegionDataFolder.exists()) {
-        org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent(
-            dataRegionDataFolder);
+      if (FSUtils.getFSType(dataRegionDataFolder) != FSType.LOCAL) {
+        try {
+          fsFactory.deleteDirectory(dataRegionDataFolder.getPath());
+        } catch (IOException e) {
+          logger.error("Fail to delete data region folder {}", dataRegionDataFolder);
+        }
+      } else {
+        if (dataRegionDataFolder.exists()) {
+          org.apache.iotdb.commons.utils.FileUtils.deleteDirectoryAndEmptyParent(
+              dataRegionDataFolder);
+        }
       }
     }
   }
@@ -1611,7 +1621,6 @@ public class DataRegion implements IDataRegionForQuery {
     }
     tsFileManager.remove(resource, isSeq);
     // ensure that the file is not used by any queries
-
     resource.writeLock();
     try {
       // try to delete physical data file
@@ -2543,7 +2552,7 @@ public class DataRegion implements IDataRegionForQuery {
     File targetFile;
     targetFile =
         fsFactory.getFile(
-            DirectoryManager.getInstance().getNextFolderForUnSequenceFile(),
+            TierManager.getInstance().getNextFolderForTsFile(0, false),
             databaseName
                 + File.separatorChar
                 + dataRegionId
@@ -2716,7 +2725,7 @@ public class DataRegion implements IDataRegionForQuery {
    * @param fileToBeUnloaded tsfile to be unloaded
    * @return whether the file to be unloaded exists. @UsedBy load external tsfile module.
    */
-  public boolean unloadTsfile(File fileToBeUnloaded, File targetDir) {
+  public boolean unloadTsfile(File fileToBeUnloaded, File targetDir) throws IOException {
     writeLock("unloadTsfile");
     TsFileResource tsFileResourceToBeMoved = null;
     try {
@@ -3076,8 +3085,8 @@ public class DataRegion implements IDataRegionForQuery {
   /** @return the disk space occupied by this data region, unit is MB */
   public long countRegionDiskSize() {
     AtomicLong diskSize = new AtomicLong(0);
-    DirectoryManager.getInstance()
-        .getAllFilesFolders()
+    TierManager.getInstance()
+        .getAllLocalFilesFolders()
         .forEach(
             folder -> {
               folder = folder + File.separator + databaseName + File.separator + dataRegionId;
@@ -3091,7 +3100,7 @@ public class DataRegion implements IDataRegionForQuery {
    * @param diskSize the disk space occupied by this folder, unit is MB
    */
   private void countFolderDiskSize(String folder, AtomicLong diskSize) {
-    File file = new File(folder);
+    File file = FSFactoryProducer.getFSFactory().getFile(folder);
     File[] allFile = file.listFiles();
     if (allFile == null) {
       return;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
index 71fc9f11f68..b4e936a6cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.storagegroup;
 
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
 import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
@@ -78,11 +78,8 @@ public class TsFileNameGenerator {
       String virtualStorageGroup,
       long timePartitionId)
       throws DiskSpaceInsufficientException {
-    DirectoryManager directoryManager = DirectoryManager.getInstance();
-    String baseDir =
-        sequence
-            ? directoryManager.getNextFolderForSequenceFile()
-            : directoryManager.getNextFolderForUnSequenceFile();
+    TierManager tierManager = TierManager.getInstance();
+    String baseDir = tierManager.getNextFolderForTsFile(0, sequence);
     return baseDir
         + File.separator
         + logicalStorageGroup
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index f8d752df5f8..cbc609e3605 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -189,11 +189,13 @@ public class TsFileProcessor {
       boolean sequence)
       throws IOException {
     this.storageGroupName = storageGroupName;
+    // this.sequence must be assigned first because `this` is passed as a parameter to other
+    // objects below (e.g. TsFileResource), which may read it during their construction
+    this.sequence = sequence;
     this.tsFileResource = new TsFileResource(tsfile, this);
     this.dataRegionInfo = dataRegionInfo;
     this.writer = new RestorableTsFileIOWriter(tsfile);
     this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
-    this.sequence = sequence;
     this.walNode =
         WALManager.getInstance()
             .applyForWALNode(WALManager.getApplicantUniqueId(storageGroupName, sequence));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index c10ce6f1ae5..e6c430da223 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion.SettleTsFileCallBack;
@@ -135,6 +136,8 @@ public class TsFileResource {
 
   private long ramSize;
 
+  private volatile int tierLevel = 0;
+
   private volatile long tsFileSize = -1L;
 
   private TsFileProcessor processor;
@@ -177,6 +180,8 @@ public class TsFileResource {
     this.minPlanIndex = other.minPlanIndex;
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.tsFileSize = other.tsFileSize;
+    this.isSeq = other.isSeq;
+    this.tierLevel = other.tierLevel;
     this.maxProgressIndex = other.maxProgressIndex;
   }
 
@@ -185,6 +190,10 @@ public class TsFileResource {
     this.file = file;
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
+    this.isSeq = FilePathUtils.isSequence(this.file.getAbsolutePath());
+    // This method is invoked when DataNode recovers, so the tierLevel should be calculated when
+    // restarting
+    this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
   }
 
   /** Used for compaction to create target files. */
@@ -199,6 +208,10 @@ public class TsFileResource {
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
     this.processor = processor;
+    this.isSeq = processor.isSequence();
+    // This method is invoked when a new TsFile is created, and a newly created TsFile's
+    // tierLevel is 0 by default
+    this.tierLevel = 0;
   }
 
   /** unsealed TsFile, for query */
@@ -214,6 +227,8 @@ public class TsFileResource {
     this.pathToChunkMetadataListMap.put(path, chunkMetadataList);
     this.originTsFileResource = originTsFileResource;
     this.version = originTsFileResource.version;
+    this.isSeq = originTsFileResource.isSeq;
+    this.tierLevel = originTsFileResource.tierLevel;
   }
 
   /** unsealed TsFile, for query */
@@ -229,6 +244,8 @@ public class TsFileResource {
     generatePathToTimeSeriesMetadataMap();
     this.originTsFileResource = originTsFileResource;
     this.version = originTsFileResource.version;
+    this.isSeq = originTsFileResource.isSeq;
+    this.tierLevel = originTsFileResource.tierLevel;
   }
 
   @TestOnly
@@ -396,9 +413,10 @@ public class TsFileResource {
     return compactionModFile;
   }
 
-  public void resetModFile() {
+  public void resetModFile() throws IOException {
     if (modFile != null) {
       synchronized (this) {
+        modFile.close();
         modFile = null;
       }
     }
@@ -416,6 +434,14 @@ public class TsFileResource {
     return file.getPath();
   }
 
+  public void increaseTierLevel() {
+    this.tierLevel++;
+  }
+
+  public int getTierLevel() {
+    return tierLevel;
+  }
+
   public long getTsFileSize() {
     if (isClosed()) {
       if (tsFileSize == -1) {
@@ -460,8 +486,7 @@ public class TsFileResource {
   public DeviceTimeIndex buildDeviceTimeIndex() throws IOException {
     readLock();
     try (InputStream inputStream =
-        FSFactoryProducer.getFSFactory()
-            .getBufferedInputStream(file.getPath() + TsFileResource.RESOURCE_SUFFIX)) {
+        FSFactoryProducer.getFSFactory().getBufferedInputStream(file.getPath() + RESOURCE_SUFFIX)) {
       ReadWriteIOUtils.readByte(inputStream);
       ITimeIndex timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream);
       if (!(timeIndexFromResourceFile instanceof DeviceTimeIndex)) {
@@ -470,7 +495,7 @@ public class TsFileResource {
       return (DeviceTimeIndex) timeIndexFromResourceFile;
     } catch (Exception e) {
       throw new IOException(
-          "Can't read file " + file.getPath() + TsFileResource.RESOURCE_SUFFIX + " from disk", e);
+          "Can't read file " + file.getPath() + RESOURCE_SUFFIX + " from disk", e);
     } finally {
       readUnlock();
     }
@@ -617,7 +642,7 @@ public class TsFileResource {
     return true;
   }
 
-  void moveTo(File targetDir) {
+  void moveTo(File targetDir) throws IOException {
     fsFactory.moveFile(file, fsFactory.getFile(targetDir, file.getName()));
     fsFactory.moveFile(
         fsFactory.getFile(file.getPath() + RESOURCE_SUFFIX),
@@ -664,6 +689,10 @@ public class TsFileResource {
     return getStatus() == TsFileResourceStatus.COMPACTION_CANDIDATE;
   }
 
+  public boolean onRemote() {
+    return !isDeleted() && !file.exists();
+  }
+
   private boolean compareAndSetStatus(
       TsFileResourceStatus expectedValue, TsFileResourceStatus newValue) {
     return atomicStatus.compareAndSet(expectedValue, newValue);
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
index 4ec2b14aaae..97940fb472e 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/upgrade/UpgradeTask.java
@@ -19,7 +19,7 @@
 package org.apache.iotdb.db.engine.upgrade;
 
 import org.apache.iotdb.commons.concurrent.WrappedRunnable;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
 import org.apache.iotdb.db.service.UpgradeSevice;
 import org.apache.iotdb.db.tools.upgrade.TsFileOnlineUpgradeTool;
@@ -70,8 +70,8 @@ public class UpgradeTask extends WrappedRunnable {
           UpgradeSevice.getTotalUpgradeFileNum().get());
       if (UpgradeSevice.getTotalUpgradeFileNum().get() == 0) {
         logger.info("Start delete empty tmp folders");
-        clearTmpFolders(DirectoryManager.getInstance().getAllSequenceFileFolders());
-        clearTmpFolders(DirectoryManager.getInstance().getAllUnSequenceFileFolders());
+        clearTmpFolders(TierManager.getInstance().getAllLocalSequenceFileFolders());
+        clearTmpFolders(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
         UpgradeSevice.getINSTANCE().stop();
         logger.info("All files upgraded successfully! ");
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index e153f902fbd..af67df5e1dc 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -57,6 +57,7 @@ import org.apache.iotdb.db.conf.DataNodeStartupCheck;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.conf.IoTDBStartCheck;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.conf.rest.IoTDBRestServiceDescriptor;
 import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
@@ -167,7 +168,9 @@ public class DataNode implements DataNodeMBean {
         // Send restart request of this DataNode
         sendRestartRequestToConfigNode();
       }
-
+      // TierManager needs the DataNodeId for some operations, so resetFolders() must be invoked
+      // after the DataNode has been added
+      TierManager.getInstance().resetFolders();
       // Active DataNode
       active();
 
diff --git a/server/src/main/java/org/apache/iotdb/db/service/metrics/SystemMetrics.java b/server/src/main/java/org/apache/iotdb/db/service/metrics/SystemMetrics.java
index 81b3c104678..b2bdc7e2dbd 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/metrics/SystemMetrics.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/metrics/SystemMetrics.java
@@ -187,7 +187,7 @@ public class SystemMetrics implements IMetricSet {
   }
 
   private void collectSystemDiskInfo(AbstractMetricService metricService) {
-    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
+    String[] dataDirs = IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs();
     for (String dataDir : dataDirs) {
       Path path = Paths.get(dataDir);
       FileStore fileStore = null;
diff --git a/server/src/main/java/org/apache/iotdb/db/utils/OpenFileNumUtil.java b/server/src/main/java/org/apache/iotdb/db/utils/OpenFileNumUtil.java
index d4fd49b76fb..e01f241047b 100644
--- a/server/src/main/java/org/apache/iotdb/db/utils/OpenFileNumUtil.java
+++ b/server/src/main/java/org/apache/iotdb/db/utils/OpenFileNumUtil.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.conf.directories.TierManager;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +54,7 @@ public class OpenFileNumUtil {
 
   private static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
-  private static DirectoryManager directoryManager = DirectoryManager.getInstance();
+  private static TierManager tierManager = TierManager.getInstance();
   private static final String[] COMMAND_TEMPLATE = {"/bin/bash", "-c", ""};
   private static boolean isOutputValid = false;
   private int pid;
@@ -266,8 +266,8 @@ public class OpenFileNumUtil {
 
   public enum OpenFileNumStatistics {
     TOTAL_OPEN_FILE_NUM(null),
-    SEQUENCE_FILE_OPEN_NUM(directoryManager.getAllSequenceFileFolders()),
-    UNSEQUENCE_FILE_OPEN_NUM(directoryManager.getAllUnSequenceFileFolders()),
+    SEQUENCE_FILE_OPEN_NUM(tierManager.getAllLocalSequenceFileFolders()),
+    UNSEQUENCE_FILE_OPEN_NUM(tierManager.getAllLocalUnSequenceFileFolders()),
     WAL_OPEN_FILE_NUM(Arrays.asList(commonConfig.getWalDirs())),
     DIGEST_OPEN_FILE_NUM(Collections.singletonList(config.getSystemDir())),
     SOCKET_OPEN_FILE_NUM(null);
diff --git a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
index 1d9047d349c..aa08daf4151 100644
--- a/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
+++ b/server/src/main/java/org/apache/iotdb/db/wal/recover/file/AbstractTsFileRecoverPerformer.java
@@ -63,10 +63,6 @@ public abstract class AbstractTsFileRecoverPerformer implements Closeable {
       // delete chunk metadata temp file
       FileUtils.delete(chunkMetadataTempFile);
     }
-    if (!tsFile.exists()) {
-      logger.error("TsFile {} is missing, will skip its recovery.", tsFile);
-      return;
-    }
 
     if (tsFileResource.resourceFileExists()) {
       // .resource file exists, just deserialize it into memory
@@ -74,6 +70,11 @@ public abstract class AbstractTsFileRecoverPerformer implements Closeable {
       return;
     }
 
+    if (!tsFile.exists()) {
+      logger.error("TsFile {} is missing, will skip its recovery.", tsFile);
+      return;
+    }
+
     // try to remove corrupted part of the TsFile
     try {
       writer = new RestorableTsFileIOWriter(tsFile);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/TsFileIdentifierUT.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/TsFileIdentifierUT.java
index b05c34d0d27..5fa8328a2af 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/TsFileIdentifierUT.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/TsFileIdentifierUT.java
@@ -156,14 +156,18 @@ public class TsFileIdentifierUT {
 
   @Test
   public void testGetInfoFromFileFromMultiDirs() throws Exception {
+
     IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
-    String[] originDataDirs = config.getDataDirs();
+    String[][] originDataDirs = config.getTierDataDirs();
     Class configClass = config.getClass();
-    Field dataDirField = configClass.getDeclaredField("dataDirs");
-    dataDirField.setAccessible(true);
-    dataDirField.set(
+    Field dataDirsField = configClass.getDeclaredField("tierDataDirs");
+    dataDirsField.setAccessible(true);
+    dataDirsField.set(
         config,
-        new String[] {"target" + File.separator + "data1", "target" + File.separator + "data2"});
+        new String[][] {
+          {"target" + File.separator + "data1", "target" + File.separator + "data2"}
+        });
+
     String filePath =
         "sequence"
             + File.separator
@@ -187,7 +191,7 @@ public class TsFileIdentifierUT {
       Assert.assertTrue(testFile.createNewFile());
       Assert.assertTrue(Files.isSameFile(testFile.toPath(), info.getFileFromDataDirs().toPath()));
     } finally {
-      dataDirField.set(config, originDataDirs);
+      dataDirsField.set(config, originDataDirs);
       Files.deleteIfExists(testFile.toPath());
       FileUtils.deleteDirectory(new File("target" + File.separator + "data2"));
     }
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java b/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java
index c6e62c33fcb..d195c6af061 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/compaction/recover/SizeTieredCompactionRecoverTest.java
@@ -119,14 +119,14 @@ public class SizeTieredCompactionRecoverTest {
   static final MeasurementSchema[] schemas = new MeasurementSchema[fullPaths.length];
   static String logFilePath =
       TestConstant.BASE_OUTPUT_PATH + File.separator + "test-compaction.compaction.log";
-  static String[] originDataDirs = null;
-  static String[] testDataDirs = new String[] {TestConstant.BASE_OUTPUT_PATH + "data"};
+  static String[][] originDataDirs = null;
+  static String[][] testDataDirs = new String[][] {{TestConstant.BASE_OUTPUT_PATH + "data"}};
   static IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
 
   @Before
   public void setUp() throws Exception {
     CompactionTaskManager.getInstance().start();
-    originDataDirs = config.getDataDirs();
+    originDataDirs = config.getTierDataDirs();
     setDataDirs(testDataDirs);
     if (!new File(SEQ_FILE_DIR).exists()) {
       Assert.assertTrue(new File(SEQ_FILE_DIR).mkdirs());
@@ -142,7 +142,7 @@ public class SizeTieredCompactionRecoverTest {
     new CompactionConfigRestorer().restoreCompactionConfig();
     CompactionTaskManager.getInstance().stop();
     setDataDirs(originDataDirs);
-    File dataDir = new File(testDataDirs[0]);
+    File dataDir = new File(testDataDirs[0][0]);
     if (dataDir.exists()) {
       FileUtils.forceDelete(dataDir);
     }
@@ -166,9 +166,9 @@ public class SizeTieredCompactionRecoverTest {
     }
   }
 
-  public void setDataDirs(String[] dataDirs) throws Exception {
+  public void setDataDirs(String[][] dataDirs) throws Exception {
     Class configClass = config.getClass();
-    Field dataDirsField = configClass.getDeclaredField("dataDirs");
+    Field dataDirsField = configClass.getDeclaredField("tierDataDirs");
     dataDirsField.setAccessible(true);
     dataDirsField.set(config, dataDirs);
   }
@@ -470,7 +470,7 @@ public class SizeTieredCompactionRecoverTest {
       FileUtils.moveDirectory(
           new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
           new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
-      setDataDirs(new String[] {TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"});
+      setDataDirs(new String[][] {{TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"}});
       CompactionRecoverTask recoverTask =
           new CompactionRecoverTask(
               COMPACTION_TEST_SG, "0", tsFileManager, new File(logFilePath), true);
@@ -573,7 +573,7 @@ public class SizeTieredCompactionRecoverTest {
       FileUtils.moveDirectory(
           new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
           new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
-      setDataDirs(new String[] {TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"});
+      setDataDirs(new String[][] {{TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"}});
       CompactionRecoverTask recoverTask =
           new CompactionRecoverTask(
               COMPACTION_TEST_SG, "0", tsFileManager, new File(logFilePath), true);
@@ -667,7 +667,7 @@ public class SizeTieredCompactionRecoverTest {
       FileUtils.moveDirectory(
           new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
           new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
-      setDataDirs(new String[] {TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"});
+      setDataDirs(new String[][] {{TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"}});
       CompactionRecoverTask recoverTask =
           new CompactionRecoverTask(
               COMPACTION_TEST_SG, "0", tsFileManager, new File(logFilePath), true);
@@ -770,7 +770,7 @@ public class SizeTieredCompactionRecoverTest {
       FileUtils.moveDirectory(
           new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data"),
           new File(TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"));
-      setDataDirs(new String[] {TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"});
+      setDataDirs(new String[][] {{TestConstant.BASE_OUTPUT_PATH + File.separator + "data1"}});
       CompactionRecoverTask recoverTask =
           new CompactionRecoverTask(
               COMPACTION_TEST_SG, "0", tsFileManager, new File(logFilePath), true);
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/snapshot/IoTDBSnapshotTest.java b/server/src/test/java/org/apache/iotdb/db/engine/snapshot/IoTDBSnapshotTest.java
index 69bcead7257..74e3119d281 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/snapshot/IoTDBSnapshotTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/snapshot/IoTDBSnapshotTest.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.db.engine.snapshot;
 
 import org.apache.iotdb.commons.utils.FileUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.engine.snapshot.exception.DirectoryNotLegalException;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
@@ -46,8 +46,8 @@ import java.util.List;
 import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
 
 public class IoTDBSnapshotTest {
-  private String[] testDataDirs =
-      new String[] {"target/data/data1", "target/data/data2", "target/data/data3"};
+  private String[][] testDataDirs =
+      new String[][] {{"target/data/data1", "target/data/data2", "target/data/data3"}};
   private String testSgName = "root.testsg";
 
   @Before
@@ -66,7 +66,7 @@ public class IoTDBSnapshotTest {
     List<TsFileResource> resources = new ArrayList<>();
     for (int i = 0; i < 100; i++) {
       String filePath =
-          testDataDirs[i % 3]
+          testDataDirs[0][i % 3]
               + File.separator
               + "sequence"
               + File.separator
@@ -94,9 +94,9 @@ public class IoTDBSnapshotTest {
   @Test
   public void testCreateSnapshot()
       throws IOException, WriteProcessException, DataRegionException, DirectoryNotLegalException {
-    String[] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
-    IoTDBDescriptor.getInstance().getConfig().setDataDirs(testDataDirs);
-    DirectoryManager.getInstance().resetFolders();
+    String[][] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getTierDataDirs();
+    IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs);
+    TierManager.getInstance().resetFolders();
     try {
       List<TsFileResource> resources = writeTsFiles();
       DataRegion region = new DataRegion(testSgName, "0");
@@ -120,17 +120,17 @@ public class IoTDBSnapshotTest {
         FileUtils.recursiveDeleteFolder(snapshotDir.getAbsolutePath());
       }
     } finally {
-      IoTDBDescriptor.getInstance().getConfig().setDataDirs(originDataDirs);
-      DirectoryManager.getInstance().resetFolders();
+      IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(originDataDirs);
+      TierManager.getInstance().resetFolders();
     }
   }
 
   @Test
   public void testCreateSnapshotWithUnclosedTsFile()
       throws IOException, WriteProcessException, DirectoryNotLegalException {
-    String[] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
-    IoTDBDescriptor.getInstance().getConfig().setDataDirs(testDataDirs);
-    DirectoryManager.getInstance().resetFolders();
+    String[][] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getTierDataDirs();
+    IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs);
+    TierManager.getInstance().resetFolders();
     try {
       List<TsFileResource> resources = writeTsFiles();
       resources.subList(50, 100).forEach(x -> x.setStatusForTest(TsFileResourceStatus.UNCLOSED));
@@ -156,17 +156,17 @@ public class IoTDBSnapshotTest {
         FileUtils.recursiveDeleteFolder(snapshotDir.getAbsolutePath());
       }
     } finally {
-      IoTDBDescriptor.getInstance().getConfig().setDataDirs(originDataDirs);
-      DirectoryManager.getInstance().resetFolders();
+      IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(originDataDirs);
+      TierManager.getInstance().resetFolders();
     }
   }
 
   @Test
   public void testLoadSnapshot()
       throws IOException, WriteProcessException, DataRegionException, DirectoryNotLegalException {
-    String[] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getDataDirs();
-    IoTDBDescriptor.getInstance().getConfig().setDataDirs(testDataDirs);
-    DirectoryManager.getInstance().resetFolders();
+    String[][] originDataDirs = IoTDBDescriptor.getInstance().getConfig().getTierDataDirs();
+    IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(testDataDirs);
+    TierManager.getInstance().resetFolders();
     try {
       List<TsFileResource> resources = writeTsFiles();
       DataRegion region = new DataRegion(testSgName, "0");
@@ -186,8 +186,8 @@ public class IoTDBSnapshotTest {
         FileUtils.recursiveDeleteFolder(snapshotDir.getAbsolutePath());
       }
     } finally {
-      IoTDBDescriptor.getInstance().getConfig().setDataDirs(originDataDirs);
-      DirectoryManager.getInstance().resetFolders();
+      IoTDBDescriptor.getInstance().getConfig().setTierDataDirs(originDataDirs);
+      TierManager.getInstance().resetFolders();
     }
   }
 
@@ -195,7 +195,7 @@ public class IoTDBSnapshotTest {
   public void testGetSnapshotFile() throws IOException {
     File tsFile =
         new File(
-            IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0]
+            IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs()[0]
                 + File.separator
                 + "sequence"
                 + File.separator
@@ -213,7 +213,7 @@ public class IoTDBSnapshotTest {
         new SnapshotTaker(region).getSnapshotFilePathForTsFile(tsFile, "test-snapshotId");
     Assert.assertEquals(
         new File(
-                IoTDBDescriptor.getInstance().getConfig().getDataDirs()[0]
+                IoTDBDescriptor.getInstance().getConfig().getLocalDataDirs()[0]
                     + File.separator
                     + "snapshot"
                     + File.separator
diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 3c671aafc2f..c2392bff9ec 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.engine.compaction.execute.utils.reader.IDataBlockReader;
 import org.apache.iotdb.db.engine.compaction.execute.utils.reader.SeriesDataBlockReader;
 import org.apache.iotdb.db.engine.flush.TsFileFlushPolicy.DirectFlushPolicy;
@@ -249,8 +249,8 @@ public class TTLTest {
     dataRegion.syncCloseAllWorkingTsFileProcessors();
 
     // files before ttl
-    File seqDir = new File(DirectoryManager.getInstance().getNextFolderForSequenceFile(), sg1);
-    File unseqDir = new File(DirectoryManager.getInstance().getNextFolderForUnSequenceFile(), sg1);
+    File seqDir = new File(TierManager.getInstance().getNextFolderForTsFile(0, true), sg1);
+    File unseqDir = new File(TierManager.getInstance().getNextFolderForTsFile(0, false), sg1);
 
     List<File> seqFiles = new ArrayList<>();
     for (File directory : seqDir.listFiles()) {
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index 0596cf07e89..7807f272fd8 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -27,7 +27,7 @@ import org.apache.iotdb.commons.udf.service.UDFManagementService;
 import org.apache.iotdb.db.auth.AuthorizerManager;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.conf.directories.DirectoryManager;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.constant.TestConstant;
 import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.BloomFilterCache;
@@ -52,6 +52,7 @@ import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.recover.WALRecoverManager;
 import org.apache.iotdb.rpc.TConfigurationConst;
 import org.apache.iotdb.rpc.TSocketWrapper;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.iotdb.tsfile.utils.FilePathUtils;
 import org.apache.iotdb.udf.api.exception.UDFManagementException;
 
@@ -81,7 +82,7 @@ public class EnvironmentUtils {
 
   private static final IoTDBConfig config = IoTDBDescriptor.getInstance().getConfig();
   private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
-  private static final DirectoryManager directoryManager = DirectoryManager.getInstance();
+  private static final TierManager tierManager = TierManager.getInstance();
 
   public static long TEST_QUERY_JOB_ID = 1;
   public static QueryContext TEST_QUERY_CONTEXT = new QueryContext(TEST_QUERY_JOB_ID);
@@ -230,11 +231,11 @@ public class EnvironmentUtils {
 
   public static void cleanAllDir() throws IOException {
     // delete sequential files
-    for (String path : directoryManager.getAllSequenceFileFolders()) {
+    for (String path : tierManager.getAllLocalSequenceFileFolders()) {
       cleanDir(path);
     }
     // delete unsequence files
-    for (String path : directoryManager.getAllUnSequenceFileFolders()) {
+    for (String path : tierManager.getAllLocalUnSequenceFileFolders()) {
       cleanDir(path);
     }
     // delete system info
@@ -264,7 +265,7 @@ public class EnvironmentUtils {
   }
 
   public static void cleanDir(String dir) throws IOException {
-    FileUtils.deleteDirectory(new File(dir));
+    FSFactoryProducer.getFSFactory().deleteDirectory(dir);
   }
 
   /** disable memory control</br> this function should be called before all code in the setup */
@@ -318,11 +319,11 @@ public class EnvironmentUtils {
 
   private static void createAllDir() {
     // create sequential files
-    for (String path : directoryManager.getAllSequenceFileFolders()) {
+    for (String path : tierManager.getAllLocalSequenceFileFolders()) {
       createDir(path);
     }
     // create unsequential files
-    for (String path : directoryManager.getAllUnSequenceFileFolders()) {
+    for (String path : tierManager.getAllLocalUnSequenceFileFolders()) {
       createDir(path);
     }
     // create database
@@ -353,7 +354,7 @@ public class EnvironmentUtils {
   }
 
   private static void createDir(String dir) {
-    File file = new File(dir);
+    File file = FSFactoryProducer.getFSFactory().getFile(dir);
     file.mkdirs();
   }
 
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 4dd967556c8..1442a1d418f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.tsfile.common.conf;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
 import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.utils.FSUtils;
 
 import java.io.Serializable;
 import java.nio.charset.Charset;
@@ -119,7 +120,7 @@ public class TSFileConfig implements Serializable {
   /** Default endian value is BIG_ENDIAN. */
   private String endian = "BIG_ENDIAN";
   /** Default storage is in local file system */
-  private FSType TSFileStorageFs = FSType.LOCAL;
+  private FSType[] TSFileStorageFs = new FSType[] {FSType.LOCAL};
   /** Default core-site.xml file path is /etc/hadoop/conf/core-site.xml */
   private String coreSitePath = "/etc/hadoop/conf/core-site.xml";
   /** Default hdfs-site.xml file path is /etc/hadoop/conf/hdfs-site.xml */
@@ -159,6 +160,18 @@ public class TSFileConfig implements Serializable {
 
   private int patternMatchingThreshold = 1000000;
 
+  private String hdfsFile = "org.apache.iotdb.hadoop.fileSystem.HDFSFile";
+
+  private String hdfsTsFileInput = "org.apache.iotdb.hadoop.fileSystem.HDFSInput";
+
+  private String hdfsTsFileOutput = "org.apache.iotdb.hadoop.fileSystem.HDFSOutput";
+
+  private String objectStorageFile = "org.apache.iotdb.os.fileSystem.OSFile";
+
+  private String objectStorageTsFileInput = "org.apache.iotdb.os.fileSystem.OSTsFileInput";
+
+  private String objectStorageTsFileOutput = "org.apache.iotdb.os.fileSystem.OSTsFileOutput";
+
   /** customizedProperties, this should be empty by default. */
   private Properties customizedProperties = new Properties();
 
@@ -348,12 +361,13 @@ public class TSFileConfig implements Serializable {
     this.bloomFilterErrorRate = bloomFilterErrorRate;
   }
 
-  public FSType getTSFileStorageFs() {
+  public FSType[] getTSFileStorageFs() {
     return this.TSFileStorageFs;
   }
 
-  public void setTSFileStorageFs(FSType fileStorageFs) {
+  public void setTSFileStorageFs(FSType[] fileStorageFs) {
     this.TSFileStorageFs = fileStorageFs;
+    FSUtils.reload();
   }
 
   public String getCoreSitePath() {
@@ -479,4 +493,52 @@ public class TSFileConfig implements Serializable {
   public String getSprintzPredictScheme() {
     return "fire";
   }
+
+  public String getHdfsFile() {
+    return hdfsFile;
+  }
+
+  public void setHdfsFile(String hdfsFile) {
+    this.hdfsFile = hdfsFile;
+  }
+
+  public String getHdfsTsFileInput() {
+    return hdfsTsFileInput;
+  }
+
+  public void setHdfsTsFileInput(String hdfsTsFileInput) {
+    this.hdfsTsFileInput = hdfsTsFileInput;
+  }
+
+  public String getHdfsTsFileOutput() {
+    return hdfsTsFileOutput;
+  }
+
+  public void setHdfsTsFileOutput(String hdfsTsFileOutput) {
+    this.hdfsTsFileOutput = hdfsTsFileOutput;
+  }
+
+  public String getObjectStorageFile() {
+    return objectStorageFile;
+  }
+
+  public void setObjectStorageFile(String objectStorageFile) {
+    this.objectStorageFile = objectStorageFile;
+  }
+
+  public String getObjectStorageTsFileInput() {
+    return objectStorageTsFileInput;
+  }
+
+  public void setObjectStorageTsFileInput(String objectStorageTsFileInput) {
+    this.objectStorageTsFileInput = objectStorageTsFileInput;
+  }
+
+  public String getObjectStorageTsFileOutput() {
+    return objectStorageTsFileOutput;
+  }
+
+  public void setObjectStorageTsFileOutput(String objectStorageTsFileOutput) {
+    this.objectStorageTsFileOutput = objectStorageTsFileOutput;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSFactoryProducer.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSFactoryProducer.java
index bd497b8a2f6..09413d318a4 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSFactoryProducer.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSFactoryProducer.java
@@ -19,36 +19,17 @@
 
 package org.apache.iotdb.tsfile.fileSystem;
 
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.fileSystem.fileInputFactory.FileInputFactory;
-import org.apache.iotdb.tsfile.fileSystem.fileInputFactory.HDFSInputFactory;
-import org.apache.iotdb.tsfile.fileSystem.fileInputFactory.LocalFSInputFactory;
+import org.apache.iotdb.tsfile.fileSystem.fileInputFactory.HybridFileInputFactory;
 import org.apache.iotdb.tsfile.fileSystem.fileOutputFactory.FileOutputFactory;
-import org.apache.iotdb.tsfile.fileSystem.fileOutputFactory.HDFSOutputFactory;
-import org.apache.iotdb.tsfile.fileSystem.fileOutputFactory.LocalFSOutputFactory;
+import org.apache.iotdb.tsfile.fileSystem.fileOutputFactory.HybridFileOutputFactory;
 import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
-import org.apache.iotdb.tsfile.fileSystem.fsFactory.HDFSFactory;
-import org.apache.iotdb.tsfile.fileSystem.fsFactory.LocalFSFactory;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.HybridFSFactory;
 
 public class FSFactoryProducer {
-
-  private static FSType fSType = TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs();
-
-  private static FSFactory fsFactory;
-  private static FileInputFactory fileInputFactory;
-  private static FileOutputFactory fileOutputFactory;
-
-  static {
-    if (fSType.equals(FSType.HDFS)) {
-      fsFactory = new HDFSFactory();
-      fileInputFactory = new HDFSInputFactory();
-      fileOutputFactory = new HDFSOutputFactory();
-    } else {
-      fsFactory = new LocalFSFactory();
-      fileInputFactory = new LocalFSInputFactory();
-      fileOutputFactory = new LocalFSOutputFactory();
-    }
-  }
+  private static FSFactory fsFactory = new HybridFSFactory();
+  private static FileInputFactory fileInputFactory = new HybridFileInputFactory();
+  private static FileOutputFactory fileOutputFactory = new HybridFileOutputFactory();
 
   public static FSFactory getFSFactory() {
     return fsFactory;
@@ -58,6 +39,10 @@ public class FSFactoryProducer {
     return fileInputFactory;
   }
 
+  public static void setFileInputFactory(FileInputFactory fileInputFactory) {
+    FSFactoryProducer.fileInputFactory = fileInputFactory;
+  }
+
   public static FileOutputFactory getFileOutputFactory() {
     return fileOutputFactory;
   }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSPath.java
similarity index 70%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSType.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSPath.java
index 764df632d00..cf26786f609 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSPath.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *      http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
@@ -16,10 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.tsfile.fileSystem;
 
-public enum FSType {
-  LOCAL,
-  HDFS
+public class FSPath {
+  private final FSType fsType;
+  private final String path;
+
+  public FSPath(FSType fsType, String path) {
+    this.fsType = fsType;
+    this.path = path;
+  }
+
+  public FSType getFsType() {
+    return fsType;
+  }
+
+  public String getPath() {
+    return path;
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSType.java
index 764df632d00..22e275c4c4f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/FSType.java
@@ -21,5 +21,6 @@ package org.apache.iotdb.tsfile.fileSystem;
 
 public enum FSType {
   LOCAL,
-  HDFS
+  HDFS,
+  OBJECT_STORAGE,
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HDFSInputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HDFSInputFactory.java
index 6cd1c54050b..6735df82416 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HDFSInputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HDFSInputFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.tsfile.fileSystem.fileInputFactory;
 
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 
 import org.slf4j.Logger;
@@ -31,11 +32,12 @@ import java.lang.reflect.InvocationTargetException;
 public class HDFSInputFactory implements FileInputFactory {
 
   private static final Logger logger = LoggerFactory.getLogger(HDFSInputFactory.class);
-  private static Constructor constructor;
+  private Constructor constructor;
 
-  static {
+  public HDFSInputFactory() {
     try {
-      Class<?> clazz = Class.forName("org.apache.iotdb.hadoop.fileSystem.HDFSInput");
+      Class<?> clazz =
+          Class.forName(TSFileDescriptor.getInstance().getConfig().getHdfsTsFileInput());
       constructor = clazz.getConstructor(String.class);
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       logger.error(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HDFSInputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java
similarity index 52%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HDFSInputFactory.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java
index 6cd1c54050b..783f9dc0145 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HDFSInputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java
@@ -16,44 +16,47 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.tsfile.fileSystem.fileInputFactory;
 
+import org.apache.iotdb.tsfile.fileSystem.FSPath;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
+import org.apache.iotdb.tsfile.utils.FSUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-public class HDFSInputFactory implements FileInputFactory {
-
-  private static final Logger logger = LoggerFactory.getLogger(HDFSInputFactory.class);
-  private static Constructor constructor;
-
-  static {
-    try {
-      Class<?> clazz = Class.forName("org.apache.iotdb.hadoop.fileSystem.HDFSInput");
-      constructor = clazz.getConstructor(String.class);
-    } catch (ClassNotFoundException | NoSuchMethodException e) {
-      logger.error(
-          "Failed to get HDFSInput in Hadoop file system. Please check your dependency of Hadoop module.",
-          e);
-    }
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HybridFileInputFactory implements FileInputFactory {
+  private static final Logger logger = LoggerFactory.getLogger(HybridFileInputFactory.class);
+  private static final Map<FSType, FileInputFactory> inputFactories = new ConcurrentHashMap<>();
+
+  private FileInputFactory getFileInputFactory(FSType fsType) {
+    return inputFactories.compute(
+        fsType,
+        (k, v) -> {
+          if (v != null) {
+            return v;
+          }
+          switch (fsType) {
+            case LOCAL:
+              return new LocalFSInputFactory();
+            case OBJECT_STORAGE:
+              return new OSFileInputFactory();
+            case HDFS:
+              return new HDFSInputFactory();
+            default:
+              return null;
+          }
+        });
   }
 
   @Override
   public TsFileInput getTsFileInput(String filePath) throws IOException {
-    try {
-      return (TsFileInput) constructor.newInstance(filePath);
-    } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
-      throw new IOException(
-          String.format(
-              "Failed to get TsFile input of file: %s. Please check your dependency of Hadoop module.",
-              filePath),
-          e);
-    }
+    FSPath path = FSUtils.parse(filePath);
+    return getFileInputFactory(path.getFsType()).getTsFileInput(path.getPath());
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HDFSInputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java
similarity index 75%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HDFSInputFactory.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java
index 6cd1c54050b..ce397f5edc3 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HDFSInputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.tsfile.fileSystem.fileInputFactory;
 
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.read.reader.TsFileInput;
 
 import org.slf4j.Logger;
@@ -28,18 +28,18 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
-public class HDFSInputFactory implements FileInputFactory {
-
-  private static final Logger logger = LoggerFactory.getLogger(HDFSInputFactory.class);
-  private static Constructor constructor;
+public class OSFileInputFactory implements FileInputFactory {
+  private static final Logger logger = LoggerFactory.getLogger(OSFileInputFactory.class);
+  private Constructor constructor;
 
-  static {
+  public OSFileInputFactory() {
     try {
-      Class<?> clazz = Class.forName("org.apache.iotdb.hadoop.fileSystem.HDFSInput");
+      Class<?> clazz =
+          Class.forName(TSFileDescriptor.getInstance().getConfig().getObjectStorageTsFileInput());
       constructor = clazz.getConstructor(String.class);
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       logger.error(
-          "Failed to get HDFSInput in Hadoop file system. Please check your dependency of Hadoop module.",
+          "Failed to get OSInput in object storage. Please check your dependency of object storage module.",
           e);
     }
   }
@@ -51,7 +51,7 @@ public class HDFSInputFactory implements FileInputFactory {
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       throw new IOException(
           String.format(
-              "Failed to get TsFile input of file: %s. Please check your dependency of Hadoop module.",
+              "Failed to get TsFile input of file: %s. Please check your dependency of object storage module.",
               filePath),
           e);
     }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
index b1d3675047a..914801d38c9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.tsfile.fileSystem.fileOutputFactory;
 
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
 
 import org.slf4j.Logger;
@@ -30,11 +31,12 @@ import java.lang.reflect.InvocationTargetException;
 public class HDFSOutputFactory implements FileOutputFactory {
 
   private static final Logger logger = LoggerFactory.getLogger(HDFSOutputFactory.class);
-  private static Constructor constructor;
+  private Constructor constructor;
 
-  static {
+  public HDFSOutputFactory() {
     try {
-      Class<?> clazz = Class.forName("org.apache.iotdb.hadoop.fileSystem.HDFSOutput");
+      Class<?> clazz =
+          Class.forName(TSFileDescriptor.getInstance().getConfig().getHdfsTsFileOutput());
       constructor = clazz.getConstructor(String.class, boolean.class);
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       logger.error(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HybridFileOutputFactory.java
similarity index 51%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HybridFileOutputFactory.java
index b1d3675047a..de971cf306d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HybridFileOutputFactory.java
@@ -16,43 +16,46 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.tsfile.fileSystem.fileOutputFactory;
 
+import org.apache.iotdb.tsfile.fileSystem.FSPath;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.utils.FSUtils;
 import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-public class HDFSOutputFactory implements FileOutputFactory {
-
-  private static final Logger logger = LoggerFactory.getLogger(HDFSOutputFactory.class);
-  private static Constructor constructor;
-
-  static {
-    try {
-      Class<?> clazz = Class.forName("org.apache.iotdb.hadoop.fileSystem.HDFSOutput");
-      constructor = clazz.getConstructor(String.class, boolean.class);
-    } catch (ClassNotFoundException | NoSuchMethodException e) {
-      logger.error(
-          "Failed to get HDFSInput in Hadoop file system. Please check your dependency of Hadoop module.",
-          e);
-    }
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HybridFileOutputFactory implements FileOutputFactory {
+  private static final Logger logger = LoggerFactory.getLogger(HybridFileOutputFactory.class);
+  private static final Map<FSType, FileOutputFactory> outputFactories = new ConcurrentHashMap<>();
+
+  private FileOutputFactory getFileOutputFactory(FSType fsType) {
+    return outputFactories.compute(
+        fsType,
+        (k, v) -> {
+          if (v != null) {
+            return v;
+          }
+          switch (fsType) {
+            case LOCAL:
+              return new LocalFSOutputFactory();
+            case OBJECT_STORAGE:
+              return new OSFileOutputFactory();
+            case HDFS:
+              return new HDFSOutputFactory();
+            default:
+              return null;
+          }
+        });
   }
 
   @Override
   public TsFileOutput getTsFileOutput(String filePath, boolean append) {
-    try {
-      return (TsFileOutput) constructor.newInstance(filePath, !append);
-    } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
-      logger.error(
-          "Failed to get TsFile output of file: {}. Please check your dependency of Hadoop module.",
-          filePath,
-          e);
-      return null;
-    }
+    FSPath path = FSUtils.parse(filePath);
+    return getFileOutputFactory(path.getFsType()).getTsFileOutput(path.getPath(), append);
   }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
similarity index 75%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
index b1d3675047a..9bb723c0b4f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/HDFSOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
@@ -16,9 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.tsfile.fileSystem.fileOutputFactory;
 
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
 
 import org.slf4j.Logger;
@@ -27,18 +27,18 @@ import org.slf4j.LoggerFactory;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
-public class HDFSOutputFactory implements FileOutputFactory {
-
-  private static final Logger logger = LoggerFactory.getLogger(HDFSOutputFactory.class);
-  private static Constructor constructor;
+public class OSFileOutputFactory implements FileOutputFactory {
+  private static final Logger logger = LoggerFactory.getLogger(OSFileOutputFactory.class);
+  private Constructor constructor;
 
-  static {
+  public OSFileOutputFactory() {
     try {
-      Class<?> clazz = Class.forName("org.apache.iotdb.hadoop.fileSystem.HDFSOutput");
+      Class<?> clazz =
+          Class.forName(TSFileDescriptor.getInstance().getConfig().getObjectStorageTsFileOutput());
       constructor = clazz.getConstructor(String.class, boolean.class);
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       logger.error(
-          "Failed to get HDFSInput in Hadoop file system. Please check your dependency of Hadoop module.",
+          "Failed to get OSInput in object storage. Please check your dependency of object storage module.",
           e);
     }
   }
@@ -49,7 +49,7 @@ public class HDFSOutputFactory implements FileOutputFactory {
       return (TsFileOutput) constructor.newInstance(filePath, !append);
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
-          "Failed to get TsFile output of file: {}. Please check your dependency of Hadoop module.",
+          "Failed to get TsFile output of file: {}. Please check your dependency of object storage module.",
           filePath,
           e);
       return null;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
index 804e880090e..0e53690144d 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
@@ -110,7 +110,15 @@ public interface FSFactory {
    * @param srcFile src file
    * @param destFile dest file
    */
-  void moveFile(File srcFile, File destFile);
+  void moveFile(File srcFile, File destFile) throws IOException;
+
+  /**
+   * copy file
+   *
+   * @param srcFile src file
+   * @param destFile dest file
+   */
+  void copyFile(File srcFile, File destFile) throws IOException;
 
   /**
    * list file by suffix
@@ -136,4 +144,7 @@ public interface FSFactory {
    * @param file local file or HDFS file
    */
   boolean deleteIfExists(File file) throws IOException;
+
+  /** Force delete the directory */
+  void deleteDirectory(String dir) throws IOException;
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
index b1197bffa1f..fac32d0a756 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
@@ -19,6 +19,10 @@
 
 package org.apache.iotdb.tsfile.fileSystem.fsFactory;
 
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.utils.FSUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,6 +31,7 @@ import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -35,21 +40,24 @@ import java.net.URI;
 public class HDFSFactory implements FSFactory {
 
   private static final Logger logger = LoggerFactory.getLogger(HDFSFactory.class);
-  private static Constructor constructorWithPathname;
-  private static Constructor constructorWithParentStringAndChild;
-  private static Constructor constructorWithParentFileAndChild;
-  private static Constructor constructorWithUri;
-  private static Method getBufferedReader;
-  private static Method getBufferedWriter;
-  private static Method getBufferedInputStream;
-  private static Method getBufferedOutputStream;
-  private static Method listFilesBySuffix;
-  private static Method listFilesByPrefix;
-  private static Method renameTo;
-
-  static {
+  private Constructor constructorWithPathname;
+  private Constructor constructorWithParentStringAndChild;
+  private Constructor constructorWithParentFileAndChild;
+  private Constructor constructorWithUri;
+  private Method getBufferedReader;
+  private Method getBufferedWriter;
+  private Method getBufferedInputStream;
+  private Method getBufferedOutputStream;
+  private Method listFilesBySuffix;
+  private Method listFilesByPrefix;
+  private Method renameTo;
+  private Method copyToLocal;
+  private Method copyFromLocal;
+  private Method copyTo;
+
+  public HDFSFactory() {
     try {
-      Class<?> clazz = Class.forName("org.apache.iotdb.hadoop.fileSystem.HDFSFile");
+      Class<?> clazz = Class.forName(TSFileDescriptor.getInstance().getConfig().getHdfsFile());
       constructorWithPathname = clazz.getConstructor(String.class);
       constructorWithParentStringAndChild = clazz.getConstructor(String.class, String.class);
       constructorWithParentFileAndChild = clazz.getConstructor(File.class, String.class);
@@ -61,6 +69,9 @@ public class HDFSFactory implements FSFactory {
       listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class);
       listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class);
       renameTo = clazz.getMethod("renameTo", File.class);
+      copyToLocal = clazz.getMethod("copyToLocal", File.class);
+      copyFromLocal = clazz.getMethod("copyFromLocal", File.class);
+      copyTo = clazz.getMethod("copyTo", File.class);
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       logger.error(
           "Failed to get Hadoop file system. Please check your dependency of Hadoop module.", e);
@@ -187,7 +198,7 @@ public class HDFSFactory implements FSFactory {
   }
 
   @Override
-  public void moveFile(File srcFile, File destFile) {
+  public void moveFile(File srcFile, File destFile) throws IOException {
     try {
       renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
@@ -195,6 +206,30 @@ public class HDFSFactory implements FSFactory {
           "Failed to rename file from {} to {}. Please check your dependency of Hadoop module.",
           srcFile.getName(),
           destFile.getName());
+      throw new IOException(e);
+    }
+  }
+
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {
+    FSType srcType = FSUtils.getFSType(srcFile);
+    FSType destType = FSUtils.getFSType(destFile);
+    try {
+      if (srcType == FSType.HDFS && destType == FSType.HDFS) {
+        copyTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+      } else if (srcType == FSType.LOCAL) {
+        copyFromLocal.invoke(
+            constructorWithPathname.newInstance(destFile.getAbsolutePath()), srcFile);
+      } else {
+        copyToLocal.invoke(
+            constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+      }
+    } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+      logger.error(
+          "Failed to copy file from {} to {}. Please check your dependency of object storage module.",
+          srcFile.getName(),
+          destFile.getName());
+      throw new IOException(e);
     }
   }
 
@@ -234,4 +269,9 @@ public class HDFSFactory implements FSFactory {
   public boolean deleteIfExists(File file) {
     return file.delete();
   }
+
+  @Override
+  public void deleteDirectory(String dir) throws IOException {
+    getFile(dir).delete();
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
new file mode 100644
index 00000000000..1fe30a3c2b6
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.fileSystem.fsFactory;
+
+import org.apache.iotdb.tsfile.fileSystem.FSPath;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.utils.FSUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HybridFSFactory implements FSFactory {
+  private static final Logger logger = LoggerFactory.getLogger(HybridFSFactory.class);
+  private static final Map<FSType, FSFactory> fsFactories = new ConcurrentHashMap<>();
+
+  private FSFactory getFSFactory(FSType fsType) {
+    return fsFactories.compute(
+        fsType,
+        (k, v) -> {
+          if (v != null) {
+            return v;
+          }
+          switch (fsType) {
+            case LOCAL:
+              return new LocalFSFactory();
+            case OBJECT_STORAGE:
+              return new OSFSFactory();
+            case HDFS:
+              return new HDFSFactory();
+            default:
+              return null;
+          }
+        });
+  }
+
+  @Override
+  public File getFileWithParent(String pathname) {
+    FSPath path = FSUtils.parse(pathname);
+    return getFSFactory(path.getFsType()).getFileWithParent(path.getPath());
+  }
+
+  @Override
+  public File getFile(String pathname) {
+    FSPath path = FSUtils.parse(pathname);
+    return getFSFactory(path.getFsType()).getFile(path.getPath());
+  }
+
+  @Override
+  public File getFile(String parent, String child) {
+    FSPath parentPath = FSUtils.parse(parent);
+    return getFSFactory(parentPath.getFsType()).getFile(parentPath.getPath(), child);
+  }
+
+  @Override
+  public File getFile(File parent, String child) {
+    FSType type = FSUtils.getFSType(parent);
+    return getFSFactory(type).getFile(parent, child);
+  }
+
+  @Override
+  public File getFile(URI uri) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public BufferedReader getBufferedReader(String filePath) {
+    FSPath path = FSUtils.parse(filePath);
+    return getFSFactory(path.getFsType()).getBufferedReader(path.getPath());
+  }
+
+  @Override
+  public BufferedWriter getBufferedWriter(String filePath, boolean append) {
+    FSPath path = FSUtils.parse(filePath);
+    return getFSFactory(path.getFsType()).getBufferedWriter(path.getPath(), append);
+  }
+
+  @Override
+  public BufferedInputStream getBufferedInputStream(String filePath) {
+    FSPath path = FSUtils.parse(filePath);
+    return getFSFactory(path.getFsType()).getBufferedInputStream(path.getPath());
+  }
+
+  @Override
+  public BufferedOutputStream getBufferedOutputStream(String filePath) {
+    FSPath path = FSUtils.parse(filePath);
+    return getFSFactory(path.getFsType()).getBufferedOutputStream(path.getPath());
+  }
+
+  @Override
+  public void moveFile(File srcFile, File destFile) throws IOException {
+    FSType srcType = FSUtils.getFSType(srcFile);
+    FSType destType = FSUtils.getFSType(destFile);
+    if (srcType == destType) {
+      getFSFactory(destType).moveFile(srcFile, destFile);
+    } else {
+      throw new IOException(
+          String.format("Doesn't support move file from %s to %s.", srcType, destType));
+    }
+  }
+
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {
+    FSType srcType = FSUtils.getFSType(srcFile);
+    FSType destType = FSUtils.getFSType(destFile);
+    if (srcType == destType || (srcType == FSType.LOCAL && destType == FSType.OBJECT_STORAGE)) {
+      getFSFactory(destType).copyFile(srcFile, destFile);
+    } else if ((srcType == FSType.LOCAL || srcType == FSType.HDFS)
+        && (destType == FSType.LOCAL || destType == FSType.HDFS)) {
+      getFSFactory(FSType.HDFS).copyFile(srcFile, destFile);
+    } else {
+      throw new IOException(
+          String.format("Doesn't support move file from %s to %s.", srcType, destType));
+    }
+  }
+
+  @Override
+  public File[] listFilesBySuffix(String fileFolder, String suffix) {
+    FSPath folder = FSUtils.parse(fileFolder);
+    return getFSFactory(folder.getFsType()).listFilesBySuffix(folder.getPath(), suffix);
+  }
+
+  @Override
+  public File[] listFilesByPrefix(String fileFolder, String prefix) {
+    FSPath folder = FSUtils.parse(fileFolder);
+    return getFSFactory(folder.getFsType()).listFilesByPrefix(folder.getPath(), prefix);
+  }
+
+  @Override
+  public boolean deleteIfExists(File file) throws IOException {
+    FSType type = FSUtils.getFSType(file);
+    return getFSFactory(type).deleteIfExists(file);
+  }
+
+  @Override
+  public void deleteDirectory(String dir) throws IOException {
+    FSType type = FSUtils.getFSType(dir);
+    getFSFactory(type).deleteDirectory(dir);
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
index 365ded1b9d3..d110aa9b164 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
@@ -111,16 +111,13 @@ public class LocalFSFactory implements FSFactory {
   }
 
   @Override
-  public void moveFile(File srcFile, File destFile) {
-    try {
-      FileUtils.moveFile(srcFile, destFile);
-    } catch (IOException e) {
-      logger.error(
-          "Failed to move file from {} to {}. ",
-          srcFile.getAbsolutePath(),
-          destFile.getAbsolutePath(),
-          e);
-    }
+  public void moveFile(File srcFile, File destFile) throws IOException {
+    FileUtils.moveFile(srcFile, destFile);
+  }
+
+  /**
+   * Copies {@code srcFile} to {@code destFile} by delegating to {@code FileUtils.copyFile}.
+   *
+   * @throws IOException propagated unchanged from {@code FileUtils.copyFile}
+   */
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {
+    FileUtils.copyFile(srcFile, destFile);
   }
 
   @Override
@@ -137,4 +134,9 @@ public class LocalFSFactory implements FSFactory {
   public boolean deleteIfExists(File file) throws IOException {
     return Files.deleteIfExists(file.toPath());
   }
+
+  /** Deletes the directory at {@code dir} by handing it to {@code FileUtils.deleteDirectory}. */
+  @Override
+  public void deleteDirectory(String dir) throws IOException {
+    File directory = new File(dir);
+    FileUtils.deleteDirectory(directory);
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
similarity index 67%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
copy to tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
index b1197bffa1f..bed220c2b63 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
@@ -16,9 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-
 package org.apache.iotdb.tsfile.fileSystem.fsFactory;
 
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
+import org.apache.iotdb.tsfile.utils.FSUtils;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,57 +30,64 @@ import java.io.BufferedOutputStream;
 import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
 
-public class HDFSFactory implements FSFactory {
-
-  private static final Logger logger = LoggerFactory.getLogger(HDFSFactory.class);
-  private static Constructor constructorWithPathname;
-  private static Constructor constructorWithParentStringAndChild;
-  private static Constructor constructorWithParentFileAndChild;
-  private static Constructor constructorWithUri;
-  private static Method getBufferedReader;
-  private static Method getBufferedWriter;
-  private static Method getBufferedInputStream;
-  private static Method getBufferedOutputStream;
-  private static Method listFilesBySuffix;
-  private static Method listFilesByPrefix;
-  private static Method renameTo;
-
-  static {
+public class OSFSFactory implements FSFactory {
+  private static final Logger logger = LoggerFactory.getLogger(OSFSFactory.class);
+
+  private Constructor constructorWithPathname;
+  private Constructor constructorWithParentStringAndChild;
+  private Constructor constructorWithParentFileAndChild;
+  private Constructor constructorWithUri;
+  private Method getBufferedReader;
+  private Method getBufferedWriter;
+  private Method getBufferedInputStream;
+  private Method getBufferedOutputStream;
+  private Method listFilesBySuffix;
+  private Method listFilesByPrefix;
+  private Method renameTo;
+  private Method putFile;
+  private Method copyTo;
+  private Method deleteObjectsByPrefix;
+
+  public OSFSFactory() {
     try {
-      Class<?> clazz = Class.forName("org.apache.iotdb.hadoop.fileSystem.HDFSFile");
+      Class<?> clazz =
+          Class.forName(TSFileDescriptor.getInstance().getConfig().getObjectStorageFile());
       constructorWithPathname = clazz.getConstructor(String.class);
       constructorWithParentStringAndChild = clazz.getConstructor(String.class, String.class);
       constructorWithParentFileAndChild = clazz.getConstructor(File.class, String.class);
       constructorWithUri = clazz.getConstructor(URI.class);
-      getBufferedReader = clazz.getMethod("getBufferedReader", String.class);
-      getBufferedWriter = clazz.getMethod("getBufferedWriter", String.class, boolean.class);
-      getBufferedInputStream = clazz.getMethod("getBufferedInputStream", String.class);
-      getBufferedOutputStream = clazz.getMethod("getBufferedOutputStream", String.class);
+      getBufferedReader = clazz.getMethod("getBufferedReader");
+      getBufferedWriter = clazz.getMethod("getBufferedWriter", boolean.class);
+      getBufferedInputStream = clazz.getMethod("getBufferedInputStream");
+      getBufferedOutputStream = clazz.getMethod("getBufferedOutputStream");
       listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class);
       listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class);
       renameTo = clazz.getMethod("renameTo", File.class);
+      putFile = clazz.getMethod("putFile", File.class);
+      copyTo = clazz.getMethod("copyTo", File.class);
+      deleteObjectsByPrefix = clazz.getMethod("deleteObjectsByPrefix");
     } catch (ClassNotFoundException | NoSuchMethodException e) {
       logger.error(
-          "Failed to get Hadoop file system. Please check your dependency of Hadoop module.", e);
+          "Failed to get object storage. Please check your dependency of object storage module.",
+          e);
     }
   }
 
   @Override
   public File getFileWithParent(String pathname) {
     try {
-      File res = (File) constructorWithPathname.newInstance(pathname);
-      if (!res.exists()) {
-        res.getParentFile().mkdirs();
-      }
-      return res;
+      return (File) constructorWithPathname.newInstance(pathname);
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
-          "Failed to get file: {}. Please check your dependency of Hadoop module.", pathname, e);
+          "Failed to get file: {}. Please check your dependency of object storage module.",
+          pathname,
+          e);
       return null;
     }
   }
@@ -125,7 +135,7 @@ public class HDFSFactory implements FSFactory {
       return (File) constructorWithUri.newInstance(uri);
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
-          "Failed to get file: {}. Please check your dependency of Hadoop module.", uri, e);
+          "Failed to get file: {}. Please check your dependency of object storage module.", uri, e);
       return null;
     }
   }
@@ -134,10 +144,10 @@ public class HDFSFactory implements FSFactory {
   public BufferedReader getBufferedReader(String filePath) {
     try {
       return (BufferedReader)
-          getBufferedReader.invoke(constructorWithPathname.newInstance(filePath), filePath);
+          getBufferedReader.invoke(constructorWithPathname.newInstance(filePath));
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
-          "Failed to get buffered reader for {}. Please check your dependency of Hadoop module.",
+          "Failed to get buffered reader for {}. Please check your dependency of object storage module.",
           filePath,
           e);
       return null;
@@ -148,10 +158,10 @@ public class HDFSFactory implements FSFactory {
   public BufferedWriter getBufferedWriter(String filePath, boolean append) {
     try {
       return (BufferedWriter)
-          getBufferedWriter.invoke(constructorWithPathname.newInstance(filePath), filePath, append);
+          getBufferedWriter.invoke(constructorWithPathname.newInstance(filePath), append);
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
-          "Failed to get buffered writer for {}. Please check your dependency of Hadoop module.",
+          "Failed to get buffered writer for {}. Please check your dependency of object storage module.",
           filePath,
           e);
       return null;
@@ -162,10 +172,10 @@ public class HDFSFactory implements FSFactory {
   public BufferedInputStream getBufferedInputStream(String filePath) {
     try {
       return (BufferedInputStream)
-          getBufferedInputStream.invoke(constructorWithPathname.newInstance(filePath), filePath);
+          getBufferedInputStream.invoke(constructorWithPathname.newInstance(filePath));
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
-          "Failed to get buffered input stream for {}. Please check your dependency of Hadoop module.",
+          "Failed to get buffered input stream for {}. Please check your dependency of object storage module.",
           filePath,
           e);
       return null;
@@ -176,10 +186,10 @@ public class HDFSFactory implements FSFactory {
   public BufferedOutputStream getBufferedOutputStream(String filePath) {
     try {
       return (BufferedOutputStream)
-          getBufferedOutputStream.invoke(constructorWithPathname.newInstance(filePath), filePath);
+          getBufferedOutputStream.invoke(constructorWithPathname.newInstance(filePath));
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
-          "Failed to get buffered output stream for {}. Please check your dependency of Hadoop module.",
+          "Failed to get buffered output stream for {}. Please check your dependency of object storage module.",
           filePath,
           e);
       return null;
@@ -187,14 +197,37 @@ public class HDFSFactory implements FSFactory {
   }
 
   @Override
-  public void moveFile(File srcFile, File destFile) {
+  public void moveFile(File srcFile, File destFile) throws IOException {
     try {
-      renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
-    } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+      renameTo.invoke(srcFile, destFile);
+    } catch (InvocationTargetException | IllegalAccessException e) {
       logger.error(
-          "Failed to rename file from {} to {}. Please check your dependency of Hadoop module.",
+          "Failed to rename file from {} to {}. Please check your dependency of object storage module.",
           srcFile.getName(),
           destFile.getName());
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Copies {@code srcFile} to {@code destFile} through the reflectively loaded object storage file
+   * class. The method used depends on the source type reported by {@link FSUtils#getFSType(File)}:
+   * a LOCAL source invokes {@code putFile} on {@code destFile}; an OBJECT_STORAGE source invokes
+   * {@code copyTo} on {@code srcFile}.
+   *
+   * @throws IOException if the source type is neither LOCAL nor OBJECT_STORAGE, or wrapping the
+   *     reflection failure when the target method cannot be invoked or itself throws
+   */
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {
+    FSType srcType = FSUtils.getFSType(srcFile);
+    try {
+      if (srcType == FSType.LOCAL) {
+        // Upload: the receiver is the destination, the local source is the argument.
+        // NOTE(review): Method.invoke throws IllegalArgumentException (not caught below) if
+        // destFile is not an instance of the object storage file class -- confirm callers
+        // guarantee this.
+        putFile.invoke(destFile, srcFile);
+      } else if (srcType == FSType.OBJECT_STORAGE) {
+        // Copy starting from object storage: the receiver is the source.
+        copyTo.invoke(srcFile, destFile);
+      } else {
+        throw new IOException(
+            String.format(
+                "Doesn't support copy file from %s to %s.", srcType, FSType.OBJECT_STORAGE));
+      }
+    } catch (InvocationTargetException | IllegalAccessException e) {
+      logger.error(
+          "Failed to copy file from {} to {}. Please check your dependency of object storage module.",
+          srcFile.getName(),
+          destFile.getName());
+      // The reflection failure is not passed to the logger; it travels as the IOException cause.
+      throw new IOException(e);
     }
   }
 
@@ -206,7 +239,7 @@ public class HDFSFactory implements FSFactory {
               constructorWithPathname.newInstance(fileFolder), fileFolder, suffix);
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
-          "Failed to list files in {} with SUFFIX {}. Please check your dependency of Hadoop module.",
+          "Failed to list files in {} with SUFFIX {}. Please check your dependency of object storage module.",
           fileFolder,
           suffix,
           e);
@@ -222,7 +255,7 @@ public class HDFSFactory implements FSFactory {
               constructorWithPathname.newInstance(fileFolder), fileFolder, prefix);
     } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
       logger.error(
-          "Failed to list files in {} with PREFIX {}. Please check your dependency of Hadoop module.",
+          "Failed to list files in {} with PREFIX {}. Please check your dependency of object storage module.",
           fileFolder,
           prefix,
           e);
@@ -234,4 +267,16 @@ public class HDFSFactory implements FSFactory {
   public boolean deleteIfExists(File file) {
     return file.delete();
   }
+
+  /**
+   * Deletes everything under {@code dir} by invoking {@code deleteObjectsByPrefix} on a new
+   * instance of the reflectively loaded object storage file class built from {@code dir}.
+   *
+   * @param dir path handed to the object storage file class's String constructor
+   * @throws IOException wrapping the reflection failure when the file object cannot be created or
+   *     {@code deleteObjectsByPrefix} cannot be invoked or itself throws
+   */
+  @Override
+  public void deleteDirectory(String dir) throws IOException {
+    try {
+      deleteObjectsByPrefix.invoke(constructorWithPathname.newInstance(dir));
+    } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+      logger.error(
+          "Failed to delete directory {}. Please check your dependency of object storage module.",
+          dir,
+          e);
+      // Surface the failure like moveFile/copyFile do; swallowing it here would make a failed
+      // delete indistinguishable from a successful one.
+      throw new IOException(e);
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
index cba0c326dde..afb1520d011 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/LocalTsFileInput.java
@@ -18,8 +18,6 @@
  */
 package org.apache.iotdb.tsfile.read.reader;
 
-import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -103,21 +101,6 @@ public class LocalTsFileInput implements TsFileInput {
     }
   }
 
-  @Override
-  public int read() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public int read(byte[] b, int off, int len) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public FileChannel wrapAsFileChannel() {
-    return channel;
-  }
-
   @Override
   public InputStream wrapAsInputStream() {
     return Channels.newInputStream(channel);
@@ -133,37 +116,6 @@ public class LocalTsFileInput implements TsFileInput {
     }
   }
 
-  @Override
-  public int readInt() {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public String readVarIntString(long offset) throws IOException {
-    try {
-      ByteBuffer byteBuffer = ByteBuffer.allocate(5);
-      channel.read(byteBuffer, offset);
-      byteBuffer.flip();
-      int strLength = ReadWriteForEncodingUtils.readVarInt(byteBuffer);
-      if (strLength < 0) {
-        return null;
-      } else if (strLength == 0) {
-        return "";
-      }
-      ByteBuffer strBuffer = ByteBuffer.allocate(strLength);
-      int varIntLength = ReadWriteForEncodingUtils.varIntSize(strLength);
-      byte[] bytes = new byte[strLength];
-      channel.read(strBuffer, offset + varIntLength);
-      strBuffer.flip();
-      strBuffer.get(bytes, 0, strLength);
-      return new String(bytes, 0, strLength);
-    } catch (ClosedByInterruptException e) {
-      logger.warn(
-          "Current thread is interrupted by another thread when it is blocked in an I/O operation upon a channel.");
-      return null;
-    }
-  }
-
   @Override
   public String getFilePath() {
     return filePath;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java
index 8572f20000e..b2e9fc0c47a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/read/reader/TsFileInput.java
@@ -18,13 +18,14 @@
  */
 package org.apache.iotdb.tsfile.read.reader;
 
+import org.apache.iotdb.tsfile.utils.ReadWriteForEncodingUtils;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.channels.AsynchronousCloseException;
 import java.nio.channels.ClosedByInterruptException;
 import java.nio.channels.ClosedChannelException;
-import java.nio.channels.FileChannel;
 import java.nio.channels.ReadableByteChannel;
 
 public interface TsFileInput {
@@ -97,20 +98,6 @@ public interface TsFileInput {
    */
   int read(ByteBuffer dst, long position) throws IOException;
 
-  /** read a byte from the Input. */
-  int read() throws IOException;
-
-  /**
-   * read an array of byte from the Input.
-   *
-   * @param b -array of byte
-   * @param off -offset of the Input
-   * @param len -length
-   */
-  int read(byte[] b, int off, int len) throws IOException;
-
-  FileChannel wrapAsFileChannel() throws IOException;
-
   InputStream wrapAsInputStream() throws IOException;
 
   /**
@@ -122,11 +109,25 @@ public interface TsFileInput {
    */
   void close() throws IOException;
 
-  /** read 4 bytes from the Input and convert it to a integer. */
-  int readInt() throws IOException;
-
   /** read a string from the Input at the given position */
-  String readVarIntString(long offset) throws IOException;
+  default String readVarIntString(long offset) throws IOException {
+    // The var-int length prefix is decoded from a 5-byte window starting at offset.
+    ByteBuffer lengthBuffer = ByteBuffer.allocate(5);
+    read(lengthBuffer, offset);
+    lengthBuffer.flip();
+    int length = ReadWriteForEncodingUtils.readVarInt(lengthBuffer);
+    if (length < 0) {
+      return null;
+    }
+    if (length == 0) {
+      return "";
+    }
+    // The string bytes are read from the position right after the encoded length prefix.
+    ByteBuffer payload = ByteBuffer.allocate(length);
+    int prefixSize = ReadWriteForEncodingUtils.varIntSize(length);
+    byte[] content = new byte[length];
+    read(payload, offset + prefixSize);
+    payload.flip();
+    payload.get(content, 0, length);
+    return new String(content, 0, length);
+  }
 
   String getFilePath();
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
new file mode 100644
index 00000000000..bd74dccc1d9
--- /dev/null
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.tsfile.utils;
+
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
+import org.apache.iotdb.tsfile.fileSystem.FSPath;
+import org.apache.iotdb.tsfile.fileSystem.FSType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * Static helpers for telling apart local, object storage and HDFS paths/files and for building
+ * object storage paths from local TsFile paths.
+ */
+public class FSUtils {
+  private static final Logger logger = LoggerFactory.getLogger(FSUtils.class);
+  private static final TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+  /** Non-local file system types; the per-file-system arrays below share this index order. */
+  private static final FSType[] fsTypes = {FSType.OBJECT_STORAGE, FSType.HDFS};
+  /** Number of trailing path segments kept by {@link #parseLocalTsFile2OSFile}. */
+  public static final int PATH_FROM_SEQUENCE_LEVEL = 5;
+  // NOTE(review): not used inside this class; presumably the segment count starting at the
+  // database directory -- confirm against callers.
+  public static final int PATH_FROM_DATABASE_LEVEL = 4;
+  /** Path prefixes of the non-local file systems, in the same order as {@code fsTypes}. */
+  public static final String[] fsPrefix = {"os://", "hdfs://"};
+  public static final String OS_FILE_SEPARATOR = "/";
+  private static final String[] fsFileClassName = {
+    config.getObjectStorageFile(), config.getHdfsFile()
+  };
+
+  private static final boolean[] isSupported = new boolean[fsTypes.length];
+  private static final Class<?>[] fsFileClass = new Class[fsTypes.length];
+
+  private FSUtils() {}
+
+  static {
+    reload();
+  }
+
+  /**
+   * Re-reads the file class names from the configuration, marks every file system listed in
+   * {@code config.getTSFileStorageFs()} as supported and loads the file class of each supported
+   * file system. A class that cannot be found is logged and its slot is left as it was.
+   *
+   * <p>NOTE(review): support flags are only ever set to {@code true} here, so a reload does not
+   * withdraw support for a file system that was removed from the configuration -- confirm this
+   * is intended.
+   */
+  public static synchronized void reload() {
+    fsFileClassName[0] = config.getObjectStorageFile();
+    fsFileClassName[1] = config.getHdfsFile();
+    for (FSType fsType : config.getTSFileStorageFs()) {
+      if (fsType == FSType.OBJECT_STORAGE) {
+        isSupported[0] = true;
+      } else if (fsType == FSType.HDFS) {
+        isSupported[1] = true;
+      }
+    }
+
+    for (int i = 0; i < fsTypes.length; ++i) {
+      if (!isSupported[i]) {
+        continue;
+      }
+      try {
+        fsFileClass[i] = Class.forName(fsFileClassName[i]);
+      } catch (ClassNotFoundException e) {
+        // Trailing throwable is logged with its stack trace; placeholders cover the two names.
+        logger.error(
+            "Failed to get {} file system. Please check your dependency of {} module.",
+            fsTypes[i].name(),
+            fsTypes[i].name(),
+            e);
+      }
+    }
+  }
+
+  /**
+   * Returns the first non-local type whose loaded file class {@code file} is an instance of, or
+   * {@link FSType#LOCAL} when no loaded class matches.
+   */
+  public static FSType getFSType(File file) {
+    for (int i = 0; i < fsTypes.length; ++i) {
+      if (fsFileClass[i] != null && fsFileClass[i].isInstance(file)) {
+        return fsTypes[i];
+      }
+    }
+    return FSType.LOCAL;
+  }
+
+  /**
+   * Returns the type whose prefix ({@code os://} or {@code hdfs://}) {@code fsPath} starts with,
+   * or {@link FSType#LOCAL} when it starts with neither.
+   */
+  public static FSType getFSType(String fsPath) {
+    int index = indexOfPrefix(fsPath);
+    return index < 0 ? FSType.LOCAL : fsTypes[index];
+  }
+
+  /** Returns {@code fsPath} with a leading {@code os://} or {@code hdfs://} prefix removed. */
+  public static String getPath(String fsPath) {
+    int index = indexOfPrefix(fsPath);
+    return index < 0 ? fsPath : fsPath.substring(fsPrefix[index].length());
+  }
+
+  /**
+   * Wraps {@code fsPath} in an {@link FSPath} together with its detected type. The path handed to
+   * the {@link FSPath} is the unmodified input, including any file system prefix.
+   */
+  public static FSPath parse(String fsPath) {
+    return new FSPath(getFSType(fsPath), fsPath);
+  }
+
+  /**
+   * Builds {@code os://<bucket>/<dataNodeId>}, wraps it in an object storage {@link FSPath} and
+   * returns that object's {@code getPath()} value.
+   */
+  public static String getOSDefaultPath(String bucket, int dataNodeId) {
+    return new FSPath(FSType.OBJECT_STORAGE, fsPrefix[0] + bucket + OS_FILE_SEPARATOR + dataNodeId)
+        .getPath();
+  }
+
+  /**
+   * Maps a local TsFile to its object storage location: {@code os://<bucket>/<dataNodeId>/}
+   * followed by the last {@link #PATH_FROM_SEQUENCE_LEVEL} segments of the local canonical path.
+   *
+   * @throws IOException if the canonical path cannot be resolved or has too few segments
+   */
+  public static FSPath parseLocalTsFile2OSFile(File localFile, String bucket, int dataNodeId)
+      throws IOException {
+    return new FSPath(
+        FSType.OBJECT_STORAGE,
+        fsPrefix[0]
+            + bucket
+            + OS_FILE_SEPARATOR
+            + dataNodeId
+            + OS_FILE_SEPARATOR
+            + getLocalTsFileShortPath(localFile, PATH_FROM_SEQUENCE_LEVEL));
+  }
+
+  /**
+   * Joins the last {@code level} segments of the canonical path of {@code localTsFile} with
+   * {@link #OS_FILE_SEPARATOR}.
+   *
+   * @param localTsFile file whose canonical path is split with {@code
+   *     FilePathUtils.splitTsFilePath}
+   * @param level number of trailing segments to keep
+   * @throws IOException if the canonical path cannot be resolved, or if it has fewer than {@code
+   *     level} segments
+   */
+  public static String getLocalTsFileShortPath(File localTsFile, int level) throws IOException {
+    String canonicalPath = localTsFile.getCanonicalPath();
+    String[] filePathSplits = FilePathUtils.splitTsFilePath(canonicalPath);
+    // Without this check a short path reaches Arrays.copyOfRange with a negative start index.
+    if (filePathSplits.length < level) {
+      throw new IOException(
+          String.format(
+              "Path %s has only %d segments, fewer than the %d requested.",
+              canonicalPath, filePathSplits.length, level));
+    }
+    return String.join(
+        OS_FILE_SEPARATOR,
+        Arrays.copyOfRange(filePathSplits, filePathSplits.length - level, filePathSplits.length));
+  }
+
+  /** Returns whether {@link #getFSType(String)} classifies {@code fsPath} as local. */
+  public static boolean isLocal(String fsPath) {
+    return getFSType(fsPath) == FSType.LOCAL;
+  }
+
+  /** Returns whether {@link #getFSType(File)} classifies {@code file} as local. */
+  public static boolean isLocal(File file) {
+    return getFSType(file) == FSType.LOCAL;
+  }
+
+  /** Returns the index in {@link #fsPrefix} of the prefix {@code fsPath} starts with, or -1. */
+  private static int indexOfPrefix(String fsPath) {
+    for (int i = 0; i < fsTypes.length; ++i) {
+      if (fsPath.startsWith(fsPrefix[i])) {
+        return i;
+      }
+    }
+    return -1;
+  }
+}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FilePathUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FilePathUtils.java
index 3d481be7c97..df7ac44ef8e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FilePathUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FilePathUtils.java
@@ -19,9 +19,7 @@
 
 package org.apache.iotdb.tsfile.utils;
 
-import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
 import org.apache.iotdb.tsfile.common.constant.TsFileConstant;
-import org.apache.iotdb.tsfile.fileSystem.FSType;
 
 import java.io.File;
 
@@ -29,11 +27,7 @@ import static org.apache.iotdb.tsfile.common.constant.TsFileConstant.TSFILE_SUFF
 
 public class FilePathUtils {
 
-  private static final String PATH_SPLIT_STRING =
-      TSFileDescriptor.getInstance().getConfig().getTSFileStorageFs() == FSType.LOCAL
-              && "\\".equals(File.separator)
-          ? "\\\\"
-          : "/";
+  private static final String LOCAL_PATH_SPLIT_STRING = "\\".equals(File.separator) ? "\\\\" : "/";
   public static final String FILE_NAME_SEPARATOR = "-";
 
   private FilePathUtils() {
@@ -64,7 +58,20 @@ public class FilePathUtils {
    * @param tsFileAbsolutePath the tsFile Absolute Path
    */
   public static String[] splitTsFilePath(String tsFileAbsolutePath) {
-    return tsFileAbsolutePath.split(PATH_SPLIT_STRING);
+    String separator = LOCAL_PATH_SPLIT_STRING;
+    if (!FSUtils.isLocal(tsFileAbsolutePath)) {
+      separator = "/";
+    }
+    return tsFileAbsolutePath.split(separator);
+  }
+
+  /**
+   * Tells whether the fifth segment from the end of {@code tsFileAbsolutePath}, as split by
+   * {@link #splitTsFilePath(String)}, equals {@code "sequence"}.
+   *
+   * @return {@code false} when the path has fewer than five segments or the segment differs
+   */
+  public static boolean isSequence(String tsFileAbsolutePath) {
+    String[] segments = splitTsFilePath(tsFileAbsolutePath);
+    int sequenceDirIndex = segments.length - 5;
+    // Too short to be a regular IoTDB TsFile path: process it as an unsequence file.
+    return sequenceDirIndex >= 0 && "sequence".equals(segments[sequenceDirIndex]);
+  }
 
   public static String getLogicalStorageGroupName(String tsFileAbsolutePath) {