You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/15 03:44:10 UTC
[iotdb] branch tiered_storage updated: add caffeine
This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new c9da83738aa add caffeine
c9da83738aa is described below
commit c9da83738aada5fbb1db82d78d191d9bf7de50e9
Author: HeimingZ <zh...@qq.com>
AuthorDate: Mon May 15 11:43:53 2023 +0800
add caffeine
---
object-storage/pom.xml | 4 ++
.../{CacheEntry.java => CacheFileChannel.java} | 8 +---
.../CacheInputStream.java} | 12 ++---
.../org/apache/iotdb/os/cache/CacheManger.java | 46 ------------------
.../org/apache/iotdb/os/cache/OSFileCache.java | 50 ++++++++++++++++++-
.../org/apache/iotdb/os/cache/OSFileCacheImpl.java | 56 ----------------------
.../org/apache/iotdb/os/cache/OSFileCacheKey.java | 3 +-
.../org/apache/iotdb/os/cache/PersistentCache.java | 33 -------------
.../iotdb/os/cache/RemoteFileCacheLoader.java | 54 +++++++++++++++++++++
.../apache/iotdb/os/conf/ObjectStorageConfig.java | 22 ++++-----
.../org/apache/iotdb/os/fileSystem/OSFile.java | 15 ++++--
.../{OSInput.java => OSTsFileInput.java} | 10 ++--
.../{OSOutput.java => OSTsFileOutput.java} | 4 +-
.../apache/iotdb/os/io/ObjectStorageConnector.java | 2 +
.../iotdb/os/io/aws/S3ObjectStorageConnector.java | 8 ++++
.../iotdb/tsfile/common/conf/TSFileConfig.java | 6 +++
.../fileInputFactory/OSFileInputFactory.java | 2 +-
.../fileOutputFactory/OSFileOutputFactory.java | 32 ++++++++++++-
.../org/apache/iotdb/tsfile/utils/FSUtils.java | 19 +++++++-
19 files changed, 208 insertions(+), 178 deletions(-)
diff --git a/object-storage/pom.xml b/object-storage/pom.xml
index c313b65bc53..8c4a4e2aaf3 100644
--- a/object-storage/pom.xml
+++ b/object-storage/pom.xml
@@ -57,6 +57,10 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.ben-manes.caffeine</groupId>
+ <artifactId>caffeine</artifactId>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
similarity index 84%
rename from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java
rename to object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
index 8a54fdee1ec..8b7bfc9124c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheFileChannel.java
@@ -18,10 +18,4 @@
*/
package org.apache.iotdb.os.cache;
-import java.io.File;
-
-public class CacheEntry {
- private File cacheFile;
- // cached value, null when the value has been flushed
- private byte[] value;
-}
+public class CacheFileChannel {}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFileInputStream.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
similarity index 78%
rename from object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFileInputStream.java
rename to object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
index 9bc19d6face..2df16266ba6 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFileInputStream.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheInputStream.java
@@ -16,21 +16,21 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.os.cache;
-package org.apache.iotdb.os.fileSystem;
+import org.apache.iotdb.os.fileSystem.OSTsFileInput;
import java.io.IOException;
import java.io.InputStream;
-public class OSFileInputStream extends InputStream {
-
- private OSInput osInput;
+public class CacheInputStream extends InputStream {
+ private OSTsFileInput osTsFileInput;
private long position; // 15
private long size; // 100
private InputStream currentInputStream; // 第二个文件,从 5 开始
- public OSFileInputStream(OSInput osInput, long position, long size) {
- this.osInput = osInput;
+ public CacheInputStream(OSTsFileInput osTsFileInput, long position, long size) {
+ this.osTsFileInput = osTsFileInput;
this.position = position;
this.size = size;
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheManger.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheManger.java
deleted file mode 100644
index 02175eb4ea7..00000000000
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheManger.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.os.cache;
-
-import org.apache.iotdb.os.conf.ObjectStorageConfig;
-import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
-
-import java.util.ArrayList;
-import java.util.List;
-
-public class CacheManger {
- private final ObjectStorageConfig config = ObjectStorageDescriptor.getInstance().getConfig();
- private final List<PersistentCache> caches = new ArrayList<>();
-
- private CacheManger() {
- for (String cacheDir : config.getCacheDirs()) {
- caches.add(new PersistentCache(cacheDir));
- }
- }
-
- public static CacheManger getInstance() {
- return InstanceHolder.INSTANCE;
- }
-
- private static class InstanceHolder {
- private InstanceHolder() {}
-
- private static final CacheManger INSTANCE = new CacheManger();
- }
-}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
index 4533e12470b..b7bb22c9918 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCache.java
@@ -19,9 +19,55 @@
package org.apache.iotdb.os.cache;
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.LoadingCache;
+import com.github.benmanes.caffeine.cache.Weigher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+
+public class OSFileCache {
+ private static final Logger logger = LoggerFactory.getLogger(OSFileCache.class);
+ private static final ObjectStorageConfig config =
+ ObjectStorageDescriptor.getInstance().getConfig();
+
+ private LoadingCache<OSFileCacheKey, File> remotePos2LocalCacheFile;
+
+ private OSFileCache() {
+ remotePos2LocalCacheFile =
+ Caffeine.newBuilder()
+ .maximumWeight(config.getCacheMaxDiskUsage())
+ .weigher(
+ (Weigher<OSFileCacheKey, File>) (key, value) -> Math.toIntExact(value.length()))
+ .build(new RemoteFileCacheLoader());
+ }
+
+ public InputStream getAsInputSteam(String fileName, long startPosition) throws IOException {
+ FileChannel cacheFileChannel = getLocalCacheFileChannel(fileName, startPosition);
+ return Channels.newInputStream(cacheFileChannel);
+ }
+
+ private FileChannel getLocalCacheFileChannel(String fileName, long startPosition) {
+ // 根据 fileName 和 startPosition 计算出对应的本地文件路径,并返回对应的 FileChannel
+ // 如果是使用一个 CacheFile, 则寻找到对应的位置,可能需要封装一个自己的 FileChannel 防止读多
+ return null;
+ }
+
+ public static OSFileCache getInstance() {
+ return OSFileCache.InstanceHolder.INSTANCE;
+ }
+
+ private static class InstanceHolder {
+ private InstanceHolder() {}
-public interface OSFileCache {
- InputStream getAsInputSteam(String fileName, long startPosition) throws IOException;
+ private static final OSFileCache INSTANCE = new OSFileCache();
+ }
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheImpl.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheImpl.java
deleted file mode 100644
index d3b071607ee..00000000000
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheImpl.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.os.cache;
-
-import java.io.BufferedInputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.channels.Channels;
-import java.nio.channels.FileChannel;
-
-public class OSFileCacheImpl implements OSFileCache {
-
- private OSFileCacheImpl() {
-
- }
-
- @Override
- public InputStream getAsInputSteam(String fileName, long startPosition) throws IOException {
- FileChannel cacheFileChannel = getLocalCacheFileChannel(fileName, startPosition);
- return Channels.newInputStream(cacheFileChannel);
- }
-
- private FileChannel getLocalCacheFileChannel(String fileName, long startPosition) {
- // 根据 fileName 和 startPosition 计算出对应的本地文件路径,并返回对应的 FileChannel
- // 如果是使用一个 CacheFile, 则寻找到对应的位置,可能需要封装一个自己的 FileChannel 防止读多
- return null;
- }
-
- public static OSFileCacheImpl getInstance() {
- return OSFileCacheImpl.InstanceHolder.INSTANCE;
- }
-
- private static class InstanceHolder {
- private InstanceHolder() {}
-
- private static final OSFileCacheImpl INSTANCE = new OSFileCacheImpl();
- }
-}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
index b068fe985eb..8757483432c 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/OSFileCacheKey.java
@@ -21,7 +21,6 @@ package org.apache.iotdb.os.cache;
import org.apache.iotdb.os.fileSystem.OSFile;
public class OSFileCacheKey {
- private String fileName;
+ private OSFile file;
private long position;
- private int size;
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java
deleted file mode 100644
index 5207c1b0efb..00000000000
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.os.cache;
-
-public class PersistentCache {
- private final String cacheDir;
-
- public PersistentCache(String cacheDir) {
- this.cacheDir = cacheDir;
- }
-
- public byte[] get(OSFileCacheKey OSFileCacheKey) {
- return null;
- }
-
- private void serialize(OSFileCacheKey OSFileCacheKey, byte[] cacheVal) {}
-}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/RemoteFileCacheLoader.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/RemoteFileCacheLoader.java
new file mode 100644
index 00000000000..56e182fd823
--- /dev/null
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/RemoteFileCacheLoader.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.os.cache;
+
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.io.ObjectStorageConnector;
+import org.apache.iotdb.os.io.aws.S3ObjectStorageConnector;
+
+import com.github.benmanes.caffeine.cache.CacheLoader;
+import org.checkerframework.checker.nullness.qual.NonNull;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class RemoteFileCacheLoader implements CacheLoader<OSFileCacheKey, File> {
+ private static final Logger logger = LoggerFactory.getLogger(RemoteFileCacheLoader.class);
+ private static final ObjectStorageConfig config =
+ ObjectStorageDescriptor.getInstance().getConfig();
+ private static final ObjectStorageConnector connector;
+
+ static {
+ switch (config.getOsType()) {
+ case AWS_S3:
+ connector = new S3ObjectStorageConnector();
+ break;
+ default:
+ connector = null;
+ }
+ }
+
+ @Override
+ public @Nullable File load(@NonNull OSFileCacheKey key) throws Exception {
+ return null;
+ }
+}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
index 9db205ca472..2935c641e48 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
@@ -24,33 +24,33 @@ import org.apache.iotdb.os.utils.ObjectStorageType;
import java.io.File;
public class ObjectStorageConfig {
- private static ObjectStorageType osType = ObjectStorageType.AWS_S3;
+ private ObjectStorageType osType = ObjectStorageType.AWS_S3;
- private static AWSS3Config awss3Config = new AWSS3Config();
+ private AWSS3Config awss3Config = new AWSS3Config();
- private static String[] cacheDirs = {
+ private String[] cacheDirs = {
"data" + File.separator + "datanode" + File.separator + "data" + File.separator + "cache"
};
- private static long cacheSizeThreshold = 10 * 1024 * 1024 * 1024;
+ private long cacheMaxDiskUsage = 20 * 1024 * 1024 * 1024L;
- private static long pageSize = 32 * 1024;
+ private int cachePageSize = 10 * 1024 * 1024;
ObjectStorageConfig() {}
- public static ObjectStorageType getOsType() {
+ public ObjectStorageType getOsType() {
return osType;
}
- public static String[] getCacheDirs() {
+ public String[] getCacheDirs() {
return cacheDirs;
}
- public static long getCacheSizeThreshold() {
- return cacheSizeThreshold;
+ public long getCacheMaxDiskUsage() {
+ return cacheMaxDiskUsage;
}
- public static long getPageSize() {
- return pageSize;
+ public int getCachePageSize() {
+ return cachePageSize;
}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
index dab819b8015..64bff215538 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
@@ -64,6 +64,8 @@ public class OSFile extends File {
private final OSURI osUri;
+ private long length = 0L;
+
public OSFile(String pathname) {
super(pathname);
this.osUri = new OSURI(pathname);
@@ -204,12 +206,15 @@ public class OSFile extends File {
@Override
public long length() {
- try {
- return connector.getMetaData(osUri).length();
- } catch (ObjectStorageException e) {
- logger.error("Fail to get length of the object {}.", osUri, e);
- return 0;
+ if (length == 0) {
+ try {
+ length = connector.getMetaData(osUri).length();
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to get length of the object {}.", osUri, e);
+ return 0;
+ }
}
+ return length;
}
@Override
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSInput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
similarity index 91%
rename from object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSInput.java
rename to object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
index 2a081ea7424..18df7c4e270 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSInput.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileInput.java
@@ -19,7 +19,6 @@
package org.apache.iotdb.os.fileSystem;
import org.apache.iotdb.os.cache.OSFileCache;
-import org.apache.iotdb.os.cache.OSFileCacheImpl;
import org.apache.iotdb.tsfile.read.reader.TsFileInput;
import org.slf4j.Logger;
@@ -27,14 +26,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-public class OSInput implements TsFileInput {
- private static final Logger logger = LoggerFactory.getLogger(OSInput.class);
+public class OSTsFileInput implements TsFileInput {
+ private static final Logger logger = LoggerFactory.getLogger(OSTsFileInput.class);
private String osFileName;
- private OSFileCache cache = OSFileCacheImpl.getInstance();
+ private OSFileCache cache = OSFileCache.getInstance();
@Override
public long size() throws IOException {
@@ -74,7 +74,7 @@ public class OSInput implements TsFileInput {
@Override
public FileChannel wrapAsFileChannel() throws IOException {
- return null;
+ throw new UnsupportedEncodingException();
}
@Override
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSOutput.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileOutput.java
similarity index 96%
rename from object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSOutput.java
rename to object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileOutput.java
index bc9e7bdc79e..d580355c451 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSOutput.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSTsFileOutput.java
@@ -27,8 +27,8 @@ import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
-public class OSOutput implements TsFileOutput {
- private static final Logger logger = LoggerFactory.getLogger(OSOutput.class);
+public class OSTsFileOutput implements TsFileOutput {
+ private static final Logger logger = LoggerFactory.getLogger(OSTsFileOutput.class);
@Override
public void write(byte[] b) throws IOException {
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java b/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
index a40bc3dd377..cbf004329e6 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
@@ -41,4 +41,6 @@ public interface ObjectStorageConnector {
OSURI[] list(OSURI osUri) throws ObjectStorageException;
void putLocalFile(OSURI osUri, File lcoalFile) throws ObjectStorageException;
+
+ byte[] getRemoteFile(OSURI osUri, long position, int size) throws ObjectStorageException;
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
index 09b4a7bfb2f..92cfa90d767 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
@@ -160,6 +160,14 @@ public class S3ObjectStorageConnector implements ObjectStorageConnector {
}
}
+ @Override
+ public byte[] getRemoteFile(OSURI osUri, long position, int size) throws ObjectStorageException {
+ GetObjectRequest req =
+ GetObjectRequest.builder().bucket(osUri.getBucket()).key(osUri.getKey()).build();
+ s3Client.getObject(req);
+ return new byte[0];
+ }
+
public void close() {
s3Client.close();
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index 4dd967556c8..041a67858fe 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -159,6 +159,8 @@ public class TSFileConfig implements Serializable {
private int patternMatchingThreshold = 1000000;
+ private String osBucket = "iotdb";
+
/** customizedProperties, this should be empty by default. */
private Properties customizedProperties = new Properties();
@@ -479,4 +481,8 @@ public class TSFileConfig implements Serializable {
public String getSprintzPredictScheme() {
return "fire";
}
+
+ public String getOSBucket() {
+ return osBucket;
+ }
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java
index d020331d4c7..eac57b971d1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java
@@ -29,7 +29,7 @@ import java.lang.reflect.InvocationTargetException;
public class OSFileInputFactory implements FileInputFactory {
private static final Logger logger = LoggerFactory.getLogger(OSFileInputFactory.class);
- private static final String OS_INPUT_CLASS_NAME = "org.apache.iotdb.os.fileSystem.OSInput";
+ private static final String OS_INPUT_CLASS_NAME = "org.apache.iotdb.os.fileSystem.OSTsFileInput";
private static Constructor constructor;
static {
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
index dfea67401cc..e96b86f1af8 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
@@ -20,9 +20,39 @@ package org.apache.iotdb.tsfile.fileSystem.fileOutputFactory;
import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
public class OSFileOutputFactory implements FileOutputFactory {
+ private static final Logger logger = LoggerFactory.getLogger(OSFileOutputFactory.class);
+ private static final String OS_OUTPUT_CLASS_NAME =
+ "org.apache.iotdb.os.fileSystem.OSTsFileOutput";
+ private static Constructor constructor;
+
+ static {
+ try {
+ Class<?> clazz = Class.forName(OS_OUTPUT_CLASS_NAME);
+ constructor = clazz.getConstructor(String.class, boolean.class);
+ } catch (ClassNotFoundException | NoSuchMethodException e) {
+ logger.error(
+ "Failed to get OSInput in object storage. Please check your dependency of object storage module.",
+ e);
+ }
+ }
+
@Override
public TsFileOutput getTsFileOutput(String filePath, boolean append) {
- throw new UnsupportedOperationException("Cannot directly write to object storage.");
+ try {
+ return (TsFileOutput) constructor.newInstance(filePath, !append);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to get TsFile output of file: {}. Please check your dependency of object storage module.",
+ filePath,
+ e);
+ return null;
+ }
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
index 908979307a6..ef5a07cf447 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/FSUtils.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.tsfile.utils;
+import org.apache.iotdb.tsfile.common.conf.TSFileConfig;
+import org.apache.iotdb.tsfile.common.conf.TSFileDescriptor;
import org.apache.iotdb.tsfile.fileSystem.FSPath;
import org.apache.iotdb.tsfile.fileSystem.FSType;
@@ -25,10 +27,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
+import java.io.IOException;
public class FSUtils {
private static final Logger logger = LoggerFactory.getLogger(FSUtils.class);
-
+ private static final TSFileConfig conf = TSFileDescriptor.getInstance().getConfig();
private static final FSType[] fsTypes = {FSType.OBJECT_STORAGE, FSType.HDFS};
public static final String[] fsPrefix = {"os://", "hdfs://"};
private static final String[] fsFileClassName = {
@@ -95,6 +98,20 @@ public class FSUtils {
return new FSPath(type, path);
}
+ public static FSPath parseLocalTsFile2OSFile(File lcoalFile) throws IOException {
+ String canonicalPath = lcoalFile.getCanonicalPath();
+ int startIdx = canonicalPath.lastIndexOf("sequence");
+ if (startIdx < 0) {
+ startIdx = canonicalPath.lastIndexOf("unsequence");
+ }
+ if (startIdx < 0) {
+ throw new IllegalArgumentException(canonicalPath + "isn't a TsFile path.");
+ }
+ return new FSPath(
+ FSType.OBJECT_STORAGE,
+ fsPrefix[0] + conf.getOSBucket() + "/" + canonicalPath.substring(startIdx));
+ }
+
public static boolean isLocal(String fsPath) {
return getFSType(fsPath) == FSType.LOCAL;
}