You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/24 02:59:32 UTC

[iotdb] branch tiered_storage updated: refine code

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tiered_storage by this push:
     new 599d8f09b43 refine code
599d8f09b43 is described below

commit 599d8f09b43d9550e909c1d2a8f12d7fc531c205
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed May 24 10:59:18 2023 +0800

    refine code
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  2 +-
 .../iotdb/db/conf/directories/TierManager.java     |  1 +
 .../db/engine/migration/LocalMigrationTask.java    | 21 +++++++-------
 .../iotdb/db/engine/migration/MigrationTask.java   | 33 +++++++++++++---------
 .../db/engine/migration/MigrationTaskManager.java  | 14 +++++----
 .../db/engine/migration/RemoteMigrationTask.java   | 16 +++++------
 .../db/engine/storagegroup/TsFileProcessor.java    |  3 +-
 .../db/engine/storagegroup/TsFileResource.java     |  4 ++-
 .../iotdb/tsfile/common/conf/TSFileConfig.java     |  3 +-
 .../org/apache/iotdb/tsfile/utils/FSUtils.java     | 21 +++++++-------
 10 files changed, 66 insertions(+), 52 deletions(-)

diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c59bd9c558e..40a8c3387db 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1140,7 +1140,7 @@ public class IoTDBConfig {
   private int migrateThreadCount = 1;
 
   /** Enable hdfs or not */
-  private boolean enableObjectStorage = false;
+  private boolean enableObjectStorage = true;
 
   /** Config for object storage */
   private ObjectStorageConfig osConfig = ObjectStorageDescriptor.getInstance().getConfig();
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
index 7c3335910b8..734d7a5decf 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
@@ -206,6 +206,7 @@ public class TierManager {
   }
 
   public int getFileTierLevel(File file) {
+    // If the file does not exist on Local disk, it is assumed to be on remote Object Storage
     if (!file.exists()) {
       return getTiersNum() - 1;
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
index 7746007fb0c..ca4e097036c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
@@ -28,33 +28,34 @@ import java.io.IOException;
 public class LocalMigrationTask extends MigrationTask {
   private static final Logger logger = LoggerFactory.getLogger(LocalMigrationTask.class);
 
-  LocalMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) {
+  protected LocalMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) throws IOException {
     super(cause, tsFile, targetDir);
   }
 
   @Override
   public void migrate() {
     // copy TsFile and resource file
-    tsFile.readLock();
+    tsFileResource.readLock();
     try {
-      fsFactory.copyFile(srcTsFile, destTsFile);
+      fsFactory.copyFile(srcFile, destTsFile);
       fsFactory.copyFile(srcResourceFile, destResourceFile);
     } catch (IOException e) {
-      logger.error("Fail to copy TsFile {}", srcTsFile);
+      logger.error("Fail to copy TsFile {}", srcFile);
       destTsFile.delete();
       destResourceFile.delete();
       return;
     } finally {
-      tsFile.readUnlock();
+      tsFileResource.readUnlock();
     }
     // close mods file and replace TsFile path
-    tsFile.writeLock();
+    tsFileResource.writeLock();
     try {
-      tsFile.resetModFile();
+      tsFileResource.resetModFile();
+      // migrate MOD file only when it exists
       if (srcModsFile.exists()) {
         fsFactory.copyFile(srcModsFile, destModsFile);
       }
-      tsFile.setFile(destTsFile);
+      tsFileResource.setFile(destTsFile);
     } catch (IOException e) {
       logger.error("Fail to copy mods file {}", srcModsFile);
       destTsFile.delete();
@@ -62,10 +63,10 @@ public class LocalMigrationTask extends MigrationTask {
       destModsFile.delete();
       return;
     } finally {
-      tsFile.writeUnlock();
+      tsFileResource.writeUnlock();
     }
     // clear src files
-    srcTsFile.delete();
+    srcFile.delete();
     srcResourceFile.delete();
     srcModsFile.delete();
   }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
index 8bdfc751256..32e1676d8e6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
@@ -26,41 +26,46 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
 import org.apache.iotdb.tsfile.utils.FSUtils;
 
 import java.io.File;
+import java.io.IOException;
 
 public abstract class MigrationTask implements Runnable {
   protected static final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
 
   protected final MigrationCause cause;
-  protected final TsFileResource tsFile;
+  protected final TsFileResource tsFileResource;
   protected final String targetDir;
 
-  protected final File srcTsFile;
+  protected final File srcFile;
   protected final File destTsFile;
   protected final File srcResourceFile;
   protected final File destResourceFile;
   protected final File srcModsFile;
   protected final File destModsFile;
 
-  MigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) {
+  protected MigrationTask(MigrationCause cause, TsFileResource tsFileResource, String targetDir) throws IOException {
     this.cause = cause;
-    this.tsFile = tsFile;
+    this.tsFileResource = tsFileResource;
     this.targetDir = targetDir;
-    this.srcTsFile = tsFile.getTsFile();
-    this.destTsFile = fsFactory.getFile(targetDir, tsFile.getTsFile().getName());
+    this.srcFile = tsFileResource.getTsFile();
+    this.destTsFile = fsFactory.getFile(targetDir, getDestTsFilePath(srcFile));
     this.srcResourceFile =
         fsFactory.getFile(
-            srcTsFile.getParentFile(), srcTsFile.getName() + TsFileResource.RESOURCE_SUFFIX);
+            srcFile.getParentFile(), srcFile.getName() + TsFileResource.RESOURCE_SUFFIX);
     this.destResourceFile =
-        fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + TsFileResource.RESOURCE_SUFFIX);
+        fsFactory.getFile(targetDir, getDestTsFilePath(srcFile) + TsFileResource.RESOURCE_SUFFIX);
     this.srcModsFile =
         fsFactory.getFile(
-            srcTsFile.getParentFile(), srcTsFile.getName() + ModificationFile.FILE_SUFFIX);
+            srcFile.getParentFile(), srcFile.getName() + ModificationFile.FILE_SUFFIX);
     this.destModsFile =
-        fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + ModificationFile.FILE_SUFFIX);
+        fsFactory.getFile(targetDir, getDestTsFilePath(srcFile) + ModificationFile.FILE_SUFFIX);
+  }
+
+  private String getDestTsFilePath(File src) throws IOException {
+    return FSUtils.getLocalTsFileShortPath(src, FSUtils.PATH_FROM_DATABASE_LEVEL);
   }
 
   public static MigrationTask newTask(
-      MigrationCause cause, TsFileResource sourceTsFile, String targetDir) {
+      MigrationCause cause, TsFileResource sourceTsFile, String targetDir) throws IOException {
     if (FSUtils.isLocal(targetDir)) {
       return new LocalMigrationTask(cause, sourceTsFile, targetDir);
     } else {
@@ -71,12 +76,12 @@ public abstract class MigrationTask implements Runnable {
   @Override
   public void run() {
     migrate();
-    tsFile.increaseTierLevel();
-    tsFile.setIsMigrating(false);
+    tsFileResource.increaseTierLevel();
+    tsFileResource.setIsMigrating(false);
   }
 
   protected boolean canMigrate() {
-    return tsFile.getStatus() == TsFileResourceStatus.NORMAL;
+    return tsFileResource.getStatus() == TsFileResourceStatus.NORMAL;
   }
 
   public abstract void migrate();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
index 0ebaa49d7f2..764a9987044 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.utils.DateTimeUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -133,15 +134,14 @@ public class MigrationTaskManager implements IService {
                   tierManager.getNextFolderForTsFile(nextTier, tsfile.isSeq()));
             }
           } catch (Exception e) {
-            logger.error(
-                "An error occurred when checking migration of TsFileResource {}", tsfile, e);
+            logger.error("An error occurred when check and try to migrate TsFileResource {}", tsfile, e);
           }
         }
       }
     }
 
     private void submitMigrationTask(
-        int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) {
+        int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) throws IOException {
       if (!checkAndMarkMigrate(sourceTsFile)) {
         return;
       }
@@ -159,7 +159,7 @@ public class MigrationTaskManager implements IService {
     private boolean checkAndMarkMigrate(TsFileResource tsFile) {
       if (canMigrate(tsFile)) {
         tsFile.setIsMigrating(true);
-        if (!canMigrate(tsFile)) {
+        if (occupiedByCompaction(tsFile)) {
           tsFile.setIsMigrating(false);
           return false;
         }
@@ -169,7 +169,11 @@ public class MigrationTaskManager implements IService {
     }
 
     private boolean canMigrate(TsFileResource tsFile) {
-      return tsFile.getStatus() == TsFileResourceStatus.NORMAL;
+      return tsFile.getStatus() == TsFileResourceStatus.NORMAL && !tsFile.isMigrating();
+    }
+
+    private boolean occupiedByCompaction(TsFileResource tsFile) {
+      return tsFile.getStatus() != TsFileResourceStatus.NORMAL;
     }
 
     private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
index 65aeb4d9181..ffd0015ffee 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
@@ -28,31 +28,31 @@ import java.io.IOException;
 public class RemoteMigrationTask extends MigrationTask {
   private static final Logger logger = LoggerFactory.getLogger(RemoteMigrationTask.class);
 
-  RemoteMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) {
+  protected RemoteMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) throws IOException {
     super(cause, tsFile, targetDir);
   }
 
   @Override
   public void migrate() {
     // copy TsFile and resource file
-    tsFile.readLock();
+    tsFileResource.readLock();
     try {
-      fsFactory.copyFile(srcTsFile, destTsFile);
+      fsFactory.copyFile(srcFile, destTsFile);
       fsFactory.copyFile(srcResourceFile, destResourceFile);
     } catch (IOException e) {
-      logger.error("Fail to copy TsFile {}", srcTsFile);
+      logger.error("Fail to copy TsFile {}", srcFile);
       destTsFile.delete();
       destResourceFile.delete();
       return;
     } finally {
-      tsFile.readUnlock();
+      tsFileResource.readUnlock();
     }
     // clear src files
-    tsFile.writeLock();
+    tsFileResource.writeLock();
     try {
-      srcTsFile.delete();
+      srcFile.delete();
     } finally {
-      tsFile.writeUnlock();
+      tsFileResource.writeUnlock();
     }
   }
 }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index f4f33c10473..93b0f0db214 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -185,11 +185,12 @@ public class TsFileProcessor {
       boolean sequence)
       throws IOException {
     this.storageGroupName = storageGroupName;
+    // this.sequence must be assigned first because `this` is passed to the TsFileResource constructor below, which reads it via isSequence()
+    this.sequence = sequence;
     this.tsFileResource = new TsFileResource(tsfile, this);
     this.dataRegionInfo = dataRegionInfo;
     this.writer = new RestorableTsFileIOWriter(tsfile);
     this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
-    this.sequence = sequence;
     this.walNode =
         WALManager.getInstance()
             .applyForWALNode(WALManager.getApplicantUniqueId(storageGroupName, sequence));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index b7536902ccc..3902869defb 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -185,6 +185,7 @@ public class TsFileResource {
     this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
     this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
     this.isSeq = FilePathUtils.isSequence(this.file.getAbsolutePath());
+    // This method is invoked when DataNode recovers, so the tierLevel should be calculated when restarting
     this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
   }
 
@@ -201,7 +202,8 @@ public class TsFileResource {
     this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
     this.processor = processor;
     this.isSeq = processor.isSequence();
-    this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
+    // This constructor is invoked when a new TsFile is created; a newly created TsFile's tierLevel is 0 by default
+    this.tierLevel = 0;
   }
 
   /** unsealed TsFile, for query */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 8712ba0aba5..9d7813784a9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -120,7 +120,8 @@ public class TSFileConfig implements Serializable {
   /** Default endian value is BIG_ENDIAN. */
   private String endian = "BIG_ENDIAN";
   /** Default storage is in local file system */
-  private FSType[] TSFileStorageFs = new FSType[] {FSType.LOCAL};
+  // TODO: (haiming) fix the bug that the config is not loaded
+  private FSType[] TSFileStorageFs = new FSType[] {FSType.LOCAL, FSType.OBJECT_STORAGE};
   /** Default core-site.xml file path is /etc/hadoop/conf/core-site.xml */
   private String coreSitePath = "/etc/hadoop/conf/core-site.xml";
   /** Default hdfs-site.xml file path is /etc/hadoop/conf/hdfs-site.xml */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
index 5291985b678..0efcb65156c 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
@@ -32,6 +32,8 @@ import java.util.Arrays;
 public class FSUtils {
   private static final Logger logger = LoggerFactory.getLogger(FSUtils.class);
   private static final FSType[] fsTypes = {FSType.OBJECT_STORAGE, FSType.HDFS};
+  public static final int PATH_FROM_SEQUENCE_LEVEL = 5;
+  public static final int PATH_FROM_DATABASE_LEVEL = 4;
   public static final String[] fsPrefix = {"os://", "hdfs://"};
   public static final String OS_FILE_SEPARATOR = "/";
   private static final String[] fsFileClassName = {
@@ -119,7 +121,6 @@ public class FSUtils {
 
   public static FSPath parseLocalTsFile2OSFile(File localFile, String bucket, int dataNodeId)
       throws IOException {
-    String[] filePathSplits = FilePathUtils.splitTsFilePath(localFile.getCanonicalPath());
     return new FSPath(
         FSType.OBJECT_STORAGE,
         fsPrefix[0]
@@ -127,18 +128,16 @@ public class FSUtils {
             + OS_FILE_SEPARATOR
             + dataNodeId
             + OS_FILE_SEPARATOR
-            + String.join(
-                OS_FILE_SEPARATOR,
-                Arrays.copyOfRange(
-                    filePathSplits, filePathSplits.length - 5, filePathSplits.length)));
+            + getLocalTsFileShortPath(localFile, PATH_FROM_SEQUENCE_LEVEL));
   }
 
-  //  public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, int dataNodeId)
-  //      throws IOException {
-  //    String fileName = lcoalFile.getName();
-  //    return new FSPath(FSType.OBJECT_STORAGE,
-  // "os://bucket/Users/jinruizhangjinrui/Documents/work/iotdb/data/datanode/s3/" + fileName);
-  //  }
+  public static String getLocalTsFileShortPath(File localTsFile, int level)  throws IOException{
+    String[] filePathSplits = FilePathUtils.splitTsFilePath(localTsFile.getCanonicalPath());
+    return String.join(
+        OS_FILE_SEPARATOR,
+        Arrays.copyOfRange(
+            filePathSplits, filePathSplits.length - level, filePathSplits.length));
+  }
 
   public static boolean isLocal(String fsPath) {
     return getFSType(fsPath) == FSType.LOCAL;