You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself was not preserved in this plain-text copy).
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/24 09:16:11 UTC
[iotdb] branch tiered_storage updated: delete duplicate TsFiles when recovering
This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new 5d835648c18 delete duplicate TsFiles when recovering
5d835648c18 is described below
commit 5d835648c180b68bf6487a5fe5721b4bd4d89a8d
Author: HeimingZ <zh...@qq.com>
AuthorDate: Wed May 24 17:15:54 2023 +0800
delete duplicate TsFiles when recovering
---
.../resources/conf/iotdb-common.properties | 2 +-
.../db/engine/migration/LocalMigrationTask.java | 9 +--
.../db/engine/migration/RemoteMigrationTask.java | 14 +++-
.../iotdb/db/engine/storagegroup/DataRegion.java | 78 +++++++++++++++-------
.../apache/iotdb/db/utils/EnvironmentUtils.java | 8 +--
5 files changed, 77 insertions(+), 34 deletions(-)
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index f688c00509f..b6b5e246742 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -1180,7 +1180,7 @@ cluster_name=defaultCluster
# enable_object_storage=false
# Datatype: string
-# remote_tsfile_cache_dirs=data/datanode/remote/cache
+# remote_tsfile_cache_dirs=data/datanode/data/cache
# Datatype: long
# remote_tsfile_cache_max_disk_usage=53687091200
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
index 889466733a5..93fcd99f82c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
@@ -38,10 +38,11 @@ public class LocalMigrationTask extends MigrationTask {
// copy TsFile and resource file
tsFileResource.readLock();
try {
+ destTsFile.getParentFile().mkdirs();
fsFactory.copyFile(srcFile, destTsFile);
fsFactory.copyFile(srcResourceFile, destResourceFile);
- } catch (IOException e) {
- logger.error("Fail to copy TsFile {}", srcFile);
+ } catch (Exception e) {
+ logger.error("Fail to copy TsFile from local {} to local {}", srcFile, srcResourceFile);
destTsFile.delete();
destResourceFile.delete();
return;
@@ -57,8 +58,8 @@ public class LocalMigrationTask extends MigrationTask {
fsFactory.copyFile(srcModsFile, destModsFile);
}
tsFileResource.setFile(destTsFile);
- } catch (IOException e) {
- logger.error("Fail to copy mods file {}", srcModsFile);
+ } catch (Exception e) {
+ logger.error("Fail to copy mods file from local {} to local {}", srcModsFile, destModsFile);
destTsFile.delete();
destResourceFile.delete();
destModsFile.delete();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
index 428778ecea8..fa4bbe51cd6 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
@@ -35,13 +35,21 @@ public class RemoteMigrationTask extends MigrationTask {
@Override
public void migrate() {
+ // tsfile may exist on the remote if the last same migration task hasn't completed when the
+ // system shutdown.
+ if (destTsFile.exists()) {
+ destTsFile.delete();
+ }
+ if (destResourceFile.exists()) {
+ destResourceFile.delete();
+ }
// copy TsFile and resource file
tsFileResource.readLock();
try {
fsFactory.copyFile(srcFile, destTsFile);
fsFactory.copyFile(srcResourceFile, destResourceFile);
- } catch (IOException e) {
- logger.error("Fail to copy TsFile {}", srcFile);
+ } catch (Exception e) {
+ logger.error("Fail to copy TsFile from local {} to remote {}", srcFile, srcResourceFile);
destTsFile.delete();
destResourceFile.delete();
return;
@@ -52,6 +60,8 @@ public class RemoteMigrationTask extends MigrationTask {
tsFileResource.writeLock();
try {
srcFile.delete();
+ } catch (Exception e) {
+ logger.error("Fail to delete local TsFile {}", srcFile);
} finally {
tsFileResource.writeUnlock();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
index 1cde01d65c5..f358c88d927 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/DataRegion.java
@@ -325,7 +325,7 @@ public class DataRegion implements IDataRegionForQuery {
"Skip recovering data region {}[{}] when consensus protocol is ratis and storage engine is not ready.",
databaseName,
dataRegionId);
- for (String fileFolder : TierManager.getInstance().getAllFilesFolders()) {
+ for (String fileFolder : TierManager.getInstance().getAllLocalFilesFolders()) {
File dataRegionFolder =
fsFactory.getFile(fileFolder, databaseName + File.separator + dataRegionId);
if (dataRegionFolder.exists()) {
@@ -442,12 +442,12 @@ public class DataRegion implements IDataRegionForQuery {
try {
// collect candidate TsFiles from sequential and unsequential data directory
Pair<List<TsFileResource>, List<TsFileResource>> seqTsFilesPair =
- getAllFiles(TierManager.getInstance().getAllSequenceFileFolders());
+ getAllFiles(TierManager.getInstance().getAllLocalSequenceFileFolders());
List<TsFileResource> tmpSeqTsFiles = seqTsFilesPair.left;
List<TsFileResource> oldSeqTsFiles = seqTsFilesPair.right;
upgradeSeqFileList.addAll(oldSeqTsFiles);
Pair<List<TsFileResource>, List<TsFileResource>> unseqTsFilesPair =
- getAllFiles(TierManager.getInstance().getAllUnSequenceFileFolders());
+ getAllFiles(TierManager.getInstance().getAllLocalUnSequenceFileFolders());
List<TsFileResource> tmpUnseqTsFiles = unseqTsFilesPair.left;
List<TsFileResource> oldUnseqTsFiles = unseqTsFilesPair.right;
upgradeUnseqFileList.addAll(oldUnseqTsFiles);
@@ -683,8 +683,7 @@ public class DataRegion implements IDataRegionForQuery {
private Pair<List<TsFileResource>, List<TsFileResource>> getAllFiles(List<String> folders)
throws IOException, DataRegionException {
// represents local TsFile and remote TsFile on Object Storage
- Set<String> tsFilePathSet = new HashSet<>();
- List<File> tsFiles = new ArrayList<>();
+ Map<String, File> tsFileName2File = new HashMap<>();
List<File> upgradeFiles = new ArrayList<>();
for (String baseDir : folders) {
File fileFolder = fsFactory.getFile(baseDir + File.separator + databaseName, dataRegionId);
@@ -713,16 +712,30 @@ public class DataRegion implements IDataRegionForQuery {
File[] resourceFilesInThisFolder =
fsFactory.listFilesBySuffix(partitionFolder.getAbsolutePath(), RESOURCE_SUFFIX);
for (File f : tsFilesInThisFolder) {
- tsFilePathSet.add(f.getCanonicalPath());
+ String tsFileFileName = f.getName();
+ if (tsFileName2File.containsKey(tsFileFileName)) {
+ // check migration: two same name tsfile exists, only keep one of them
+ File actualFile = deleteDuplicateTsFiles(f, tsFileName2File.get(tsFileFileName));
+ tsFileName2File.put(tsFileFileName, actualFile);
+ } else {
+ tsFileName2File.put(tsFileFileName, f);
+ }
}
- Collections.addAll(tsFiles, tsFilesInThisFolder);
for (File f : resourceFilesInThisFolder) {
- String tsFilePath =
+ String tsFileFileName =
f.getCanonicalPath()
.substring(0, f.getCanonicalPath().length() - RESOURCE_SUFFIX.length());
- if (!tsFilePathSet.contains(tsFilePath)) {
- tsFiles.add(fsFactory.getFile(tsFilePath));
+ if (tsFileName2File.containsKey(tsFileFileName)) {
+ // check migration: tsfile already added, but this resource file doesn't correspond
+ // to the file, so delete it
+ if (!f.getCanonicalPath()
+ .startsWith(tsFileName2File.get(tsFileFileName).getCanonicalPath())) {
+ f.delete();
+ }
+ } else {
+ tsFileName2File.put(
+ tsFileFileName, fsFactory.getFile(partitionFolder, tsFileFileName));
}
}
@@ -736,19 +749,16 @@ public class DataRegion implements IDataRegionForQuery {
}
}
- tsFiles.sort(this::compareFileName);
- if (!tsFiles.isEmpty()) {
- checkTsFileTime(tsFiles.get(tsFiles.size() - 1));
- }
+ long currentTime = System.currentTimeMillis();
List<TsFileResource> ret = new ArrayList<>();
- tsFiles.forEach(f -> ret.add(new TsFileResource(f)));
-
- upgradeFiles.sort(this::compareFileName);
- if (!upgradeFiles.isEmpty()) {
- checkTsFileTime(upgradeFiles.get(upgradeFiles.size() - 1));
+ for (File f : tsFileName2File.values()) {
+ checkTsFileTime(f, currentTime);
+ ret.add(new TsFileResource(f));
}
+
List<TsFileResource> upgradeRet = new ArrayList<>();
for (File f : upgradeFiles) {
+ checkTsFileTime(f, currentTime);
TsFileResource fileResource = new TsFileResource(f);
fileResource.setStatus(TsFileResourceStatus.NORMAL);
// make sure the flush command is called before IoTDB is down.
@@ -772,11 +782,33 @@ public class DataRegion implements IDataRegionForQuery {
}
}
+ /** Remove the duplicate TsFile and return the actual TsFile (has .tsfile and .tsfile.resource) */
+ private File deleteDuplicateTsFiles(File f1, File f2) {
+ File f1Resource = fsFactory.getFile(f1 + RESOURCE_SUFFIX);
+ File f2Resource = fsFactory.getFile(f2 + RESOURCE_SUFFIX);
+ if (f1.exists() && f1Resource.exists()) {
+ if (f2.exists()) {
+ f2.delete();
+ }
+ if (f2Resource.exists()) {
+ f2Resource.delete();
+ }
+ return f1;
+ } else {
+ if (f1.exists()) {
+ f1.delete();
+ }
+ if (f1Resource.exists()) {
+ f1Resource.delete();
+ }
+ return f2;
+ }
+ }
+
/** check if the tsfile's time is smaller than system current time. */
- private void checkTsFileTime(File tsFile) throws DataRegionException {
+ private void checkTsFileTime(File tsFile, long currentTime) throws DataRegionException {
String[] items = tsFile.getName().replace(TSFILE_SUFFIX, "").split(FILE_NAME_SEPARATOR);
long fileTime = Long.parseLong(items[0]);
- long currentTime = System.currentTimeMillis();
if (fileTime > currentTime) {
throw new DataRegionException(
String.format(
@@ -1570,7 +1602,7 @@ public class DataRegion implements IDataRegionForQuery {
TsFileMetricManager.getInstance().decreaseModFileSize(x.getModFile().getSize());
}
});
- deleteAllSGFolders(TierManager.getInstance().getAllFilesFolders());
+ deleteAllSGFolders(TierManager.getInstance().getAllLocalFilesFolders());
this.workSequenceTsFileProcessors.clear();
this.workUnsequenceTsFileProcessors.clear();
@@ -3090,7 +3122,7 @@ public class DataRegion implements IDataRegionForQuery {
public long countRegionDiskSize() {
AtomicLong diskSize = new AtomicLong(0);
TierManager.getInstance()
- .getAllFilesFolders()
+ .getAllLocalFilesFolders()
.forEach(
folder -> {
folder = folder + File.separator + databaseName + File.separator + dataRegionId;
diff --git a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
index b661a3f8d06..7807f272fd8 100644
--- a/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
+++ b/server/src/test/java/org/apache/iotdb/db/utils/EnvironmentUtils.java
@@ -231,11 +231,11 @@ public class EnvironmentUtils {
public static void cleanAllDir() throws IOException {
// delete sequential files
- for (String path : tierManager.getAllSequenceFileFolders()) {
+ for (String path : tierManager.getAllLocalSequenceFileFolders()) {
cleanDir(path);
}
// delete unsequence files
- for (String path : tierManager.getAllUnSequenceFileFolders()) {
+ for (String path : tierManager.getAllLocalUnSequenceFileFolders()) {
cleanDir(path);
}
// delete system info
@@ -319,11 +319,11 @@ public class EnvironmentUtils {
private static void createAllDir() {
// create sequential files
- for (String path : tierManager.getAllSequenceFileFolders()) {
+ for (String path : tierManager.getAllLocalSequenceFileFolders()) {
createDir(path);
}
// create unsequential files
- for (String path : tierManager.getAllUnSequenceFileFolders()) {
+ for (String path : tierManager.getAllLocalUnSequenceFileFolders()) {
createDir(path);
}
// create database