You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/24 03:02:46 UTC
[iotdb] 01/01: refine code
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 92e7cfcf277e870d11dd3dbfb9aa4ce986296a98
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Wed May 24 10:59:18 2023 +0800
refine code
---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 2 +-
.../iotdb/db/conf/directories/TierManager.java | 1 +
.../db/engine/migration/LocalMigrationTask.java | 22 +++++++-------
.../iotdb/db/engine/migration/MigrationTask.java | 34 +++++++++++++---------
.../db/engine/migration/MigrationTaskManager.java | 14 ++++++---
.../db/engine/migration/RemoteMigrationTask.java | 17 ++++++-----
.../db/engine/storagegroup/TsFileProcessor.java | 4 ++-
.../db/engine/storagegroup/TsFileResource.java | 6 +++-
.../iotdb/tsfile/common/conf/TSFileConfig.java | 3 +-
.../org/apache/iotdb/tsfile/utils/FSUtils.java | 20 ++++++-------
10 files changed, 72 insertions(+), 51 deletions(-)
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index c59bd9c558e..40a8c3387db 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1140,7 +1140,7 @@ public class IoTDBConfig {
private int migrateThreadCount = 1;
/** Enable hdfs or not */
- private boolean enableObjectStorage = false;
+ private boolean enableObjectStorage = true;
/** Config for object storage */
private ObjectStorageConfig osConfig = ObjectStorageDescriptor.getInstance().getConfig();
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
index 7c3335910b8..734d7a5decf 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
@@ -206,6 +206,7 @@ public class TierManager {
}
public int getFileTierLevel(File file) {
+ // If the file does not exist on Local disk, it is assumed to be on remote Object Storage
if (!file.exists()) {
return getTiersNum() - 1;
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
index 7746007fb0c..889466733a5 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/LocalMigrationTask.java
@@ -28,33 +28,35 @@ import java.io.IOException;
public class LocalMigrationTask extends MigrationTask {
private static final Logger logger = LoggerFactory.getLogger(LocalMigrationTask.class);
- LocalMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) {
+ protected LocalMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir)
+ throws IOException {
super(cause, tsFile, targetDir);
}
@Override
public void migrate() {
// copy TsFile and resource file
- tsFile.readLock();
+ tsFileResource.readLock();
try {
- fsFactory.copyFile(srcTsFile, destTsFile);
+ fsFactory.copyFile(srcFile, destTsFile);
fsFactory.copyFile(srcResourceFile, destResourceFile);
} catch (IOException e) {
- logger.error("Fail to copy TsFile {}", srcTsFile);
+ logger.error("Fail to copy TsFile {}", srcFile);
destTsFile.delete();
destResourceFile.delete();
return;
} finally {
- tsFile.readUnlock();
+ tsFileResource.readUnlock();
}
// close mods file and replace TsFile path
- tsFile.writeLock();
+ tsFileResource.writeLock();
try {
- tsFile.resetModFile();
+ tsFileResource.resetModFile();
+ // migrate MOD file only when it exists
if (srcModsFile.exists()) {
fsFactory.copyFile(srcModsFile, destModsFile);
}
- tsFile.setFile(destTsFile);
+ tsFileResource.setFile(destTsFile);
} catch (IOException e) {
logger.error("Fail to copy mods file {}", srcModsFile);
destTsFile.delete();
@@ -62,10 +64,10 @@ public class LocalMigrationTask extends MigrationTask {
destModsFile.delete();
return;
} finally {
- tsFile.writeUnlock();
+ tsFileResource.writeUnlock();
}
// clear src files
- srcTsFile.delete();
+ srcFile.delete();
srcResourceFile.delete();
srcModsFile.delete();
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
index 8bdfc751256..a894334a84c 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
@@ -26,41 +26,47 @@ import org.apache.iotdb.tsfile.fileSystem.fsFactory.FSFactory;
import org.apache.iotdb.tsfile.utils.FSUtils;
import java.io.File;
+import java.io.IOException;
public abstract class MigrationTask implements Runnable {
protected static final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
protected final MigrationCause cause;
- protected final TsFileResource tsFile;
+ protected final TsFileResource tsFileResource;
protected final String targetDir;
- protected final File srcTsFile;
+ protected final File srcFile;
protected final File destTsFile;
protected final File srcResourceFile;
protected final File destResourceFile;
protected final File srcModsFile;
protected final File destModsFile;
- MigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) {
+ protected MigrationTask(MigrationCause cause, TsFileResource tsFileResource, String targetDir)
+ throws IOException {
this.cause = cause;
- this.tsFile = tsFile;
+ this.tsFileResource = tsFileResource;
this.targetDir = targetDir;
- this.srcTsFile = tsFile.getTsFile();
- this.destTsFile = fsFactory.getFile(targetDir, tsFile.getTsFile().getName());
+ this.srcFile = tsFileResource.getTsFile();
+ this.destTsFile = fsFactory.getFile(targetDir, getDestTsFilePath(srcFile));
this.srcResourceFile =
fsFactory.getFile(
- srcTsFile.getParentFile(), srcTsFile.getName() + TsFileResource.RESOURCE_SUFFIX);
+ srcFile.getParentFile(), srcFile.getName() + TsFileResource.RESOURCE_SUFFIX);
this.destResourceFile =
- fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + TsFileResource.RESOURCE_SUFFIX);
+ fsFactory.getFile(targetDir, getDestTsFilePath(srcFile) + TsFileResource.RESOURCE_SUFFIX);
this.srcModsFile =
fsFactory.getFile(
- srcTsFile.getParentFile(), srcTsFile.getName() + ModificationFile.FILE_SUFFIX);
+ srcFile.getParentFile(), srcFile.getName() + ModificationFile.FILE_SUFFIX);
this.destModsFile =
- fsFactory.getFile(targetDir, tsFile.getTsFile().getName() + ModificationFile.FILE_SUFFIX);
+ fsFactory.getFile(targetDir, getDestTsFilePath(srcFile) + ModificationFile.FILE_SUFFIX);
+ }
+
+ private String getDestTsFilePath(File src) throws IOException {
+ return FSUtils.getLocalTsFileShortPath(src, FSUtils.PATH_FROM_DATABASE_LEVEL);
}
public static MigrationTask newTask(
- MigrationCause cause, TsFileResource sourceTsFile, String targetDir) {
+ MigrationCause cause, TsFileResource sourceTsFile, String targetDir) throws IOException {
if (FSUtils.isLocal(targetDir)) {
return new LocalMigrationTask(cause, sourceTsFile, targetDir);
} else {
@@ -71,12 +77,12 @@ public abstract class MigrationTask implements Runnable {
@Override
public void run() {
migrate();
- tsFile.increaseTierLevel();
- tsFile.setIsMigrating(false);
+ tsFileResource.increaseTierLevel();
+ tsFileResource.setIsMigrating(false);
}
protected boolean canMigrate() {
- return tsFile.getStatus() == TsFileResourceStatus.NORMAL;
+ return tsFileResource.getStatus() == TsFileResourceStatus.NORMAL;
}
public abstract void migrate();
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
index 0ebaa49d7f2..ede3ecce410 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.db.utils.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -134,14 +135,15 @@ public class MigrationTaskManager implements IService {
}
} catch (Exception e) {
logger.error(
- "An error occurred when checking migration of TsFileResource {}", tsfile, e);
+ "An error occurred when check and try to migrate TsFileResource {}", tsfile, e);
}
}
}
}
private void submitMigrationTask(
- int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) {
+ int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir)
+ throws IOException {
if (!checkAndMarkMigrate(sourceTsFile)) {
return;
}
@@ -159,7 +161,7 @@ public class MigrationTaskManager implements IService {
private boolean checkAndMarkMigrate(TsFileResource tsFile) {
if (canMigrate(tsFile)) {
tsFile.setIsMigrating(true);
- if (!canMigrate(tsFile)) {
+ if (occupiedByCompaction(tsFile)) {
tsFile.setIsMigrating(false);
return false;
}
@@ -169,7 +171,11 @@ public class MigrationTaskManager implements IService {
}
private boolean canMigrate(TsFileResource tsFile) {
- return tsFile.getStatus() == TsFileResourceStatus.NORMAL;
+ return tsFile.getStatus() == TsFileResourceStatus.NORMAL && !tsFile.isMigrating();
+ }
+
+ private boolean occupiedByCompaction(TsFileResource tsFile) {
+ return tsFile.getStatus() != TsFileResourceStatus.NORMAL;
}
private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
index 65aeb4d9181..428778ecea8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/RemoteMigrationTask.java
@@ -28,31 +28,32 @@ import java.io.IOException;
public class RemoteMigrationTask extends MigrationTask {
private static final Logger logger = LoggerFactory.getLogger(RemoteMigrationTask.class);
- RemoteMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir) {
+ protected RemoteMigrationTask(MigrationCause cause, TsFileResource tsFile, String targetDir)
+ throws IOException {
super(cause, tsFile, targetDir);
}
@Override
public void migrate() {
// copy TsFile and resource file
- tsFile.readLock();
+ tsFileResource.readLock();
try {
- fsFactory.copyFile(srcTsFile, destTsFile);
+ fsFactory.copyFile(srcFile, destTsFile);
fsFactory.copyFile(srcResourceFile, destResourceFile);
} catch (IOException e) {
- logger.error("Fail to copy TsFile {}", srcTsFile);
+ logger.error("Fail to copy TsFile {}", srcFile);
destTsFile.delete();
destResourceFile.delete();
return;
} finally {
- tsFile.readUnlock();
+ tsFileResource.readUnlock();
}
// clear src files
- tsFile.writeLock();
+ tsFileResource.writeLock();
try {
- srcTsFile.delete();
+ srcFile.delete();
} finally {
- tsFile.writeUnlock();
+ tsFileResource.writeUnlock();
}
}
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
index f4f33c10473..513a0a25458 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileProcessor.java
@@ -185,11 +185,13 @@ public class TsFileProcessor {
boolean sequence)
throws IOException {
this.storageGroupName = storageGroupName;
+ // this.sequence must be assigned first because `this` is passed to the TsFileResource
+ // constructor below, which calls isSequence() on it
+ this.sequence = sequence;
this.tsFileResource = new TsFileResource(tsfile, this);
this.dataRegionInfo = dataRegionInfo;
this.writer = new RestorableTsFileIOWriter(tsfile);
this.updateLatestFlushTimeCallback = updateLatestFlushTimeCallback;
- this.sequence = sequence;
this.walNode =
WALManager.getInstance()
.applyForWALNode(WALManager.getApplicantUniqueId(storageGroupName, sequence));
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
index b7536902ccc..fe1b9c102d8 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/storagegroup/TsFileResource.java
@@ -185,6 +185,8 @@ public class TsFileResource {
this.version = FilePathUtils.splitAndGetTsFileVersion(this.file.getName());
this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
this.isSeq = FilePathUtils.isSequence(this.file.getAbsolutePath());
+ // This method is invoked when DataNode recovers, so the tierLevel should be calculated when
+ // restarting
this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
}
@@ -201,7 +203,9 @@ public class TsFileResource {
this.timeIndex = CONFIG.getTimeIndexLevel().getTimeIndex();
this.processor = processor;
this.isSeq = processor.isSequence();
- this.tierLevel = TierManager.getInstance().getFileTierLevel(file);
+ // this method is invoked when a new TsFile is created; a newly created TsFile's
+ // tierLevel is 0 by default
+ this.tierLevel = 0;
}
/** unsealed TsFile, for query */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 8712ba0aba5..9d7813784a9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -120,7 +120,8 @@ public class TSFileConfig implements Serializable {
/** Default endian value is BIG_ENDIAN. */
private String endian = "BIG_ENDIAN";
/** Default storage is in local file system */
- private FSType[] TSFileStorageFs = new FSType[] {FSType.LOCAL};
+ // TODO: (haiming) fix the bug that the config is not loaded
+ private FSType[] TSFileStorageFs = new FSType[] {FSType.LOCAL, FSType.OBJECT_STORAGE};
/** Default core-site.xml file path is /etc/hadoop/conf/core-site.xml */
private String coreSitePath = "/etc/hadoop/conf/core-site.xml";
/** Default hdfs-site.xml file path is /etc/hadoop/conf/hdfs-site.xml */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
index 5291985b678..c5f80343d70 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
@@ -32,6 +32,8 @@ import java.util.Arrays;
public class FSUtils {
private static final Logger logger = LoggerFactory.getLogger(FSUtils.class);
private static final FSType[] fsTypes = {FSType.OBJECT_STORAGE, FSType.HDFS};
+ public static final int PATH_FROM_SEQUENCE_LEVEL = 5;
+ public static final int PATH_FROM_DATABASE_LEVEL = 4;
public static final String[] fsPrefix = {"os://", "hdfs://"};
public static final String OS_FILE_SEPARATOR = "/";
private static final String[] fsFileClassName = {
@@ -119,7 +121,6 @@ public class FSUtils {
public static FSPath parseLocalTsFile2OSFile(File localFile, String bucket, int dataNodeId)
throws IOException {
- String[] filePathSplits = FilePathUtils.splitTsFilePath(localFile.getCanonicalPath());
return new FSPath(
FSType.OBJECT_STORAGE,
fsPrefix[0]
@@ -127,18 +128,15 @@ public class FSUtils {
+ OS_FILE_SEPARATOR
+ dataNodeId
+ OS_FILE_SEPARATOR
- + String.join(
- OS_FILE_SEPARATOR,
- Arrays.copyOfRange(
- filePathSplits, filePathSplits.length - 5, filePathSplits.length)));
+ + getLocalTsFileShortPath(localFile, PATH_FROM_SEQUENCE_LEVEL));
}
- // public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, int dataNodeId)
- // throws IOException {
- // String fileName = lcoalFile.getName();
- // return new FSPath(FSType.OBJECT_STORAGE,
- // "os://bucket/Users/jinruizhangjinrui/Documents/work/iotdb/data/datanode/s3/" + fileName);
- // }
+ public static String getLocalTsFileShortPath(File localTsFile, int level) throws IOException {
+ String[] filePathSplits = FilePathUtils.splitTsFilePath(localTsFile.getCanonicalPath());
+ return String.join(
+ OS_FILE_SEPARATOR,
+ Arrays.copyOfRange(filePathSplits, filePathSplits.length - level, filePathSplits.length));
+ }
public static boolean isLocal(String fsPath) {
return getFSType(fsPath) == FSType.LOCAL;