You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/11 08:57:45 UTC
[iotdb] branch tiered_storage updated: add OSFile
This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new 2e027bacf3a add OSFile
2e027bacf3a is described below
commit 2e027bacf3aa87c098ebaf6587c1abb109dfe8fa
Author: HeimingZ <zh...@qq.com>
AuthorDate: Thu May 11 16:56:10 2023 +0800
add OSFile
---
.../java/org/apache/iotdb/os/cache/CacheEntry.java | 3 +-
.../os/cache/{CacheEntry.java => CacheKey.java} | 7 +-
.../org/apache/iotdb/os/cache/PersistentCache.java | 13 +-
.../org/apache/iotdb/os/fileSystem/OSFile.java | 77 +++++++++-
.../{cache/CacheEntry.java => io/IMetaData.java} | 11 +-
.../apache/iotdb/os/io/ObjectStorageConnector.java | 17 ++-
.../org/apache/iotdb/os/io/aws/S3MetaData.java | 24 ++-
.../iotdb/os/io/aws/S3ObjectStorageConnector.java | 105 ++++++++++---
.../fileInputFactory/OSFileInputFactory.java | 25 ++-
.../fileOutputFactory/OSFileOutputFactory.java | 2 +-
.../tsfile/fileSystem/fsFactory/FSFactory.java | 2 +-
.../tsfile/fileSystem/fsFactory/OSFSFactory.java | 168 +++++++++++++++++++--
12 files changed, 380 insertions(+), 74 deletions(-)
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java
index c9ffdc5cbed..8a54fdee1ec 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java
@@ -22,7 +22,6 @@ import java.io.File;
public class CacheEntry {
private File cacheFile;
- private long position;
- private int size;
+ // cached value, null when the value has been flushed
private byte[] value;
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheKey.java
similarity index 90%
copy from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java
copy to object-storage/src/main/java/org/apache/iotdb/os/cache/CacheKey.java
index c9ffdc5cbed..ead333c8c9b 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheKey.java
@@ -18,11 +18,10 @@
*/
package org.apache.iotdb.os.cache;
-import java.io.File;
+import org.apache.iotdb.os.fileSystem.OSFile;
-public class CacheEntry {
- private File cacheFile;
+public class CacheKey {
+ private OSFile osFile;
private long position;
private int size;
- private byte[] value;
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java
index 3310ee25547..f7828096a36 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java
@@ -18,17 +18,16 @@
*/
package org.apache.iotdb.os.cache;
-import java.io.File;
-import java.nio.channels.FileChannel;
-
public class PersistentCache {
private final String cacheDir;
- private File cacheFile;
- private FileChannel rwChannel;
- private FileChannel readChannel;
public PersistentCache(String cacheDir) {
this.cacheDir = cacheDir;
- this.cacheFile = cacheFile;
}
+
+ public byte[] get(CacheKey cacheKey) {
+ return null;
+ }
+
+ private void serialize(CacheKey cacheKey, byte[] cacheVal) {}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
index 8012ee3808c..af57e10e314 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
@@ -27,10 +27,15 @@ import org.apache.iotdb.os.io.aws.S3ObjectStorageConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
import java.io.File;
import java.io.FileFilter;
import java.io.FilenameFilter;
import java.io.IOException;
+import java.io.InputStreamReader;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
@@ -159,8 +164,8 @@ public class OSFile extends File {
return connector.doesObjectExist(osUri);
} catch (ObjectStorageException e) {
logger.error("Fail to get object {}.", osUri, e);
+ return false;
}
- return false;
}
@Override
@@ -180,22 +185,42 @@ public class OSFile extends File {
@Override
public long lastModified() {
- return super.lastModified();
+ try {
+ return connector.getMetaData(osUri).lastModified();
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to get lastModified of the object {}.", osUri, e);
+ return 0;
+ }
}
@Override
public long length() {
- return super.length();
+ try {
+ return connector.getMetaData(osUri).length();
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to get length of the object {}.", osUri, e);
+ return 0;
+ }
}
@Override
public boolean createNewFile() throws IOException {
- return super.createNewFile();
+ try {
+ return connector.createNewEmptyObject(osUri);
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to create new object {}.", osUri, e);
+ return false;
+ }
}
@Override
public boolean delete() {
- return super.delete();
+ try {
+ return connector.delete(osUri);
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to delete object {}.", osUri, e);
+ return false;
+ }
}
@Override
@@ -240,7 +265,13 @@ public class OSFile extends File {
@Override
public boolean renameTo(File dest) {
- return super.renameTo(dest);
+ OSURI targetOSUri = ((OSFile) dest).osUri;
+ try {
+ return connector.renameTo(osUri, targetOSUri);
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to rename object from {} to {}.", osUri, targetOSUri, e);
+ return false;
+ }
}
@Override
@@ -331,4 +362,38 @@ public class OSFile extends File {
public Path toPath() {
throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
}
+
+ public BufferedReader getBufferedReader() {
+ try {
+ return new BufferedReader(new InputStreamReader(connector.getInputStream(osUri)));
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to open input stream for object {}.", osUri, e);
+ return null;
+ }
+ }
+
+ public BufferedWriter getBufferedWriter(boolean append) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ public BufferedInputStream getBufferedInputStream() {
+ try {
+ return new BufferedInputStream(connector.getInputStream(osUri));
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to open input stream for object {}.", osUri, e);
+ return null;
+ }
+ }
+
+ public BufferedOutputStream getBufferedOutputStream() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ public File[] listFilesBySuffix(String fileFolder, String suffix) {
+ return null;
+ }
+
+ public File[] listFilesByPrefix(String fileFolder, String prefix) {
+ return null;
+ }
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java b/object-storage/src/main/java/org/apache/iotdb/os/io/IMetaData.java
similarity index 82%
copy from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java
copy to object-storage/src/main/java/org/apache/iotdb/os/io/IMetaData.java
index c9ffdc5cbed..771f32f223e 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheEntry.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/IMetaData.java
@@ -16,13 +16,10 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.os.cache;
+package org.apache.iotdb.os.io;
-import java.io.File;
+public interface IMetaData {
+ long length();
-public class CacheEntry {
- private File cacheFile;
- private long position;
- private int size;
- private byte[] value;
+ long lastModified();
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java b/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
index 154417a4696..a29410e8a39 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
@@ -22,10 +22,21 @@ package org.apache.iotdb.os.io;
import org.apache.iotdb.os.exception.ObjectStorageException;
import org.apache.iotdb.os.fileSystem.OSURI;
+import java.io.File;
+import java.io.InputStream;
+
public interface ObjectStorageConnector {
- boolean doesObjectExist(OSURI osuri) throws ObjectStorageException;
+ boolean doesObjectExist(OSURI osUri) throws ObjectStorageException;
+
+ IMetaData getMetaData(OSURI osUri) throws ObjectStorageException;
+
+ boolean createNewEmptyObject(OSURI osUri) throws ObjectStorageException;
+
+ boolean delete(OSURI osUri) throws ObjectStorageException;
+
+ boolean renameTo(OSURI fromOSUri, OSURI toOSUri) throws ObjectStorageException;
- long size(String fileName) throws ObjectStorageException;
+ InputStream getInputStream(OSURI osUri) throws ObjectStorageException;
- void write(String sourceFile, String targetFileName) throws ObjectStorageException;
+ void putLocalFile(OSURI osUri, File lcoalFile) throws ObjectStorageException;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3MetaData.java
similarity index 66%
copy from tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
copy to object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3MetaData.java
index 73409ed39a7..35746be1dd6 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3MetaData.java
@@ -16,13 +16,27 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.tsfile.fileSystem.fileOutputFactory;
+package org.apache.iotdb.os.io.aws;
-import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
+import org.apache.iotdb.os.io.IMetaData;
+
+public class S3MetaData implements IMetaData {
+ private long length;
+
+ private long lastModified;
+
+ public S3MetaData(long length, long lastModified) {
+ this.length = length;
+ this.lastModified = lastModified;
+ }
+
+ @Override
+ public long length() {
+ return length;
+ }
-public class OSFileOutputFactory implements FileOutputFactory {
@Override
- public TsFileOutput getTsFileOutput(String filePath, boolean append) {
- return null;
+ public long lastModified() {
+ return lastModified;
}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
index f294daf82ac..7f7f8004b6d 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
@@ -21,21 +21,24 @@ package org.apache.iotdb.os.io.aws;
import org.apache.iotdb.os.exception.ObjectStorageException;
import org.apache.iotdb.os.fileSystem.OSURI;
+import org.apache.iotdb.os.io.IMetaData;
import org.apache.iotdb.os.io.ObjectStorageConnector;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectRequest;
+import software.amazon.awssdk.services.s3.model.DeleteObjectResponse;
+import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
-import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
-import software.amazon.awssdk.services.s3.model.S3Object;
import java.io.File;
-import java.util.List;
+import java.io.InputStream;
public class S3ObjectStorageConnector implements ObjectStorageConnector {
private S3Client s3Client;
@@ -51,38 +54,94 @@ public class S3ObjectStorageConnector implements ObjectStorageConnector {
@Override
public boolean doesObjectExist(OSURI osUri) throws ObjectStorageException {
try {
- HeadObjectRequest objectRequest =
- HeadObjectRequest.builder().key(osUri.getKey()).bucket(osUri.getBucket()).build();
- s3Client.headObject(objectRequest);
+ HeadObjectRequest req =
+ HeadObjectRequest.builder().bucket(osUri.getBucket()).key(osUri.getKey()).build();
+ s3Client.headObject(req);
+ return true;
} catch (NoSuchKeyException e) {
return false;
} catch (S3Exception e) {
throw new ObjectStorageException(e);
}
- return true;
}
- public long size(String fileName) throws ObjectStorageException {
- ListObjectsV2Request listObjectsRequest =
- ListObjectsV2Request.builder().bucket(AWSS3Config.getBucketName()).prefix(fileName).build();
- ListObjectsV2Response res = s3Client.listObjectsV2(listObjectsRequest);
- List<S3Object> objects = res.contents();
- if (objects.size() != 1) {
- throw new ObjectStorageException(
- String.format("expected 1 S3Object with prefix %s but get %d", fileName, objects.size()));
+ @Override
+ public IMetaData getMetaData(OSURI osUri) throws ObjectStorageException {
+ try {
+ HeadObjectRequest req =
+ HeadObjectRequest.builder().bucket(osUri.getBucket()).key(osUri.getKey()).build();
+ HeadObjectResponse resp = s3Client.headObject(req);
+ return new S3MetaData(resp.contentLength(), resp.lastModified().toEpochMilli());
+ } catch (S3Exception e) {
+ throw new ObjectStorageException(e);
+ }
+ }
+
+ @Override
+ public boolean createNewEmptyObject(OSURI osUri) throws ObjectStorageException {
+ try {
+ PutObjectRequest req =
+ PutObjectRequest.builder().bucket(osUri.getBucket()).key(osUri.getKey()).build();
+ s3Client.putObject(req, RequestBody.empty());
+ return true;
+ } catch (S3Exception e) {
+ throw new ObjectStorageException(e);
+ }
+ }
+
+ @Override
+ public boolean delete(OSURI osUri) throws ObjectStorageException {
+ try {
+ DeleteObjectRequest req =
+ DeleteObjectRequest.builder().bucket(osUri.getBucket()).key(osUri.getKey()).build();
+ DeleteObjectResponse resp = s3Client.deleteObject(req);
+ return resp.deleteMarker();
+ } catch (S3Exception e) {
+ throw new ObjectStorageException(e);
}
- return objects.get(0).size();
}
@Override
- public void write(String sourceFile, String targetFileName) throws ObjectStorageException {
+ public boolean renameTo(OSURI fromOSUri, OSURI toOSUri) throws ObjectStorageException {
try {
- PutObjectRequest putOb =
- PutObjectRequest.builder()
- .bucket(AWSS3Config.getBucketName())
- .key(targetFileName)
+ CopyObjectRequest copyReq =
+ CopyObjectRequest.builder()
+ .sourceBucket(fromOSUri.getBucket())
+ .sourceKey(fromOSUri.getKey())
+ .destinationBucket(toOSUri.getBucket())
+ .destinationKey(toOSUri.getKey())
+ .build();
+ s3Client.copyObject(copyReq);
+
+ DeleteObjectRequest deleteReq =
+ DeleteObjectRequest.builder()
+ .bucket(fromOSUri.getBucket())
+ .key(fromOSUri.getKey())
.build();
- s3Client.putObject(putOb, RequestBody.fromFile(new File(sourceFile)));
+ DeleteObjectResponse resp = s3Client.deleteObject(deleteReq);
+ return resp.deleteMarker();
+ } catch (S3Exception e) {
+ throw new ObjectStorageException(e);
+ }
+ }
+
+ @Override
+ public InputStream getInputStream(OSURI osUri) throws ObjectStorageException {
+ try {
+ GetObjectRequest req =
+ GetObjectRequest.builder().bucket(osUri.getBucket()).key(osUri.getKey()).build();
+ return s3Client.getObject(req);
+ } catch (S3Exception e) {
+ throw new ObjectStorageException(e);
+ }
+ }
+
+ @Override
+ public void putLocalFile(OSURI osUri, File lcoalFile) throws ObjectStorageException {
+ try {
+ PutObjectRequest req =
+ PutObjectRequest.builder().bucket(osUri.getBucket()).key(osUri.getKey()).build();
+ s3Client.putObject(req, RequestBody.fromFile(lcoalFile));
} catch (S3Exception e) {
throw new ObjectStorageException(e);
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java
index 05793508c7e..d020331d4c7 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileInputFactory/OSFileInputFactory.java
@@ -24,12 +24,35 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
public class OSFileInputFactory implements FileInputFactory {
private static final Logger logger = LoggerFactory.getLogger(OSFileInputFactory.class);
+ private static final String OS_INPUT_CLASS_NAME = "org.apache.iotdb.os.fileSystem.OSInput";
+ private static Constructor constructor;
+
+ static {
+ try {
+ Class<?> clazz = Class.forName(OS_INPUT_CLASS_NAME);
+ constructor = clazz.getConstructor(String.class);
+ } catch (ClassNotFoundException | NoSuchMethodException e) {
+ logger.error(
+ "Failed to get OSInput in object storage. Please check your dependency of object storage module.",
+ e);
+ }
+ }
@Override
public TsFileInput getTsFileInput(String filePath) throws IOException {
- return null;
+ try {
+ return (TsFileInput) constructor.newInstance(filePath);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ throw new IOException(
+ String.format(
+ "Failed to get TsFile input of file: %s. Please check your dependency of object storage module.",
+ filePath),
+ e);
+ }
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
index 73409ed39a7..dfea67401cc 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fileOutputFactory/OSFileOutputFactory.java
@@ -23,6 +23,6 @@ import org.apache.iotdb.tsfile.write.writer.TsFileOutput;
public class OSFileOutputFactory implements FileOutputFactory {
@Override
public TsFileOutput getTsFileOutput(String filePath, boolean append) {
- return null;
+ throw new UnsupportedOperationException("Cannot directly write to object storage.");
}
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
index 2df88cada38..8029bcca1d5 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/FSFactory.java
@@ -137,6 +137,6 @@ public interface FSFactory {
*/
boolean deleteIfExists(File file) throws IOException;
- /** Force delete the directory */
+ /** TODO(zhm) Force delete the directory */
void deleteDirectory(String dir) throws IOException;
}
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
index 5ab3028f768..eb2b4c837ec 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/fileSystem/fsFactory/OSFSFactory.java
@@ -27,72 +27,212 @@ import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.URI;
public class OSFSFactory implements FSFactory {
private static final Logger logger = LoggerFactory.getLogger(OSFSFactory.class);
+ private static final String OS_FILE_CLASS_NAME = "org.apache.iotdb.os.fileSystem.OSFile";
+
+ private static Constructor constructorWithPathname;
+ private static Constructor constructorWithParentStringAndChild;
+ private static Constructor constructorWithParentFileAndChild;
+ private static Constructor constructorWithUri;
+ private static Method getBufferedReader;
+ private static Method getBufferedWriter;
+ private static Method getBufferedInputStream;
+ private static Method getBufferedOutputStream;
+ private static Method listFilesBySuffix;
+ private static Method listFilesByPrefix;
+ private static Method renameTo;
+
+ static {
+ try {
+ Class<?> clazz = Class.forName(OS_FILE_CLASS_NAME);
+ constructorWithPathname = clazz.getConstructor(String.class);
+ constructorWithParentStringAndChild = clazz.getConstructor(String.class, String.class);
+ constructorWithParentFileAndChild = clazz.getConstructor(File.class, String.class);
+ constructorWithUri = clazz.getConstructor(URI.class);
+ getBufferedReader = clazz.getMethod("getBufferedReader");
+ getBufferedWriter = clazz.getMethod("getBufferedWriter", boolean.class);
+ getBufferedInputStream = clazz.getMethod("getBufferedInputStream");
+ getBufferedOutputStream = clazz.getMethod("getBufferedOutputStream");
+ listFilesBySuffix = clazz.getMethod("listFilesBySuffix", String.class, String.class);
+ listFilesByPrefix = clazz.getMethod("listFilesByPrefix", String.class, String.class);
+ renameTo = clazz.getMethod("renameTo", File.class);
+ } catch (ClassNotFoundException | NoSuchMethodException e) {
+ logger.error(
+ "Failed to get object storage. Please check your dependency of object storage module.",
+ e);
+ }
+ }
@Override
public File getFileWithParent(String pathname) {
- return null;
+ try {
+ return (File) constructorWithPathname.newInstance(pathname);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to get file: {}. Please check your dependency of object storage module.",
+ pathname,
+ e);
+ return null;
+ }
}
@Override
public File getFile(String pathname) {
- return null;
+ try {
+ return (File) constructorWithPathname.newInstance(pathname);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to get file: {}. Please check your dependency of Hadoop module.", pathname, e);
+ return null;
+ }
}
@Override
public File getFile(String parent, String child) {
- return null;
+ try {
+ return (File) constructorWithParentStringAndChild.newInstance(parent, child);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to get file: {}. Please check your dependency of Hadoop module.",
+ parent + File.separator + child,
+ e);
+ return null;
+ }
}
@Override
public File getFile(File parent, String child) {
- return null;
+ try {
+ return (File) constructorWithParentFileAndChild.newInstance(parent, child);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to get file: {}. Please check your dependency of Hadoop module.",
+ parent.getAbsolutePath() + File.separator + child,
+ e);
+ return null;
+ }
}
@Override
public File getFile(URI uri) {
- return null;
+ try {
+ return (File) constructorWithUri.newInstance(uri);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to get file: {}. Please check your dependency of object storage module.", uri, e);
+ return null;
+ }
}
@Override
public BufferedReader getBufferedReader(String filePath) {
- return null;
+ try {
+ return (BufferedReader)
+ getBufferedReader.invoke(constructorWithPathname.newInstance(filePath));
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to get buffered reader for {}. Please check your dependency of object storage module.",
+ filePath,
+ e);
+ return null;
+ }
}
@Override
public BufferedWriter getBufferedWriter(String filePath, boolean append) {
- return null;
+ try {
+ return (BufferedWriter)
+ getBufferedWriter.invoke(constructorWithPathname.newInstance(filePath), append);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to get buffered writer for {}. Please check your dependency of object storage module.",
+ filePath,
+ e);
+ return null;
+ }
}
@Override
public BufferedInputStream getBufferedInputStream(String filePath) {
- return null;
+ try {
+ return (BufferedInputStream)
+ getBufferedInputStream.invoke(constructorWithPathname.newInstance(filePath));
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to get buffered input stream for {}. Please check your dependency of object storage module.",
+ filePath,
+ e);
+ return null;
+ }
}
@Override
public BufferedOutputStream getBufferedOutputStream(String filePath) {
- return null;
+ try {
+ return (BufferedOutputStream)
+ getBufferedOutputStream.invoke(constructorWithPathname.newInstance(filePath));
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to get buffered output stream for {}. Please check your dependency of object storage module.",
+ filePath,
+ e);
+ return null;
+ }
}
@Override
- public void moveFile(File srcFile, File destFile) {}
+ public void moveFile(File srcFile, File destFile) {
+ try {
+ renameTo.invoke(constructorWithPathname.newInstance(srcFile.getAbsolutePath()), destFile);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to rename file from {} to {}. Please check your dependency of object storage module.",
+ srcFile.getName(),
+ destFile.getName());
+ }
+ }
@Override
public File[] listFilesBySuffix(String fileFolder, String suffix) {
- return new File[0];
+ try {
+ return (File[])
+ listFilesBySuffix.invoke(
+ constructorWithPathname.newInstance(fileFolder), fileFolder, suffix);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to list files in {} with SUFFIX {}. Please check your dependency of object storage module.",
+ fileFolder,
+ suffix,
+ e);
+ return null;
+ }
}
@Override
public File[] listFilesByPrefix(String fileFolder, String prefix) {
- return new File[0];
+ try {
+ return (File[])
+ listFilesByPrefix.invoke(
+ constructorWithPathname.newInstance(fileFolder), fileFolder, prefix);
+ } catch (InstantiationException | InvocationTargetException | IllegalAccessException e) {
+ logger.error(
+ "Failed to list files in {} with PREFIX {}. Please check your dependency of object storage module.",
+ fileFolder,
+ prefix,
+ e);
+ return null;
+ }
}
@Override
- public boolean deleteIfExists(File file) throws IOException {
- return false;
+ public boolean deleteIfExists(File file) {
+ return file.delete();
}
@Override