You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/24 09:16:11 UTC

[iotdb] branch tiered_storage updated: delete duplicate TsFiles when recovering

This is an automated email from the ASF dual-hosted git repository.

heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/tiered_storage by this push:
     new 5d835648c18 delete duplicate TsFiles when recovering
5d835648c18 is described below

commit 5d835648c180b68bf6487a5fe5721b4bd4d89a8d
Author: HeimingZ <zh...@qq.com>
AuthorDate: Wed May 24 17:15:54 2023 +0800

    delete duplicate TsFiles when recovering
---
 .../resources/conf/iotdb-common.properties         |  2 +-
 .../db/engine/migration/LocalMigrationTask.java    |  9 +--
 .../db/engine/migration/RemoteMigrationTask.java   | 14 +++-
 .../iotdb/db/engine/storagegroup/DataRegion.java   | 78 +++++++++++++++-------
 .../apache/iotdb/db/utils/EnvironmentUtils.java    |  8 +--
 5 files changed, 77 insertions(+), 34 deletions(-)

diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index f688c00509f..b6b5e246742 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1180,7 +1180,7 @@ cluster_name=defaultCluster
 # enable_object_storage=false
 
 # Datatype: string
-# remote_tsfile_cache_dirs=data/datanode/remote/cache
+# remote_tsfile_cache_dirs=data/datanode/data/cache
 
 # Datatype: long
 # remote_tsfile_cache_max_disk_usage=53687091200
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
index 889466733a5..93fcd99f82c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
@@ -38,10 +38,11 @@ public class LocalMigrationTask extends MigrationTask {
     // copy TsFile and resource file
     tsFileResource.readLock();
     try {
+      destTsFile.getParentFile().mkdirs();
       fsFactory.copyFile(srcFile, destTsFile);
       fsFactory.copyFile(srcResourceFile, destResourceFile);
-    } catch (IOException e) {
-      logger.error("Fail to copy TsFile {}", srcFile);
+    } catch (Exception e) {
+      logger.error("Fail to copy TsFile from local {} to local {}", srcFile, srcResourceFile);
       destTsFile.delete();
       destResourceFile.delete();
       return;
@@ -57,8 +58,8 @@ public class LocalMigrationTask extends MigrationTask {
         fsFactory.copyFile(srcModsFile, destModsFile);
       }
       tsFileResource.setFile(destTsFile);
-    } catch (IOException e) {
-      logger.error("Fail to copy mods file {}", srcModsFile);
+    } catch (Exception e) {
+      logger.error("Fail to copy mods file from local {} to local {}", srcModsFile, destModsFile);
       destTsFile.delete();
       destResourceFile.delete();
       destModsFile.delete();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
index 428778ecea8..fa4bbe51cd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
@@ -35,13 +35,21 @@ public class RemoteMigrationTask extends MigrationTask {
 
   @Override
   public void migrate() {
+    // The TsFile may already exist on the remote if a previous run of this migration task was
+    // interrupted by a system shutdown before it completed.
+    if (destTsFile.exists()) {
+      destTsFile.delete();
+    }
+    if (destResourceFile.exists()) {
+      destResourceFile.delete();
+    }
     // copy TsFile and resource file
     tsFileResource.readLock();
     try {
       fsFactory.copyFile(srcFile, destTsFile);
       fsFactory.copyFile(srcResourceFile, destResourceFile);
-    } catch (IOException e) {
-      logger.error("Fail to copy TsFile {}", srcFile);
+    } catch (Exception e) {
+      logger.error("Fail to copy TsFile from local {} to remote {}", srcFile, srcResourceFile);
       destTsFile.delete();
       destResourceFile.delete();
       return;
@@ -52,6 +60,8 @@ public class RemoteMigrationTask extends MigrationTask {
     tsFileResource.writeLock();
     try {
       srcFile.delete();
+    } catch (Exception e) {
+      logger.error("Fail to delete local TsFile {}", srcFile);
     } finally {
       tsFileResource.writeUnlock();
     }
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 1cde01d65c5..f358c88d927 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -325,7 +325,7 @@ public class DataRegion implements IDataRegionForQuery {
           "Skip recovering data region {}[{}] when consensus protocol is ratis and storage engine is not ready.",
           databaseName,
           dataRegionId);
-      for (String fileFolder : TierManager.getInstance().getAllFilesFolders()) {
+      for (String fileFolder : TierManager.getInstance().getAllLocalFilesFolders()) {
         File dataRegionFolder =
             fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionId);
         if (dataRegionFolder.exists()) {
@@ -442,12 +442,12 @@ public class DataRegion implements IDataRegionForQuery {
     try {
       // collect candidate TsFiles from sequential and unsequential data directory
       Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair =
-          getAllFiles(TierManager.getInstance().getAllSequenceFileFolders());
+          getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
       List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
       List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
       upgradeSeqFileList.addAll(oldSeqTsFiles);
       Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair =
-          getAllFiles(TierManager.getInstance().getAllUnSequenceFileFolders());
+          getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
       List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
       List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
       upgradeUnseqFileList.addAll(oldUnseqTsFiles);
@@ -683,8 +683,7 @@ public class DataRegion implements IDataRegionForQuery {
   private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders)
       throws IOException, DataRegionException {
     // represents local TsFile and remote TsFile on Object Storage
-    Set<String> tsFilePathSet = new HashSet<>();
-    List<File> tsFiles = new ArrayList<>();
+    Map<String, File> tsFileName2File = new HashMap<>();
     List<File> upgradeFiles = new ArrayList<>();
     for (String baseDir : folders) {
       File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId);
@@ -713,16 +712,30 @@ public class DataRegion implements IDataRegionForQuery {
             File[] resourceFilesInThisFolder =
                 fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), RESOURCE_SUFFIX);
             for (File f : tsFilesInThisFolder) {
-              tsFilePathSet.add(f.getCanonicalPath());
+              String tsFileFileName = f.getName();
+              if (tsFileName2File.containsKey(tsFileFileName)) {
+                // check migration: two TsFiles with the same name exist, keep only one of them
+                File actualFile = deleteDuplicateTsFiles(f, tsFileName2File.get(tsFileFileName));
+                tsFileName2File.put(tsFileFileName, actualFile);
+              } else {
+                tsFileName2File.put(tsFileFileName, f);
+              }
             }
 
-            Collections.addAll(tsFiles, tsFilesInThisFolder);
             for (File f : resourceFilesInThisFolder) {
-              String tsFilePath =
+              String tsFileFileName =
                   f.getCanonicalPath()
                       .substring(0, f.getCanonicalPath().length() - RESOURCE_SUFFIX.length());
-              if (!tsFilePathSet.contains(tsFilePath)) {
-                tsFiles.add(fsFactory.getFile(tsFilePath));
+              if (tsFileName2File.containsKey(tsFileFileName)) {
+                // check migration: tsfile already added, but this resource file doesn't correspond
+                // to the file, so delete it
+                if (!f.getCanonicalPath()
+                    .startsWith(tsFileName2File.get(tsFileFileName).getCanonicalPath())) {
+                  f.delete();
+                }
+              } else {
+                tsFileName2File.put(
+                    tsFileFileName, fsFactory.getFile(partitionFolder, tsFileFileName));
               }
             }
 
@@ -736,19 +749,16 @@ public class DataRegion implements IDataRegionForQuery {
       }
     }
 
-    tsFiles.sort(this::compareFileName);
-    if (!tsFiles.isEmpty()) {
-      checkTsFileTime(tsFiles.get(tsFiles.size() - 1));
-    }
+    long currentTime = System.currentTimeMillis();
     List<TsFileResource> ret = new ArrayList<>();
-    tsFiles.forEach(f -> ret.add(new TsFileResource(f)));
-
-    upgradeFiles.sort(this::compareFileName);
-    if (!upgradeFiles.isEmpty()) {
-      checkTsFileTime(upgradeFiles.get(upgradeFiles.size() - 1));
+    for (File f : tsFileName2File.values()) {
+      checkTsFileTime(f, currentTime);
+      ret.add(new TsFileResource(f));
     }
+
     List<TsFileResource> upgradeRet = new ArrayList<>();
     for (File f : upgradeFiles) {
+      checkTsFileTime(f, currentTime);
       TsFileResource fileResource = new TsFileResource(f);
       fileResource.setStatus(TsFileResourceStatus.NORMAL);
       // make sure the flush command is called before IoTDB is down.
@@ -772,11 +782,33 @@ public class DataRegion implements IDataRegionForQuery {
     }
   }
 
+  /** Keeps f1 if both its TsFile and resource file exist, else f2; deletes the other's files. */
+  private File deleteDuplicateTsFiles(File f1, File f2) {
+    File f1Resource = fsFactory.getFile(f1 + RESOURCE_SUFFIX);
+    File f2Resource = fsFactory.getFile(f2 + RESOURCE_SUFFIX);
+    if (f1.exists() && f1Resource.exists()) {
+      if (f2.exists()) {
+        f2.delete();
+      }
+      if (f2Resource.exists()) {
+        f2Resource.delete();
+      }
+      return f1;
+    } else {
+      if (f1.exists()) {
+        f1.delete();
+      }
+      if (f1Resource.exists()) {
+        f1Resource.delete();
+      }
+      return f2;
+    }
+  }
+
   /** check if the tsfile's time is smaller than system current time. */
-  private void checkTsFileTime(File tsFile) throws DataRegionException {
+  private void checkTsFileTime(File tsFile, long currentTime) throws DataRegionException {
     String[] items = tsFile.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
     long fileTime = Long.parseLong(items[0]);
-    long currentTime = System.currentTimeMillis();
     if (fileTime > currentTime) {
       throw new DataRegionException(
           String.format(
@@ -1570,7 +1602,7 @@ public class DataRegion implements IDataRegionForQuery {
               TsFileMetricManager.getInstance().decreaseModFileSize(x.getModFile().getSize());
             }
           });
-      deleteAllSGFolders(TierManager.getInstance().getAllFilesFolders());
+      deleteAllSGFolders(TierManager.getInstance().getAllLocalFilesFolders());
 
       this.workSequenceTsFileProcessors.clear();
       this.workUnsequenceTsFileProcessors.clear();
@@ -3090,7 +3122,7 @@ public class DataRegion implements IDataRegionForQuery {
   public long countRegionDiskSize() {
     AtomicLong diskSize = new AtomicLong(0);
     TierManager.getInstance()
-        .getAllFilesFolders()
+        .getAllLocalFilesFolders()
         .forEach(
             folder -> {
               folder = folder + File.separator + databaseName + File.separator + dataRegionId;
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index b661a3f8d06..7807f272fd8 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -231,11 +231,11 @@ public class EnvironmentUtils {
 
   public static void cleanAllDir() throws IOException {
     // delete sequential files
-    for (String path : tierManager.getAllSequenceFileFolders()) {
+    for (String path : tierManager.getAllLocalSequenceFileFolders()) {
       cleanDir(path);
     }
     // delete unsequence files
-    for (String path : tierManager.getAllUnSequenceFileFolders()) {
+    for (String path : tierManager.getAllLocalUnSequenceFileFolders()) {
       cleanDir(path);
     }
     // delete system info
@@ -319,11 +319,11 @@ public class EnvironmentUtils {
 
   private static void createAllDir() {
     // create sequential files
-    for (String path : tierManager.getAllSequenceFileFolders()) {
+    for (String path : tierManager.getAllLocalSequenceFileFolders()) {
       createDir(path);
     }
     // create unsequential files
-    for (String path : tierManager.getAllUnSequenceFileFolders()) {
+    for (String path : tierManager.getAllLocalUnSequenceFileFolders()) {
       createDir(path);
     }
     // create database