You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/22 11:55:53 UTC

[iotdb] branch tiered_storage updated (80bf822d0d7 -> 7549d7c2914)

This is an automated email from the ASF dual-hosted git repository.

heiming pushed a change to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git


    from 80bf822d0d7 add more UT
     new fbe596118ae add migration
     new 7549d7c2914 fix test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../resources/conf/iotdb-common.properties         |  18 ++
 .../iotdb/commons/concurrent/ThreadName.java       |   2 +
 .../apache/iotdb/commons/service/ServiceType.java  |   4 +-
 .../org/apache/iotdb/os/fileSystem/OSFile.java     |  13 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  55 ++++--
 .../iotdb/db/conf/directories/FolderManager.java   |   4 +-
 .../iotdb/db/conf/directories/TierManager.java     | 127 ++++++++++++-
 .../impl/SizeTieredCompactionSelector.java         |   2 +-
 .../utils/CrossSpaceCompactionCandidate.java       |   5 +-
 .../db/engine/migration/LocalMigrationTask.java    |  70 ++++++++
 .../iotdb/db/engine/migration/MigrationCause.java  |   8 +-
 .../iotdb/db/engine/migration/MigrationTask.java   |  93 ++++++++++
 .../db/engine/migration/MigrationTaskManager.java  | 196 +++++++++++++++++++++
 .../db/engine/migration/RemoteMigrationTask.java   |  44 +++--
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   4 +-
 .../engine/storagegroup/TsFileNameGenerator.java   |   5 +-
 .../db/engine/storagegroup/TsFileResource.java     |  30 +++-
 .../java/org/apache/iotdb/db/service/DataNode.java |   2 +
 .../iotdb/db/engine/storagegroup/TTLTest.java      |   4 +-
 .../tsfile/fileSystem/fsFactory/FSFactory.java     |  10 +-
 .../tsfile/fileSystem/fsFactory/HDFSFactory.java   |   3 +
 .../fileSystem/fsFactory/HybridFSFactory.java      |   5 +
 .../fileSystem/fsFactory/LocalFSFactory.java       |   5 +
 .../tsfile/fileSystem/fsFactory/OSFSFactory.java   |   3 +
 .../org/apache/iotdb/tsfile/utils/FSUtils.java     |   6 +-
 25 files changed, 653 insertions(+), 65 deletions(-)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
 copy consensus/src/main/java/org/apache/iotdb/consensus/iot/service/IoTConsensusRPCServiceMBean.java => server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationCause.java (89%)
 create mode 100644 server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
 create mode 100644 server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
 copy tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/LocalFSOutputFactory.java => server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java (50%)


[iotdb] 01/02: add migration

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fbe596118ae8df1e2e77b79d913eedbc9e0b6273
Author: HeimingZ <zh...@qq.com>
AuthorDate: Mon May 22 19:53:50 2023 +0800

    add migration
---
 .../resources/conf/iotdb-common.properties         |  18 ++
 .../iotdb/commons/concurrent/ThreadName.java       |   2 +
 .../apache/iotdb/commons/service/ServiceType.java  |   4 +-
 .../org/apache/iotdb/os/fileSystem/OSFile.java     |  13 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  55 ++++--
 .../iotdb/db/conf/directories/FolderManager.java   |   4 +-
 .../iotdb/db/conf/directories/TierManager.java     | 127 ++++++++++++-
 .../impl/SizeTieredCompactionSelector.java         |   2 +-
 .../utils/CrossSpaceCompactionCandidate.java       |   5 +-
 .../db/engine/migration/LocalMigrationTask.java    |  70 ++++++++
 .../iotdb/db/engine/migration/MigrationCause.java  |  24 +++
 .../iotdb/db/engine/migration/MigrationTask.java   |  93 ++++++++++
 .../db/engine/migration/MigrationTaskManager.java  | 196 +++++++++++++++++++++
 .../db/engine/migration/RemoteMigrationTask.java   |  60 +++++++
 .../iotdb/db/engine/storagegroup/DataRegion.java   |   4 +-
 .../engine/storagegroup/TsFileNameGenerator.java   |   5 +-
 .../db/engine/storagegroup/TsFileResource.java     |  30 +++-
 .../java/org/apache/iotdb/db/service/DataNode.java |   2 +
 .../tsfile/fileSystem/fsFactory/FSFactory.java     |  10 +-
 .../tsfile/fileSystem/fsFactory/HDFSFactory.java   |   3 +
 .../fileSystem/fsFactory/HybridFSFactory.java      |   5 +
 .../fileSystem/fsFactory/LocalFSFactory.java       |   5 +
 .../tsfile/fileSystem/fsFactory/OSFSFactory.java   |   3 +
 .../org/apache/iotdb/tsfile/utils/FSUtils.java     |   6 +-
 24 files changed, 703 insertions(+), 43 deletions(-)

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index d0e00cb5d38..a3c906e4cf0 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -174,6 +174,14 @@ cluster_name=defaultCluster
 # Datatype: long
 # heartbeat_interval_in_ms=1000
 
+####################
+### Disk management
+####################
+
+# Size of the thread pool that migrates files between the DataNode's data directories.
+# Datatype: int
+# migrate_thread_count=3
+
 # Disk remaining threshold at which DataNode is set to ReadOnly status
 # Datatype: double(percentage)
 # disk_space_warning_threshold=0.05
@@ -1125,3 +1133,13 @@ cluster_name=defaultCluster
 
 # Datatype: int
 # influxdb_rpc_port=8086
+
+####################
+### Object storage management
+####################
+
+# object_storage_name=aws_s3
+# object_storage_bucket=iotdb
+# object_storage_endpoiont=yourEndpoint
+# object_storage_access_key=yourAccessKey
+# object_storage_access_secret=yourAccessSecret
\ No newline at end of file
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 5d6fa3b53b4..6a9360b93c6 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -71,6 +71,8 @@ public enum ThreadName {
   PIPE_PROCESSOR_EXECUTOR_POOL("Pipe-Processor-Executor-Pool"),
   PIPE_CONNECTOR_EXECUTOR_POOL("Pipe-Connector-Executor-Pool"),
   PIPE_SUBTASK_CALLBACK_EXECUTOR_POOL("Pipe-SubTask-Callback-Executor-Pool"),
+  MIGRATION_SCHEDULER("Migration-Scheduler"),
+  MIGRATION("Migration-Executor-Pool"),
   ;
 
   private final String name;
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
index 94d524defcf..b7a6e3aceaa 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
@@ -75,7 +75,9 @@ public enum ServiceType {
   IOT_CONSENSUS_SERVICE("IoTConsensus Service", "IoTConsensusRPCService"),
   PIPE_PLUGIN_CLASSLOADER_MANAGER_SERVICE(
       "Pipe Plugin Classloader Manager Service", "PipePluginClassLoader"),
-  MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService");
+  MLNode_RPC_SERVICE("Rpc Service for MLNode", "MLNodeRPCService"),
+  MIGRATION_SERVICE("Migration Manager", "Migration Manager");
+
   private final String name;
   private final String jmxName;
 
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
index 31de138b9a5..d04eaa397ea 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
@@ -260,12 +260,12 @@ public class OSFile extends File {
 
   @Override
   public boolean mkdir() {
-    throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+    return true;
   }
 
   @Override
   public boolean mkdirs() {
-    throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+    return true;
   }
 
   @Override
@@ -326,17 +326,20 @@ public class OSFile extends File {
 
   @Override
   public long getTotalSpace() {
-    throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+    // object storage is treated as having unlimited space
+    return Long.MAX_VALUE;
   }
 
   @Override
   public long getFreeSpace() {
-    throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+    // object storage is treated as having unlimited space
+    return Long.MAX_VALUE;
   }
 
   @Override
   public long getUsableSpace() {
-    throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+    // object storage is treated as having unlimited space
+    return Long.MAX_VALUE;
   }
 
   @Override
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index fed4db2feb9..1b042e42b25 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -25,6 +25,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.db.audit.AuditLogOperation;
 import org.apache.iotdb.db.audit.AuditLogStorage;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.engine.compaction.execute.performer.constant.CrossCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.execute.performer.constant.InnerSeqCompactionPerformer;
 import org.apache.iotdb.db.engine.compaction.execute.performer.constant.InnerUnseqCompactionPerformer;
@@ -1105,6 +1106,12 @@ public class IoTDBConfig {
    */
   private String RateLimiterType = "FixedIntervalRateLimiter";
 
+  private String objectStorageName = "aws_s3";
+  private String objectStorageBucket = "iotdb";
+  private String objectStorageEndpoiont = "yourEndpoint";
+  private String objectStorageAccessKey = "yourAccessKey";
+  private String objectStorageAccessSecret = "yourAccessSecret";
+
   IoTDBConfig() {}
 
   public float getUdfMemoryBudgetInMB() {
@@ -1211,17 +1218,18 @@ public class IoTDBConfig {
   private void formulateDataDirs(String[][] tierDataDirs) {
     for (int i = 0; i < tierDataDirs.length; i++) {
       for (int j = 0; j < tierDataDirs[i].length; j++) {
-        switch (FSUtils.getFSType(tierDataDirs[i][j])) {
-          case HDFS:
-            tierDataDirs[i][j] = getHdfsDir() + File.separatorChar + tierDataDirs[i][j];
-            break;
-          case OBJECT_STORAGE:
-            // TODO(zhm) 对象存储路径配置
-            break;
-          case LOCAL:
-          default:
-            tierDataDirs[i][j] = addDataHomeDir(tierDataDirs[i][j]);
-            break;
+        if (tierDataDirs[i][j].equals("object_storage")) {
+          tierDataDirs[i][j] = FSUtils.getOSDefaultPath(objectStorageBucket, dataNodeId);
+        } else {
+          switch (FSUtils.getFSType(tierDataDirs[i][j])) {
+            case HDFS:
+              tierDataDirs[i][j] = getHdfsDir() + File.separatorChar + tierDataDirs[i][j];
+              break;
+            case LOCAL:
+            default:
+              tierDataDirs[i][j] = addDataHomeDir(tierDataDirs[i][j]);
+              break;
+          }
         }
       }
     }
@@ -1230,7 +1238,7 @@ public class IoTDBConfig {
   void reloadDataDirs(String[][] tierDataDirs) throws LoadConfigurationException {
     // format data directories
     formulateDataDirs(tierDataDirs);
-    // make sure old data directories not removed, TODO(zhm) 层级关系是否可以变化,当前实现仅支持在最后添加层级
+    // make sure old data directories not removed
     for (int i = 0; i < this.tierDataDirs.length; ++i) {
       HashSet<String> newDirs = new HashSet<>(Arrays.asList(tierDataDirs[i]));
       for (String oldDir : this.tierDataDirs[i]) {
@@ -1243,7 +1251,7 @@ public class IoTDBConfig {
       }
     }
     this.tierDataDirs = tierDataDirs;
-    //    TierManager.getInstance().updateFileFolders();
+    TierManager.getInstance().resetFolders();
   }
 
   // if IOTDB_DATA_HOME is not set, then we keep dataHomeDir prefix being the same with IOTDB_HOME
@@ -1313,6 +1321,7 @@ public class IoTDBConfig {
   }
 
   public void setTierDataDirs(String[][] tierDataDirs) {
+    formulateDataDirs(tierDataDirs);
     this.tierDataDirs = tierDataDirs;
     // TODO(szywilliam): rewrite the logic here when ratis supports complete snapshot semantic
     setRatisDataRegionSnapshotDir(
@@ -3810,4 +3819,24 @@ public class IoTDBConfig {
   public String getSortTmpDir() {
     return sortTmpDir;
   }
+
+  public String getObjectStorageName() {
+    return objectStorageName;
+  }
+
+  public String getObjectStorageBucket() {
+    return objectStorageBucket;
+  }
+
+  public String getObjectStorageEndpoiont() {
+    return objectStorageEndpoiont;
+  }
+
+  public String getObjectStorageAccessKey() {
+    return objectStorageAccessKey;
+  }
+
+  public String getObjectStorageAccessSecret() {
+    return objectStorageAccessSecret;
+  }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java
index 571a07a9ad4..9fb73749bc0 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/FolderManager.java
@@ -61,7 +61,7 @@ public class FolderManager {
     try {
       this.selectStrategy.setFolders(folders);
     } catch (DiskSpaceInsufficientException e) {
-      logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+      logger.error("All folders are full, change system mode to read-only.", e);
       CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
       throw e;
     }
@@ -71,7 +71,7 @@ public class FolderManager {
     try {
       return folders.get(selectStrategy.nextFolderIndex());
     } catch (DiskSpaceInsufficientException e) {
-      logger.error("All disks of wal folders are full, change system mode to read-only.", e);
+      logger.error("All folders are full, change system mode to read-only.", e);
       CommonDescriptor.getInstance().getConfig().setNodeStatus(NodeStatus.ReadOnly);
       throw e;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
index 2daeb089ff4..027c3e35971 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
@@ -33,11 +33,18 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
+import java.nio.file.FileStore;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.stream.Collectors;
 
 /** The main class of multiple directories. Used to allocate folders to data files. */
@@ -58,6 +65,8 @@ public class TierManager {
   private final Map<String, Integer> seqDir2TierLevel = new HashMap<>();
   /** unSeq file folder's rawFsPath path -> tier level */
   private final Map<String, Integer> unSeqDir2TierLevel = new HashMap<>();
+  /** total space of each tier; Long.MAX_VALUE for any tier that contains a non-local dir */
+  private long[] tierDiskTotalSpace;
 
   private TierManager() {
     try {
@@ -78,10 +87,26 @@ public class TierManager {
 
   public void resetFolders() {
     String[][] tierDirs = config.getTierDataDirs();
+    for (int i = 0; i < tierDirs.length; ++i) {
+      for (int j = 0; j < tierDirs[i].length; ++j) {
+        if (FSUtils.isLocal(tierDirs[i][j])) {
+          try {
+            tierDirs[i][j] = new File(tierDirs[i][j]).getCanonicalPath();
+          } catch (IOException e) {
+            logger.error("Fail to get canonical path of data dir {}", tierDirs[i][j], e);
+          }
+        }
+      }
+    }
+
     for (int tierLevel = 0; tierLevel < tierDirs.length; ++tierLevel) {
       List<String> seqDirs =
           Arrays.stream(tierDirs[tierLevel])
-              .map(v -> v + File.separator + IoTDBConstant.SEQUENCE_FLODER_NAME)
+              .map(
+                  v ->
+                      FSFactoryProducer.getFSFactory()
+                          .getFile(v, IoTDBConstant.SEQUENCE_FLODER_NAME)
+                          .getPath())
               .collect(Collectors.toList());
       mkDataDirs(seqDirs);
       try {
@@ -95,7 +120,11 @@ public class TierManager {
 
       List<String> unSeqDirs =
           Arrays.stream(tierDirs[tierLevel])
-              .map(v -> v + File.separator + IoTDBConstant.UNSEQUENCE_FLODER_NAME)
+              .map(
+                  v ->
+                      FSFactoryProducer.getFSFactory()
+                          .getFile(v, IoTDBConstant.UNSEQUENCE_FLODER_NAME)
+                          .getPath())
               .collect(Collectors.toList());
       mkDataDirs(unSeqDirs);
       try {
@@ -107,6 +136,8 @@ public class TierManager {
         unSeqDir2TierLevel.put(dir, tierLevel);
       }
     }
+
+    tierDiskTotalSpace = getTierDiskSpace(DiskSpaceType.TOTAL);
   }
 
   private void mkDataDirs(List<String> folders) {
@@ -121,13 +152,11 @@ public class TierManager {
     }
   }
 
-  public String getNextFolderForSequenceFile(int tierLevel) throws DiskSpaceInsufficientException {
-    return seqTiers.get(tierLevel).getNextFolder();
-  }
-
-  public String getNextFolderForUnSequenceFile(int tierLevel)
+  public String getNextFolderForTsFile(int tierLevel, boolean sequence)
       throws DiskSpaceInsufficientException {
-    return unSeqTiers.get(tierLevel).getNextFolder();
+    return sequence
+        ? seqTiers.get(tierLevel).getNextFolder()
+        : unSeqTiers.get(tierLevel).getNextFolder();
   }
 
   public List<String> getAllFilesFolders() {
@@ -162,6 +191,88 @@ public class TierManager {
     return seqTiers.size();
   }
 
+  public int getFileTierLevel(File file) {
+    String filePath;
+    try {
+      filePath = file.getCanonicalPath();
+    } catch (IOException e) {
+      logger.error("Fail to get canonical path of data dir {}", file, e);
+      filePath = file.getPath();
+    }
+
+    for (Map.Entry<String, Integer> entry : seqDir2TierLevel.entrySet()) {
+      if (filePath.startsWith(entry.getKey())) {
+        return entry.getValue();
+      }
+    }
+    for (Map.Entry<String, Integer> entry : unSeqDir2TierLevel.entrySet()) {
+      if (filePath.startsWith(entry.getKey())) {
+        return entry.getValue();
+      }
+    }
+    throw new RuntimeException(String.format("%s is not a legal TsFile path", file));
+  }
+
+  public long[] getTierDiskTotalSpace() {
+    return Arrays.copyOf(tierDiskTotalSpace, tierDiskTotalSpace.length);
+  }
+
+  public long[] getTierDiskUsableSpace() {
+    return getTierDiskSpace(DiskSpaceType.USABLE);
+  }
+
+  private long[] getTierDiskSpace(DiskSpaceType type) {
+    String[][] tierDirs = config.getTierDataDirs();
+    long[] tierDiskSpace = new long[tierDirs.length];
+    for (int tierLevel = 0; tierLevel < tierDirs.length; ++tierLevel) {
+      Set<FileStore> tierFileStores = new HashSet<>();
+      for (String dir : tierDirs[tierLevel]) {
+        if (!FSUtils.isLocal(dir)) {
+          tierDiskSpace[tierLevel] = Long.MAX_VALUE;
+          break;
+        }
+        // get the FileStore of each local dir
+        Path path = Paths.get(dir);
+        FileStore fileStore = null;
+        try {
+          fileStore = Files.getFileStore(path);
+        } catch (IOException e) {
+          // fall back to the parent dir if the file store cannot be resolved (e.g. path missing)
+          path = path.getParent();
+          try {
+            fileStore = Files.getFileStore(path);
+          } catch (IOException innerException) {
+            logger.error("Failed to get storage path of {}, because", dir, innerException);
+          }
+        }
+        // update space info
+        if (fileStore != null && !tierFileStores.contains(fileStore)) {
+          tierFileStores.add(fileStore);
+          try {
+            switch (type) {
+              case TOTAL:
+                tierDiskSpace[tierLevel] += fileStore.getTotalSpace();
+                break;
+              case USABLE:
+                tierDiskSpace[tierLevel] += fileStore.getUsableSpace();
+                break;
+              default:
+                break;
+            }
+          } catch (IOException e) {
+            logger.error("Failed to statistic the size of {}, because", fileStore, e);
+          }
+        }
+      }
+    }
+    return tierDiskSpace;
+  }
+
+  private enum DiskSpaceType {
+    TOTAL,
+    USABLE,
+  }
+
   public static TierManager getInstance() {
     return TierManagerHolder.INSTANCE;
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
index df88248bf2e..bb0a2c8ee44 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/impl/SizeTieredCompactionSelector.java
@@ -114,7 +114,7 @@ public class SizeTieredCompactionSelector
         selectedFileSize = 0L;
         continue;
       }
-      if (currentFile.getStatus() != TsFileResourceStatus.CLOSED) {
+      if (currentFile.getStatus() != TsFileResourceStatus.CLOSED || currentFile.isMigrating()) {
         selectedFileList.clear();
         selectedFileSize = 0L;
         continue;
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
index bedf2e2fbcc..329c778f120 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/compaction/selector/utils/CrossSpaceCompactionCandidate.java
@@ -142,7 +142,9 @@ public class CrossSpaceCompactionCandidate {
   private List<TsFileResourceCandidate> filterUnseqResource(List<TsFileResource> unseqResources) {
     List<TsFileResourceCandidate> ret = new ArrayList<>();
     for (TsFileResource resource : unseqResources) {
-      if (resource.getStatus() != TsFileResourceStatus.CLOSED || !resource.getTsFile().exists()) {
+      if (resource.getStatus() != TsFileResourceStatus.CLOSED
+          || resource.isMigrating()
+          || !resource.getTsFile().exists()) {
         break;
       } else if (resource.stillLives(ttlLowerBound)) {
         ret.add(new TsFileResourceCandidate(resource));
@@ -199,6 +201,7 @@ public class CrossSpaceCompactionCandidate {
       // the status of file may be changed after the task is submitted to queue
       this.isValidCandidate =
           tsFileResource.getStatus() == TsFileResourceStatus.CLOSED
+              && !tsFileResource.isMigrating()
               && tsFileResource.getTsFile().exists();
     }
 
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
new file mode 100644
index 00000000000..347452e26ed
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class LocalMigrationTask extends MigrationTask {
+  private static final Logger logger = LoggerFactory.getLogger(LocalMigrationTask.class);
+
+  LocalMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) {
+    super(cause, tsFile, targetDir);
+  }
+
+  @Override
+  public void migrate() {
+    // copy TsFile and resource file
+    tsFile.readLock();
+    try {
+      fsFactory.copyFile(srcTsFile, destTsFile);
+      fsFactory.copyFile(srcResourceFile, destResourceFile);
+    } catch (IOException e) {
+      logger.error("Fail to copy TsFile {}", srcTsFile);
+      destTsFile.delete();
+      destResourceFile.delete();
+      return;
+    } finally {
+      tsFile.readUnlock();
+    }
+    // close mods file and replace TsFile path
+    tsFile.writeLock();
+    try {
+      tsFile.resetModFile();
+      fsFactory.copyFile(srcModsFile, destModsFile);
+      tsFile.setFile(destTsFile);
+    } catch (IOException e) {
+      logger.error("Fail to copy mods file {}", srcModsFile);
+      destTsFile.delete();
+      destResourceFile.delete();
+      destModsFile.delete();
+      return;
+    } finally {
+      tsFile.writeUnlock();
+    }
+    // clear src files
+    srcTsFile.delete();
+    srcResourceFile.delete();
+    srcModsFile.delete();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationCause.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationCause.java
new file mode 100644
index 00000000000..dc1d270ba61
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationCause.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+public enum MigrationCause {
+  TTL,
+  DISK_SPACE
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
new file mode 100644
index 00000000000..0571d9d47f7
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.db.engine.modification.ModificationFile;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.tsfile.fileSystem.FSFactoryProducer;
+import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
+import org.apache.iotdb.tsfile.utils.FSUtils;
+
+import java.io.File;
+
+public abstract class MigrationTask implements Runnable {
+  protected static final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+
+  protected final MigrationCause cause;
+  protected final TsFileResource tsFile;
+  protected final String targetDir;
+
+  protected final File srcTsFile;
+  protected final File destTsFile;
+  protected final File srcResourceFile;
+  protected final File destResourceFile;
+  protected final File srcModsFile;
+  protected final File destModsFile;
+
+  MigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) {
+    this.cause = cause;
+    this.tsFile = tsFile;
+    this.targetDir = targetDir;
+    this.srcTsFile = tsFile.getTsFile();
+    this.destTsFile = fsFactory.getFile(targetDir, tsFile.getTsFile().getName());
+    this.srcResourceFile =
+        fsFactory.getFile(
+            srcTsFile.getParentFile(), srcTsFile.getName() + TsFileResource.RESOURCE_SUFFIX);
+    this.destResourceFile =
+        fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + TsFileResource.RESOURCE_SUFFIX);
+    this.srcModsFile =
+        fsFactory.getFile(
+            srcTsFile.getParentFile(), srcTsFile.getName() + ModificationFile.FILE_SUFFIX);
+    this.destModsFile =
+        fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + ModificationFile.FILE_SUFFIX);
+  }
+
+  public static MigrationTask newTask(
+      MigrationCause cause, TsFileResource sourceTsFile, String targetDir) {
+    if (FSUtils.isLocal(targetDir)) {
+      return new LocalMigrationTask(cause, sourceTsFile, targetDir);
+    } else {
+      return new RemoteMigrationTask(cause, sourceTsFile, targetDir);
+    }
+  }
+
+  @Override
+  public void run() {
+    if (canMigrate()) {
+      tsFile.setIsMigrating(true);
+      if (!canMigrate()) {
+        tsFile.setIsMigrating(false);
+        return;
+      }
+    } else {
+      return;
+    }
+
+    migrate();
+
+    tsFile.setIsMigrating(false);
+  }
+
+  protected boolean canMigrate() {
+    return tsFile.getStatus() == TsFileResourceStatus.CLOSED;
+  }
+
+  public abstract void migrate();
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
new file mode 100644
index 00000000000..37957aa8b0a
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
+import org.apache.iotdb.commons.conf.CommonConfig;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.service.IService;
+import org.apache.iotdb.commons.service.ServiceType;
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.TierManager;
+import org.apache.iotdb.db.engine.StorageEngine;
+import org.apache.iotdb.db.engine.storagegroup.DataRegion;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+import org.apache.iotdb.db.engine.storagegroup.TsFileResourceStatus;
+import org.apache.iotdb.db.utils.DateTimeUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class MigrationTaskManager implements IService {
+  private static final Logger logger = LoggerFactory.getLogger(MigrationTaskManager.class);
+  private static final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
+  private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
+  private static final TierManager tierManager = TierManager.getInstance();
+  private static final long CHECK_INTERVAL_IN_SECONDS = 10 * 60;
+  private static final double TIER_DISK_SPACE_WARN_THRESHOLD =
+      commonConfig.getDiskSpaceWarningThreshold() + 0.1;
+  private static final double TIER_DISK_SPACE_SAFE_THRESHOLD =
+      commonConfig.getDiskSpaceWarningThreshold() + 0.2;
+  /** single thread to schedule */
+  private ScheduledExecutorService scheduler;
+  /** single thread to sync syncingBuffer to disk */
+  private ExecutorService workers;
+
+  @Override
+  public void start() throws StartupException {
+    if (iotdbConfig.getTierDataDirs().length == 1) {
+      return;
+    }
+    scheduler =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.MIGRATION_SCHEDULER.getName());
+    workers =
+        IoTDBThreadPoolFactory.newFixedThreadPool(
+            iotdbConfig.getCompactionThreadCount(), ThreadName.MIGRATION.getName());
+    ScheduledExecutorUtil.safelyScheduleAtFixedRate(
+        scheduler,
+        () -> new MigrationScheduleTask().run(),
+        CHECK_INTERVAL_IN_SECONDS,
+        CHECK_INTERVAL_IN_SECONDS,
+        TimeUnit.SECONDS);
+  }
+
+  private class MigrationScheduleTask implements Runnable {
+    private final long[] tierDiskTotalSpace = tierManager.getTierDiskTotalSpace();
+    private final long[] tierDiskUsableSpace = tierManager.getTierDiskUsableSpace();
+    private final Set<Integer> needMigrationTiers = new HashSet<>();
+
+    public MigrationScheduleTask() {
+      for (int i = 0; i < tierManager.getTiersNum(); i++) {
+        double usage = tierDiskUsableSpace[i] * 1.0 / tierDiskTotalSpace[i];
+        if (usage <= TIER_DISK_SPACE_WARN_THRESHOLD) {
+          needMigrationTiers.add(i);
+        }
+      }
+    }
+
+    @Override
+    public void run() {
+      schedule();
+    }
+
+    private void schedule() {
+      // submit migration tasks
+      for (DataRegion dataRegion : StorageEngine.getInstance().getAllDataRegions()) {
+        List<TsFileResource> tsfiles = dataRegion.getSequenceFileList();
+        tsfiles.addAll(dataRegion.getUnSequenceFileList());
+        tsfiles.sort(this::compareMigrationPriority);
+        for (TsFileResource tsfile : tsfiles) {
+          try {
+            int tierLevel = tsfile.getTierLevel();
+            // only migrate closed TsFiles not in the last tier
+            if (tsfile.getStatus() != TsFileResourceStatus.CLOSED
+                || tierLevel == iotdbConfig.getTierDataDirs().length - 1) {
+              continue;
+            }
+            // check tier ttl and disk space
+            long tierTTL =
+                DateTimeUtils.convertMilliTimeWithPrecision(
+                    commonConfig.getTierTTLInMs()[tierLevel], iotdbConfig.getTimestampPrecision());
+            if (tsfile.stillLives(tierTTL)) {
+              submitMigrationTask(
+                  tierLevel,
+                  MigrationCause.TTL,
+                  tsfile,
+                  tierManager.getNextFolderForTsFile(tierLevel, tsfile.isSeq()));
+            } else if (needMigrationTiers.contains(tierLevel)) {
+              submitMigrationTask(
+                  tierLevel,
+                  MigrationCause.DISK_SPACE,
+                  tsfile,
+                  tierManager.getNextFolderForTsFile(tierLevel, tsfile.isSeq()));
+            }
+          } catch (Exception e) {
+            logger.error(
+                "An error occurred when checking migration of TsFileResource {}", tsfile, e);
+          }
+        }
+      }
+    }
+
+    private void submitMigrationTask(
+        int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) {
+      MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, targetDir);
+      workers.submit(task);
+      tierDiskUsableSpace[tierLevel] -= sourceTsFile.getTsFileSize();
+      if (needMigrationTiers.contains(tierLevel)) {
+        double usage = tierDiskUsableSpace[tierLevel] * 1.0 / tierDiskTotalSpace[tierLevel];
+        if (usage > TIER_DISK_SPACE_SAFE_THRESHOLD) {
+          needMigrationTiers.remove(tierLevel);
+        }
+      }
+    }
+
+    private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) {
+      // old time partitions first
+      int res = Long.compare(f1.getTimePartition(), f2.getTimePartition());
+      // sequence files in one partition
+      if (res == 0) {
+        if (f1.isSeq() && !f2.isSeq()) {
+          res = -1;
+        } else if (!f1.isSeq() && f2.isSeq()) {
+          res = 1;
+        }
+      }
+      // old version files in one partition
+      if (res == 0) {
+        res = Long.compare(f1.getVersion(), f2.getVersion());
+      }
+      return res;
+    }
+  }
+
+  @Override
+  public void stop() {
+    if (scheduler != null) {
+      scheduler.shutdownNow();
+    }
+    if (workers != null) {
+      workers.shutdownNow();
+    }
+  }
+
+  @Override
+  public ServiceType getID() {
+    return ServiceType.MIGRATION_SERVICE;
+  }
+
+  public static MigrationTaskManager getInstance() {
+    return InstanceHolder.INSTANCE;
+  }
+
+  private static class InstanceHolder {
+    private InstanceHolder() {}
+
+    private static final MigrationTaskManager INSTANCE = new MigrationTaskManager();
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
new file mode 100644
index 00000000000..18136da0c46
--- /dev/null
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.engine.migration;
+
+import org.apache.iotdb.db.engine.storagegroup.TsFileResource;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Migration task used when the target directory is not local: copies the TsFile and its resource
+ * file to the destination, points the TsFileResource at the copied TsFile and then deletes the
+ * source TsFile.
+ */
+public class RemoteMigrationTask extends MigrationTask {
+  private static final Logger logger = LoggerFactory.getLogger(RemoteMigrationTask.class);
+
+  RemoteMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) {
+    super(cause, tsFile, targetDir);
+  }
+
+  @Override
+  public void migrate() {
+    // copy TsFile and resource file
+    tsFile.readLock();
+    try {
+      fsFactory.copyFile(srcTsFile, destTsFile);
+      fsFactory.copyFile(srcResourceFile, destResourceFile);
+    } catch (IOException e) {
+      // keep the exception in the log so the reason of the failed copy is not lost
+      logger.error("Fail to copy TsFile {}", srcTsFile, e);
+      // remove whatever was copied so far; the source files are left untouched
+      destTsFile.delete();
+      destResourceFile.delete();
+      return;
+    } finally {
+      tsFile.readUnlock();
+    }
+    // replace TsFile path
+    tsFile.writeLock();
+    try {
+      tsFile.setFile(destTsFile);
+    } finally {
+      tsFile.writeUnlock();
+    }
+    // clear src files
+    if (!srcTsFile.delete()) {
+      logger.warn("Fail to delete source TsFile {} after migration", srcTsFile);
+    }
+  }
+}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index b72150a93d5..850fda3ba78 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -2676,7 +2676,7 @@ public class DataRegion implements IDataRegionForQuery {
       case LOAD_UNSEQUENCE:
         targetFile =
             fsFactory.getFile(
-                TierManager.getInstance().getNextFolderForUnSequenceFile(0),
+                TierManager.getInstance().getNextFolderForTsFile(0, false),
                 databaseName
                     + File.separatorChar
                     + dataRegionId
@@ -2698,7 +2698,7 @@ public class DataRegion implements IDataRegionForQuery {
       case LOAD_SEQUENCE:
         targetFile =
             fsFactory.getFile(
-                TierManager.getInstance().getNextFolderForSequenceFile(0),
+                TierManager.getInstance().getNextFolderForTsFile(0, true),
                 databaseName
                     + File.separatorChar
                     + dataRegionId
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
index c69f9bfe023..b4e936a6cfc 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileNameGenerator.java
@@ -79,10 +79,7 @@ public class TsFileNameGenerator {
       long timePartitionId)
       throws DiskSpaceInsufficientException {
     TierManager tierManager = TierManager.getInstance();
-    String baseDir =
-        sequence
-            ? tierManager.getNextFolderForSequenceFile(0)
-            : tierManager.getNextFolderForUnSequenceFile(0);
+    String baseDir = tierManager.getNextFolderForTsFile(0, sequence);
     return baseDir
         + File.separator
         + logicalStorageGroup
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index 5ab10f08b59..9f2bca44cfb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -22,6 +22,7 @@ import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.conf.directories.TierManager;
 import org.apache.iotdb.db.engine.modification.ModificationFile;
 import org.apache.iotdb.db.engine.querycontext.ReadOnlyMemChunk;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion.SettleTsFileCallBack;
@@ -130,6 +131,10 @@ public class TsFileResource {
 
   private long ramSize;
 
+  private volatile int tierLevel = 0;
+
+  private volatile boolean isMigrating = false;
+
   private volatile long tsFileSize = -1L;
 
   private TsFileProcessor processor;
@@ -170,6 +175,7 @@ public class TsFileResource {
     this.minPlanIndex = other.minPlanIndex;
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.tsFileSize = other.tsFileSize;
+    this.tierLevel = other.tierLevel;
   }
 
   /** for sealed TsFile, call setClosed to close TsFileResource */
@@ -177,6 +183,7 @@ public class TsFileResource {
     this.file = file;
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
+    this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
   }
 
   /** Used for compaction to create target files. */
@@ -191,6 +198,7 @@ public class TsFileResource {
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
     this.processor = processor;
+    this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
   }
 
   /** unsealed TsFile, for query */
@@ -206,6 +214,7 @@ public class TsFileResource {
     this.pathToChunkMetadataListMap.put(path, chunkMetadataList);
     this.originTsFileResource = originTsFileResource;
     this.version = originTsFileResource.version;
+    this.tierLevel = originTsFileResource.tierLevel;
   }
 
   /** unsealed TsFile, for query */
@@ -221,6 +230,7 @@ public class TsFileResource {
     generatePathToTimeSeriesMetadataMap();
     this.originTsFileResource = originTsFileResource;
     this.version = originTsFileResource.version;
+    this.tierLevel = originTsFileResource.tierLevel;
   }
 
   @TestOnly
@@ -355,9 +365,10 @@ public class TsFileResource {
     return compactionModFile;
   }
 
-  public void resetModFile() {
+  public void resetModFile() throws IOException {
     if (modFile != null) {
       synchronized (this) {
+        modFile.close();
         modFile = null;
       }
     }
@@ -375,6 +386,10 @@ public class TsFileResource {
     return file.getPath();
   }
 
+  /** Returns the tier level currently recorded in this resource. */
+  public int getTierLevel() {
+    return tierLevel;
+  }
+
   public long getTsFileSize() {
     if (isClosed()) {
       if (tsFileSize == -1) {
@@ -419,8 +434,7 @@ public class TsFileResource {
   public DeviceTimeIndex buildDeviceTimeIndex() throws IOException {
     readLock();
     try (InputStream inputStream =
-        FSFactoryProducer.getFSFactory()
-            .getBufferedInputStream(file.getPath() + TsFileResource.RESOURCE_SUFFIX)) {
+        FSFactoryProducer.getFSFactory().getBufferedInputStream(file.getPath() + RESOURCE_SUFFIX)) {
       ReadWriteIOUtils.readByte(inputStream);
       ITimeIndex timeIndexFromResourceFile = ITimeIndex.createTimeIndex(inputStream);
       if (!(timeIndexFromResourceFile instanceof DeviceTimeIndex)) {
@@ -429,7 +443,7 @@ public class TsFileResource {
       return (DeviceTimeIndex) timeIndexFromResourceFile;
     } catch (Exception e) {
       throw new IOException(
-          "Can't read file " + file.getPath() + TsFileResource.RESOURCE_SUFFIX + " from disk", e);
+          "Can't read file " + file.getPath() + RESOURCE_SUFFIX + " from disk", e);
     } finally {
       readUnlock();
     }
@@ -618,6 +632,14 @@ public class TsFileResource {
     return this.status == TsFileResourceStatus.COMPACTION_CANDIDATE;
   }
 
+  /** Returns the current value of the migrating flag of this resource. */
+  public boolean isMigrating() {
+    return isMigrating;
+  }
+
+  /** Sets the migrating flag of this resource; the field is volatile, the write is not locked. */
+  public void setIsMigrating(boolean isMigrating) {
+    this.isMigrating = isMigrating;
+  }
+
   public void setStatus(TsFileResourceStatus status) {
     switch (status) {
       case CLOSED:
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 3e0f39ac4ff..1e95ffe09f9 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -66,6 +66,7 @@ import org.apache.iotdb.db.engine.StorageEngine;
 import org.apache.iotdb.db.engine.cache.CacheHitRatioMonitor;
 import org.apache.iotdb.db.engine.compaction.schedule.CompactionTaskManager;
 import org.apache.iotdb.db.engine.flush.FlushManager;
+import org.apache.iotdb.db.engine.migration.MigrationTaskManager;
 import org.apache.iotdb.db.metadata.schemaregion.SchemaEngine;
 import org.apache.iotdb.db.metadata.template.ClusterTemplateManager;
 import org.apache.iotdb.db.mpp.execution.exchange.MPPDataExchangeService;
@@ -543,6 +544,7 @@ public class DataNode implements DataNodeMBean {
     registerManager.register(RegionMigrateService.getInstance());
 
     registerManager.register(CompactionTaskManager.getInstance());
+    registerManager.register(MigrationTaskManager.getInstance());
   }
 
   /** set up RPC and protocols after DataNode is available */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
index 8029bcca1d5..386cc210baa 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
@@ -105,13 +105,21 @@ public interface FSFactory {
   BufferedOutputStream getBufferedOutputStream(String filePath);
 
   /**
-   * move file
+   * TODO(zhm) move file
    *
    * @param srcFile src file
    * @param destFile dest file
    */
   void moveFile(File srcFile, File destFile);
 
+  /**
+   * TODO(zhm) copy file from srcFile to destFile
+   *
+   * @param srcFile src file
+   * @param destFile dest file
+   * @throws IOException if the file cannot be copied
+   */
+  void copyFile(File srcFile, File destFile) throws IOException;
+
   /**
    * list file by suffix
    *
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
index ddf971b29cd..328c4f9e39e 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HDFSFactory.java
@@ -199,6 +199,9 @@ public class HDFSFactory implements FSFactory {
     }
   }
 
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {}
+
   @Override
   public File[] listFilesBySuffix(String fileFolder, String suffix) {
     try {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
index b2d58149753..325b0ba5e52 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/HybridFSFactory.java
@@ -103,6 +103,11 @@ public class HybridFSFactory implements FSFactory {
     // TODO
   }
 
+  /**
+   * Copying is not implemented for the hybrid file system yet. Fails explicitly instead of
+   * returning silently, so that callers never treat a file as copied and then delete the source.
+   *
+   * @throws IOException always, because no copy is performed
+   */
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {
+    // TODO
+    throw new IOException(
+        "copyFile is not implemented for hybrid file system, cannot copy "
+            + srcFile
+            + " to "
+            + destFile);
+  }
+
   @Override
   public File[] listFilesBySuffix(String fileFolder, String suffix) {
     FSPath folder = FSUtils.parse(fileFolder);
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
index 53c1dcb31a6..b6d46989e88 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/LocalFSFactory.java
@@ -123,6 +123,11 @@ public class LocalFSFactory implements FSFactory {
     }
   }
 
+  /**
+   * Copies srcFile to destFile by delegating to {@code FileUtils.copyFile}.
+   *
+   * @throws IOException if the delegate fails to copy the file
+   */
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {
+    FileUtils.copyFile(srcFile, destFile);
+  }
+
   @Override
   public File[] listFilesBySuffix(String fileFolder, String suffix) {
     return new File(fileFolder).listFiles(file -> file.getName().endsWith(suffix));
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
index eb2b4c837ec..1f58fc6ffd6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
@@ -198,6 +198,9 @@ public class OSFSFactory implements FSFactory {
     }
   }
 
+  @Override
+  public void copyFile(File srcFile, File destFile) throws IOException {}
+
   @Override
   public File[] listFilesBySuffix(String fileFolder, String suffix) {
     try {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
index 0c9f80a591e..6ab36b56634 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
@@ -95,7 +95,11 @@ public class FSUtils {
     return new FSPath(type, path);
   }
 
-  public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, String dataNodeId)
+  public static String getOSDefaultPath(String bucket, int dataNodeId) {
+    return new FSPath(FSType.OBJECT_STORAGE, fsPrefix[0] + "/" + dataNodeId).getPath();
+  }
+
+  public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, int dataNodeId)
       throws IOException {
     String canonicalPath = lcoalFile.getCanonicalPath();
     int startIdx = canonicalPath.lastIndexOf("unsequence");


[iotdb] 02/02: fix test

Posted by he...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 7549d7c2914b5ce93158f2aa5fd6327ef0975fd0
Author: HeimingZ <zh...@qq.com>
AuthorDate: Mon May 22 19:54:15 2023 +0800

    fix test
---
 .../test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
index 46a95bf9035..c2392bff9ec 100644
--- a/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
+++ b/server/src/test/java/org/apache/iotdb/db/engine/storagegroup/TTLTest.java
@@ -249,8 +249,8 @@ public class TTLTest {
     dataRegion.syncCloseAllWorkingTsFileProcessors();
 
     // files before ttl
-    File seqDir = new File(TierManager.getInstance().getNextFolderForSequenceFile(0), sg1);
-    File unseqDir = new File(TierManager.getInstance().getNextFolderForUnSequenceFile(0), sg1);
+    File seqDir = new File(TierManager.getInstance().getNextFolderForTsFile(0, true), sg1);
+    File unseqDir = new File(TierManager.getInstance().getNextFolderForTsFile(0, false), sg1);
 
     List<File> seqFiles = new ArrayList<>();
     for (File directory : seqDir.listFiles()) {