You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/05/23 13:12:16 UTC
[iotdb] 04/05: test migration
This is an automated email from the ASF dual-hosted git repository.
xingtanzjr pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit d263283d704bf5340f92d6ba3ec6bbc6c97fa60b
Author: Jinrui.Zhang <xi...@gmail.com>
AuthorDate: Tue May 23 21:11:44 2023 +0800
test migration
---
.../apache/iotdb/commons/conf/CommonConfig.java | 2 +-
.../org/apache/iotdb/os/cache/OSFileCache.java | 8 ++-
.../org/apache/iotdb/os/cache/OSFileCacheKey.java | 4 ++
.../apache/iotdb/os/conf/ObjectStorageConfig.java | 8 +--
.../iotdb/os/conf/provider/OSProviderConfig.java | 2 +-
.../apache/iotdb/os/conf/provider/TestConfig.java | 2 +-
.../apache/iotdb/os/fileSystem/OSTsFileInput.java | 4 ++
.../os/io/test/TestObjectStorageConnector.java | 68 +++++++++++++++++++---
.../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 7 ++-
.../iotdb/db/conf/directories/TierManager.java | 5 ++
.../iotdb/db/engine/migration/MigrationTask.java | 11 ----
.../db/engine/migration/MigrationTaskManager.java | 39 ++++++++++---
.../java/org/apache/iotdb/db/service/DataNode.java | 1 +
.../fileInputFactory/HybridFileInputFactory.java | 5 +-
.../tsfile/fileSystem/fsFactory/OSFSFactory.java | 8 +--
.../org/apache/iotdb/tsfile/utils/FSUtils.java | 42 ++++++-------
16 files changed, 149 insertions(+), 67 deletions(-)
diff --git a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
index b48e18c400f..79072d8681a 100644
--- a/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
+++ b/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java
@@ -92,7 +92,7 @@ public class CommonConfig {
* Notice: if this property is changed, previous created database which are not set TTL will also
* be affected. Unit: millisecond
*/
- private long[] tierTTLInMs = {Long.MAX_VALUE};
+ private long[] tierTTLInMs = {2 * 24 * 60 * 60 * 1000L, Long.MAX_VALUE};
/** Thrift socket and connection timeout between data node and config node. */
private int connectionTimeoutInMS = (int) TimeUnit.SECONDS.toMillis(20);
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
index a11c140d848..b302d6983ad 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
@@ -63,7 +63,13 @@ public class OSFileCache {
}
public OSFileCacheValue get(OSFileCacheKey key) {
- return remotePos2LocalCacheFile.get(key);
+ OSFileCacheValue value = remotePos2LocalCacheFile.get(key);
+ // TODO try to simplify the logic here
+ if (!value.getCacheFile().exists()) {
+ logger.info("want {} but file deleted", key);
+ remotePos2LocalCacheFile.invalidate(key);
+ }
+ return value;
}
/** This method is used by the recover procedure */
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
index c71724a34c4..4bd6f47f085 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
@@ -79,4 +79,8 @@ public class OSFileCacheKey implements Serializable {
OSFileCacheKey that = (OSFileCacheKey) obj;
return file.equals(that.file) && startPosition == that.startPosition;
}
+
+ public String toString() {
+ return file.getName() + "," + startPosition;
+ }
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
index 9126fb21cf6..b66b4871768 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
@@ -26,17 +26,17 @@ import org.apache.iotdb.os.utils.ObjectStorageType;
import java.io.File;
public class ObjectStorageConfig {
- private ObjectStorageType osType = ObjectStorageType.AWS_S3;
+ private ObjectStorageType osType = ObjectStorageType.TEST;
- private OSProviderConfig providerConfig = new AWSS3Config();
+ private OSProviderConfig providerConfig = new TestConfig();
private String[] cacheDirs = {
"data" + File.separator + "datanode" + File.separator + "data" + File.separator + "cache"
};
- private long cacheMaxDiskUsage = 50 * 1024 * 1024 * 1024L;
+ private long cacheMaxDiskUsage = 50 * 1024 * 1024L;
- private int cachePageSize = 20 * 1024 * 1024;
+ private int cachePageSize = 10 * 1024 * 1024;
ObjectStorageConfig() {}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/OSProviderConfig.java b/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/OSProviderConfig.java
index 9c42c673a99..ca0fff2ec18 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/OSProviderConfig.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/OSProviderConfig.java
@@ -20,7 +20,7 @@ package org.apache.iotdb.os.conf.provider;
public abstract class OSProviderConfig {
protected String endpoint;
- protected String bucketName;
+ protected String bucketName = "iotdb_data";
protected String accessKeyId;
protected String accessKeySecret;
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/TestConfig.java b/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/TestConfig.java
index ac32757b0c6..558d01fbe8d 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/TestConfig.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/conf/provider/TestConfig.java
@@ -21,7 +21,7 @@ package org.apache.iotdb.os.conf.provider;
import java.io.File;
public class TestConfig extends OSProviderConfig {
- private String testDir = "target" + File.separator + "test";
+ private String testDir = "data" + File.separator + "test_s3";
public String getTestDir() {
return testDir;
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
index 41d893f9291..3e83e56f726 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
@@ -29,6 +29,10 @@ public class OSTsFileInput implements TsFileInput {
private OSFile file;
private OSFileChannel channel;
+ public OSTsFileInput(String fileURI) throws IOException {
+ this(new OSFile(fileURI));
+ }
+
public OSTsFileInput(OSFile file) throws IOException {
this.file = file;
this.channel = new OSFileChannel(file);
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java b/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java
index 70ee1a0d817..cfeb74e6908 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/test/TestObjectStorageConnector.java
@@ -24,9 +24,18 @@ import org.apache.iotdb.os.exception.ObjectStorageException;
import org.apache.iotdb.os.fileSystem.OSURI;
import org.apache.iotdb.os.io.IMetaData;
import org.apache.iotdb.os.io.ObjectStorageConnector;
+import org.apache.iotdb.os.io.aws.S3MetaData;
+import org.apache.iotdb.os.utils.ObjectStorageConstant;
+
+import org.apache.commons.io.FileUtils;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
public class TestObjectStorageConnector implements ObjectStorageConnector {
private final TestConfig testConfig =
@@ -34,45 +43,88 @@ public class TestObjectStorageConnector implements ObjectStorageConnector {
@Override
public boolean doesObjectExist(OSURI osUri) throws ObjectStorageException {
- return false;
+ File file = new File(getDstFilePath(osUri));
+ return file.exists();
}
@Override
public IMetaData getMetaData(OSURI osUri) throws ObjectStorageException {
- return null;
+ File file = new File(getDstFilePath(osUri));
+ return new S3MetaData(file.length(), System.currentTimeMillis());
}
@Override
public boolean createNewEmptyObject(OSURI osUri) throws ObjectStorageException {
+ File file = new File(getDstFilePath(osUri));
+ if (!file.exists()) {
+ try {
+ return file.createNewFile();
+ } catch (IOException e) {
+ throw new ObjectStorageException(e);
+ }
+ }
return false;
}
@Override
public boolean delete(OSURI osUri) throws ObjectStorageException {
- return false;
+ File file = new File(getDstFilePath(osUri));
+ return file.delete();
}
@Override
public boolean renameTo(OSURI fromOSUri, OSURI toOSUri) throws ObjectStorageException {
- return false;
+ File file = new File(getDstFilePath(fromOSUri));
+ return file.renameTo(new File(getDstFilePath(toOSUri)));
}
@Override
public InputStream getInputStream(OSURI osUri) throws ObjectStorageException {
- return null;
+ File file = new File(getDstFilePath(osUri));
+ try {
+ return Channels.newInputStream(FileChannel.open(file.toPath(), StandardOpenOption.READ));
+ } catch (IOException e) {
+ throw new ObjectStorageException(e);
+ }
}
@Override
public OSURI[] list(OSURI osUri) throws ObjectStorageException {
- return new OSURI[0];
+ return null;
}
@Override
- public void putLocalFile(OSURI osUri, File lcoalFile) throws ObjectStorageException {}
+ public void putLocalFile(OSURI osUri, File localFile) throws ObjectStorageException {
+ try {
+ File targetFile = new File(getDstFilePath(osUri));
+ if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) {
+ throw new ObjectStorageException(
+ String.format(
+ "[TieredMigration] cannot mkdir for path %s",
+ targetFile.getParentFile().getAbsolutePath()));
+ }
+ FileUtils.copyFile(localFile, targetFile);
+ } catch (IOException e) {
+ throw new ObjectStorageException(e);
+ }
+ }
@Override
public byte[] getRemoteFile(OSURI osUri, long position, int len) throws ObjectStorageException {
- return new byte[0];
+ File file = new File(getDstFilePath(osUri));
+ ByteBuffer dst = ByteBuffer.allocate(len);
+ try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.READ)) {
+ channel.read(dst, position);
+ } catch (IOException e) {
+ throw new ObjectStorageException(e);
+ }
+ return dst.array();
+ }
+
+ private String getDstFilePath(OSURI osuri) {
+ return testConfig.getTestDir()
+ + File.separator
+ + osuri.getKey().replace(ObjectStorageConstant.FILE_SEPARATOR, File.separator);
}
@Override
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index e48757b398d..c59bd9c558e 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -300,7 +300,8 @@ public class IoTDBConfig {
/** Tiered data directories. It can be settled as dataDirs = {{"data1"}, {"data2", "data3"}}; */
private String[][] tierDataDirs = {
- {IoTDBConstant.DEFAULT_BASE_DIR + File.separator + IoTDBConstant.DATA_FOLDER_NAME}
+ {IoTDBConstant.DEFAULT_BASE_DIR + File.separator + IoTDBConstant.DATA_FOLDER_NAME},
+ {IoTDBConstant.OBJECT_STORAGE_DIR}
};
private String loadTsFileDir =
@@ -528,7 +529,7 @@ public class IoTDBConfig {
private int candidateCompactionTaskQueueSize = 50;
/** whether to cache meta data(ChunkMetaData and TsFileMetaData) or not. */
- private boolean metaDataCacheEnable = true;
+ private boolean metaDataCacheEnable = false;
/** Memory allocated for bloomFilter cache in read process */
private long allocateMemoryForBloomFilterCache = allocateMemoryForRead / 1001;
@@ -1136,7 +1137,7 @@ public class IoTDBConfig {
private String RateLimiterType = "FixedIntervalRateLimiter";
/** Threads for migration tasks */
- private int migrateThreadCount = 3;
+ private int migrateThreadCount = 1;
/** Enable hdfs or not */
private boolean enableObjectStorage = false;
diff --git a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
index e0483be0558..32b8e8aea94 100644
--- a/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/conf/directories/TierManager.java
@@ -92,6 +92,11 @@ public class TierManager {
return;
}
+ seqTiers.clear();
+ unSeqTiers.clear();
+ seqDir2TierLevel.clear();
+ unSeqDir2TierLevel.clear();
+
String[][] tierDirs = config.getTierDataDirs();
for (int i = 0; i < tierDirs.length; ++i) {
for (int j = 0; j < tierDirs[i].length; ++j) {
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
index 998e903925c..36766750832 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTask.java
@@ -70,18 +70,7 @@ public abstract class MigrationTask implements Runnable {
@Override
public void run() {
- if (canMigrate()) {
- tsFile.setIsMigrating(true);
- if (!canMigrate()) {
- tsFile.setIsMigrating(false);
- return;
- }
- } else {
- return;
- }
-
migrate();
-
tsFile.setIsMigrating(false);
}
diff --git a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
index 4bd9712c263..0ebaa49d7f2 100644
--- a/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
+++ b/server/src/main/java/org/apache/iotdb/db/engine/migration/MigrationTaskManager.java
@@ -50,7 +50,7 @@ public class MigrationTaskManager implements IService {
private static final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
private static final CommonConfig commonConfig = CommonDescriptor.getInstance().getConfig();
private static final TierManager tierManager = TierManager.getInstance();
- private static final long CHECK_INTERVAL_IN_SECONDS = 10 * 60;
+ private static final long CHECK_INTERVAL_IN_SECONDS = 10;
private static final double TIER_DISK_SPACE_WARN_THRESHOLD =
commonConfig.getDiskSpaceWarningThreshold() + 0.1;
private static final double TIER_DISK_SPACE_SAFE_THRESHOLD =
@@ -107,28 +107,30 @@ public class MigrationTaskManager implements IService {
tsfiles.sort(this::compareMigrationPriority);
for (TsFileResource tsfile : tsfiles) {
try {
- int tierLevel = tsfile.getTierLevel();
+ int currentTier = tsfile.getTierLevel();
+ int nextTier = currentTier + 1;
// only migrate closed TsFiles not in the last tier
if (tsfile.getStatus() != TsFileResourceStatus.NORMAL
- || tierLevel == iotdbConfig.getTierDataDirs().length - 1) {
+ || nextTier == iotdbConfig.getTierDataDirs().length) {
continue;
}
// check tier ttl and disk space
long tierTTL =
DateTimeUtils.convertMilliTimeWithPrecision(
- commonConfig.getTierTTLInMs()[tierLevel], iotdbConfig.getTimestampPrecision());
+ commonConfig.getTierTTLInMs()[currentTier],
+ iotdbConfig.getTimestampPrecision());
if (tsfile.stillLives(tierTTL)) {
submitMigrationTask(
- tierLevel,
+ currentTier,
MigrationCause.TTL,
tsfile,
- tierManager.getNextFolderForTsFile(tierLevel, tsfile.isSeq()));
- } else if (needMigrationTiers.contains(tierLevel)) {
+ tierManager.getNextFolderForTsFile(nextTier, tsfile.isSeq()));
+ } else if (needMigrationTiers.contains(currentTier)) {
submitMigrationTask(
- tierLevel,
+ currentTier,
MigrationCause.DISK_SPACE,
tsfile,
- tierManager.getNextFolderForTsFile(tierLevel, tsfile.isSeq()));
+ tierManager.getNextFolderForTsFile(nextTier, tsfile.isSeq()));
}
} catch (Exception e) {
logger.error(
@@ -140,6 +142,9 @@ public class MigrationTaskManager implements IService {
private void submitMigrationTask(
int tierLevel, MigrationCause cause, TsFileResource sourceTsFile, String targetDir) {
+ if (!checkAndMarkMigrate(sourceTsFile)) {
+ return;
+ }
MigrationTask task = MigrationTask.newTask(cause, sourceTsFile, targetDir);
workers.submit(task);
tierDiskUsableSpace[tierLevel] -= sourceTsFile.getTsFileSize();
@@ -151,6 +156,22 @@ public class MigrationTaskManager implements IService {
}
}
+ private boolean checkAndMarkMigrate(TsFileResource tsFile) {
+ if (canMigrate(tsFile)) {
+ tsFile.setIsMigrating(true);
+ if (!canMigrate(tsFile)) {
+ tsFile.setIsMigrating(false);
+ return false;
+ }
+ return true;
+ }
+ return false;
+ }
+
+ private boolean canMigrate(TsFileResource tsFile) {
+ return tsFile.getStatus() == TsFileResourceStatus.NORMAL;
+ }
+
private int compareMigrationPriority(TsFileResource f1, TsFileResource f2) {
// old time partitions first
int res = Long.compare(f1.getTimePartition(), f2.getTimePartition());
diff --git a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 829494c6375..4010f2ed6b1 100644
--- a/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/server/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -459,6 +459,7 @@ public class DataNode implements DataNodeMBean {
/* Store runtime configurations when restart request is accepted */
storeRuntimeConfigurations(
dataNodeRestartResp.getConfigNodeList(), dataNodeRestartResp.getRuntimeConfiguration());
+ configOSStorage(config.getDataNodeId());
logger.info("Restart request to cluster: {} is accepted.", config.getClusterName());
} else {
/* Throw exception when restart is rejected */
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java
index 3a7cc7d099e..f9140af71e9 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/HybridFileInputFactory.java
@@ -42,8 +42,7 @@ public class HybridFileInputFactory implements FileInputFactory {
@Override
public TsFileInput getTsFileInput(String filePath) throws IOException {
- return inputFactories.get(FSType.OBJECT_STORAGE).getTsFileInput(filePath);
-// FSPath path = FSUtils.parse(filePath);
-// return inputFactories.get(path.getFsType()).getTsFileInput(path.getPath());
+ FSPath path = FSUtils.parse(filePath);
+ return inputFactories.get(path.getFsType()).getTsFileInput(path.getPath());
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
index 59b175d4926..130ee579d7b 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
@@ -67,8 +67,8 @@ public class OSFSFactory implements FSFactory {
listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class);
listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class);
renameTo = clazz.getMethod("renameTo", File.class);
- renameTo = clazz.getMethod("putFile", File.class);
- renameTo = clazz.getMethod("copyTo", File.class);
+ putFile = clazz.getMethod("putFile", File.class);
+ copyTo = clazz.getMethod("copyTo", File.class);
} catch (ClassNotFoundException | NoSuchMethodException e) {
logger.error(
"Failed to get object storage. Please check your dependency of object storage module.",
@@ -211,13 +211,13 @@ public class OSFSFactory implements FSFactory {
FSType srcType = FSUtils.getFSType(srcFile);
try {
if (srcType == FSType.LOCAL) {
- putFile.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+ putFile.invoke(destFile, srcFile);
} else if (srcType == FSType.OBJECT_STORAGE) {
copyTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
} else {
throw new IOException(
String.format(
- "Doesn't support move file from %s to %s.", srcType, FSType.OBJECT_STORAGE));
+ "Doesn't support copy file from %s to %s.", srcType, FSType.OBJECT_STORAGE));
}
} catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
logger.error(
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
index 2a8169655cd..5b12fa6e69f 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
@@ -102,7 +102,6 @@ public class FSUtils {
for (int i = 0; i < fsTypes.length; ++i) {
if (fsPath.startsWith(fsPrefix[i])) {
type = fsTypes[i];
- path = fsPath.substring(fsPrefix[i].length());
break;
}
}
@@ -110,32 +109,33 @@ public class FSUtils {
}
public static String getOSDefaultPath(String bucket, int dataNodeId) {
- return new FSPath(FSType.OBJECT_STORAGE, fsPrefix[0] + OS_FILE_SEPARATOR + dataNodeId)
+ return new FSPath(FSType.OBJECT_STORAGE, fsPrefix[0] + bucket + OS_FILE_SEPARATOR + dataNodeId)
.getPath();
}
-// public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, int dataNodeId)
-// throws IOException {
-// String[] filePathSplits = FilePathUtils.splitTsFilePath(lcoalFile.getCanonicalPath());
-// return new FSPath(
-// FSType.OBJECT_STORAGE,
-// fsPrefix[0]
-// + bucket
-// + OS_FILE_SEPARATOR
-// + dataNodeId
-// + OS_FILE_SEPARATOR
-// + String.join(
-// OS_FILE_SEPARATOR,
-// Arrays.copyOfRange(
-// filePathSplits, filePathSplits.length - 5, filePathSplits.length)));
-// }
-
- public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, int dataNodeId)
+ public static FSPath parseLocalTsFile2OSFile(File localFile, String bucket, int dataNodeId)
throws IOException {
- String fileName = lcoalFile.getName();
- return new FSPath(FSType.LOCAL, "/Users/jinruizhangjinrui/Documents/work/iotdb/data/datanode/s3/" + fileName);
+ String[] filePathSplits = FilePathUtils.splitTsFilePath(localFile.getCanonicalPath());
+ return new FSPath(
+ FSType.OBJECT_STORAGE,
+ fsPrefix[0]
+ + bucket
+ + OS_FILE_SEPARATOR
+ + dataNodeId
+ + OS_FILE_SEPARATOR
+ + String.join(
+ OS_FILE_SEPARATOR,
+ Arrays.copyOfRange(
+ filePathSplits, filePathSplits.length - 5, filePathSplits.length)));
}
+ // public static FSPath parseLocalTsFile2OSFile(File lcoalFile, String bucket, int dataNodeId)
+ // throws IOException {
+ // String fileName = lcoalFile.getName();
+ // return new FSPath(FSType.OBJECT_STORAGE,
+ // "os://bucket/Users/jinruizhangjinrui/Documents/work/iotdb/data/datanode/s3/" + fileName);
+ // }
+
public static boolean isLocal(String fsPath) {
return getFSType(fsPath) == FSType.LOCAL;
}