You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by he...@apache.org on 2023/05/11 06:33:06 UTC
[iotdb] branch tiered_storage updated: add OSURI
This is an automated email from the ASF dual-hosted git repository.
heiming pushed a commit to branch tiered_storage
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/tiered_storage by this push:
new 5c2f4b92613 add OSURI
5c2f4b92613 is described below
commit 5c2f4b9261300626dce5e0f0c8f3a2c6b430cd63
Author: HeimingZ <zh...@qq.com>
AuthorDate: Thu May 11 14:32:48 2023 +0800
add OSURI
---
object-storage/pom.xml | 6 +-
.../org/apache/iotdb/os/cache/CacheManger.java | 27 +-
.../org/apache/iotdb/os/cache/PersistentCache.java | 13 +-
.../apache/iotdb/os/conf/ObjectStorageConfig.java | 16 ++
.../iotdb/os/conf/ObjectStorageDescriptor.java | 14 +
.../org/apache/iotdb/os/fileSystem/OSFile.java | 289 +++++++++++++++++++++
.../java/org/apache/iotdb/os/fileSystem/OSURI.java | 75 ++++++
...rageWriter.java => ObjectStorageConnector.java} | 5 +-
...geWriter.java => S3ObjectStorageConnector.java} | 26 +-
.../ObjectStorageConstant.java} | 6 +-
10 files changed, 457 insertions(+), 20 deletions(-)
diff --git a/object-storage/pom.xml b/object-storage/pom.xml
index ab510226314..c313b65bc53 100644
--- a/object-storage/pom.xml
+++ b/object-storage/pom.xml
@@ -41,7 +41,7 @@
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
- <version>2.16.1</version>
+ <version>2.20.1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
@@ -53,10 +53,6 @@
<artifactId>tsfile</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>com.github.ben-manes.caffeine</groupId>
- <artifactId>caffeine</artifactId>
- </dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheManger.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheManger.java
index 405516bcaf6..02175eb4ea7 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheManger.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheManger.java
@@ -18,4 +18,29 @@
*/
package org.apache.iotdb.os.cache;
-public class CacheManger {}
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class CacheManger {
+ private final ObjectStorageConfig config = ObjectStorageDescriptor.getInstance().getConfig();
+ private final List<PersistentCache> caches = new ArrayList<>();
+
+ private CacheManger() {
+ for (String cacheDir : config.getCacheDirs()) {
+ caches.add(new PersistentCache(cacheDir));
+ }
+ }
+
+ public static CacheManger getInstance() {
+ return InstanceHolder.INSTANCE;
+ }
+
+ private static class InstanceHolder {
+ private InstanceHolder() {}
+
+ private static final CacheManger INSTANCE = new CacheManger();
+ }
+}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java b/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java
index 7ef1ebbbaeb..3310ee25547 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/cache/PersistentCache.java
@@ -18,14 +18,17 @@
*/
package org.apache.iotdb.os.cache;
-import org.apache.iotdb.tsfile.file.metadata.ChunkMetadata;
-
-import com.github.benmanes.caffeine.cache.LoadingCache;
-
import java.io.File;
+import java.nio.channels.FileChannel;
public class PersistentCache {
+ private final String cacheDir;
private File cacheFile;
+ private FileChannel rwChannel;
+ private FileChannel readChannel;
- private LoadingCache<ChunkMetadata, CacheEntry> lruCache;
+ public PersistentCache(String cacheDir) {
+ this.cacheDir = cacheDir;
+ this.cacheFile = cacheFile;
+ }
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
index f383a45c9be..9db205ca472 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageConfig.java
@@ -37,4 +37,20 @@ public class ObjectStorageConfig {
private static long pageSize = 32 * 1024;
ObjectStorageConfig() {}
+
+ public static ObjectStorageType getOsType() {
+ return osType;
+ }
+
+ public static String[] getCacheDirs() {
+ return cacheDirs;
+ }
+
+ public static long getCacheSizeThreshold() {
+ return cacheSizeThreshold;
+ }
+
+ public static long getPageSize() {
+ return pageSize;
+ }
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageDescriptor.java b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageDescriptor.java
index 7fcdeca5671..5c832c98409 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageDescriptor.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/conf/ObjectStorageDescriptor.java
@@ -20,4 +20,18 @@ package org.apache.iotdb.os.conf;
public class ObjectStorageDescriptor {
private final ObjectStorageConfig conf = new ObjectStorageConfig();
+
+ public ObjectStorageConfig getConfig() {
+ return conf;
+ }
+
+ public static ObjectStorageDescriptor getInstance() {
+ return InstanceHolder.INSTANCE;
+ }
+
+ private static class InstanceHolder {
+ private InstanceHolder() {}
+
+ private static final ObjectStorageDescriptor INSTANCE = new ObjectStorageDescriptor();
+ }
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
index 8b482413043..722500dae26 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSFile.java
@@ -18,28 +18,317 @@
*/
package org.apache.iotdb.os.fileSystem;
+import java.io.FileFilter;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.nio.file.Path;
+import org.apache.iotdb.os.conf.ObjectStorageConfig;
+import org.apache.iotdb.os.conf.ObjectStorageDescriptor;
+import org.apache.iotdb.os.exception.ObjectStorageException;
+import org.apache.iotdb.os.io.ObjectStorageConnector;
+import org.apache.iotdb.os.io.aws.S3ObjectStorageConnector;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.net.URI;
+import static org.apache.iotdb.os.utils.ObjectStorageConstant.FILE_SEPARATOR;
+
+
public class OSFile extends File {
private static final Logger logger = LoggerFactory.getLogger(OSFile.class);
+ private static final String UNSUPPORT_OPERATION = "Current object storage file doesn't support this operation.";
+ private static final ObjectStorageConfig config =
+ ObjectStorageDescriptor.getInstance().getConfig();
+ private static final ObjectStorageConnector connector;
+
+ static {
+ switch (config.getOsType()) {
+ case AWS_S3:
+ connector = new S3ObjectStorageConnector();
+ break;
+ default:
+ connector = null;
+ }
+ }
+
+ private final OSURI osUri;
public OSFile(String pathname) {
super(pathname);
+ this.osUri = new OSURI(pathname);
}
public OSFile(String parent, String child) {
super(parent, child);
+ this.osUri = new OSURI(parent + FILE_SEPARATOR + child);
}
public OSFile(File parent, String child) {
super(parent, child);
+ this.osUri = new OSURI(parent.toURI() + FILE_SEPARATOR + child);
}
public OSFile(URI uri) {
super(uri);
+ this.osUri = new OSURI(uri);
+ }
+
+ public OSFile(OSURI osUri) {
+ super(osUri.getURI());
+ this.osUri = osUri;
+ }
+
+ @Override
+ public String getName() {
+ return osUri.getKey();
+ }
+
+ @Override
+ public String getParent() {
+ File parent = getParentFile();
+ return parent == null ? null : parent.toString();
+ }
+
+ @Override
+ public File getParentFile() {
+ int lastSeparatorIdx = osUri.getKey().lastIndexOf(FILE_SEPARATOR);
+ if(lastSeparatorIdx <= 0) {
+ return null;
+ }
+ return new OSFile(new OSURI(osUri.getBucket(), osUri.getKey().substring(0, lastSeparatorIdx)));
+ }
+
+ @Override
+ public String getPath() {
+ return osUri.toString();
+ }
+
+ @Override
+ public boolean isAbsolute() {
+ return true;
+ }
+
+ @Override
+ public String getAbsolutePath() {
+ return osUri.toString();
+ }
+
+ @Override
+ public File getAbsoluteFile() {
+ return this;
+ }
+
+ @Override
+ public String getCanonicalPath() throws IOException {
+ return osUri.toString();
+ }
+
+ @Override
+ public File getCanonicalFile() throws IOException {
+ return this;
+ }
+
+ @Override
+ public URL toURL() throws MalformedURLException {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public URI toURI() {
+ return osUri.getURI();
+ }
+
+ @Override
+ public boolean canRead() {
+ return this.exists();
+ }
+
+ @Override
+ public boolean canWrite() {
+ return this.exists();
+ }
+
+ @Override
+ public boolean exists() {
+ try {
+ return connector.doesObjectExist(osUri);
+ } catch (ObjectStorageException e) {
+ logger.error("Fail to get object {}.", osUri, e);
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isDirectory() {
+ return false;
+ }
+
+ @Override
+ public boolean isFile() {
+ return true;
+ }
+
+ @Override
+ public boolean isHidden() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public long lastModified() {
+ return super.lastModified();
+ }
+
+ @Override
+ public long length() {
+ return super.length();
+ }
+
+ @Override
+ public boolean createNewFile() throws IOException {
+ return super.createNewFile();
+ }
+
+ @Override
+ public boolean delete() {
+ return super.delete();
+ }
+
+ @Override
+ public void deleteOnExit() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public String[] list() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public String[] list(FilenameFilter filter) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public File[] listFiles() {
+ return super.listFiles();
+ }
+
+ @Override
+ public File[] listFiles(FilenameFilter filter) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public File[] listFiles(FileFilter filter) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean mkdir() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean mkdirs() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean renameTo(File dest) {
+ return super.renameTo(dest);
+ }
+
+ @Override
+ public boolean setLastModified(long time) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean setReadOnly() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean setWritable(boolean writable, boolean ownerOnly) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean setWritable(boolean writable) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean setReadable(boolean readable, boolean ownerOnly) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean setReadable(boolean readable) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean setExecutable(boolean executable, boolean ownerOnly) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean setExecutable(boolean executable) {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public boolean canExecute() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public long getTotalSpace() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public long getFreeSpace() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public long getUsableSpace() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
+ }
+
+ @Override
+ public int compareTo(File pathname) {
+ return this.toString().compareTo(pathname.toString());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof OSFile)) {
+ return false;
+ }
+ OSFile other = (OSFile) obj;
+ return osUri.equals(other.osUri);
+ }
+
+ @Override
+ public int hashCode() {
+ return osUri.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return osUri.toString();
+ }
+
+ @Override
+ public Path toPath() {
+ throw new UnsupportedOperationException(UNSUPPORT_OPERATION);
}
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSURI.java b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSURI.java
new file mode 100644
index 00000000000..28e6b03a5cb
--- /dev/null
+++ b/object-storage/src/main/java/org/apache/iotdb/os/fileSystem/OSURI.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.os.fileSystem;
+
+import org.apache.iotdb.os.utils.ObjectStorageConstant;
+
+import java.net.URI;
+
+import static org.apache.iotdb.os.utils.ObjectStorageConstant.FILE_SEPARATOR;
+
+/** The OSURI format is os://{bucket}/{key} */
+public class OSURI {
+ public static final String SCHEME = "os";
+ public static final String OS_PREFIX = SCHEME + "://";
+ private final URI uri;
+
+ public OSURI(String bucket, String key) {
+ this(OS_PREFIX + bucket + FILE_SEPARATOR + key);
+ }
+
+ public OSURI(String path) {
+ this(URI.create(path));
+ }
+
+ public OSURI(URI uri) {
+ if (!uri.getScheme().equals(SCHEME)) {
+ throw new IllegalArgumentException();
+ }
+ this.uri = uri;
+ }
+
+ public URI getURI() {
+ return uri;
+ }
+
+ public String getBucket() {
+ return uri.getAuthority();
+ }
+
+ public String getKey() {
+ return uri.getPath().substring(ObjectStorageConstant.FILE_SEPARATOR.length());
+ }
+
+ public String toString() {
+ return uri.toString();
+ }
+
+ public boolean equals(Object obj) {
+ if (!(obj instanceof OSURI)) {
+ return false;
+ }
+ OSURI other = (OSURI) obj;
+ return uri.equals(other.uri);
+ }
+
+ public int hashCode() {
+ return uri.hashCode();
+ }
+}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageWriter.java b/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
similarity index 86%
rename from object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageWriter.java
rename to object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
index 3d39f912ae4..2e70268bdcd 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageWriter.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/ObjectStorageConnector.java
@@ -20,8 +20,11 @@
package org.apache.iotdb.os.io;
import org.apache.iotdb.os.exception.ObjectStorageException;
+import org.apache.iotdb.os.fileSystem.OSURI;
+
+public interface ObjectStorageConnector {
+ boolean doesObjectExist(OSURI osuri) throws ObjectStorageException;
-public interface ObjectStorageWriter {
void write(String sourceFile, String containerName, String targetFileName)
throws ObjectStorageException;
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageWriter.java b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
similarity index 71%
rename from object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageWriter.java
rename to object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
index 6692c6bc358..31cd148bcc0 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageWriter.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/io/aws/S3ObjectStorageConnector.java
@@ -20,21 +20,23 @@
package org.apache.iotdb.os.io.aws;
import org.apache.iotdb.os.exception.ObjectStorageException;
-import org.apache.iotdb.os.io.ObjectStorageWriter;
+import org.apache.iotdb.os.fileSystem.OSURI;
+import org.apache.iotdb.os.io.ObjectStorageConnector;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.NoSuchKeyException;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;
import java.io.File;
-public class S3ObjectStorageWriter implements ObjectStorageWriter {
-
+public class S3ObjectStorageConnector implements ObjectStorageConnector {
private S3Client s3Client;
- public S3ObjectStorageWriter() {
+ public S3ObjectStorageConnector() {
s3Client =
S3Client.builder()
.region(Region.of(AWSS3Config.getRegion()))
@@ -42,6 +44,20 @@ public class S3ObjectStorageWriter implements ObjectStorageWriter {
.build();
}
+ @Override
+ public boolean doesObjectExist(OSURI osUri) throws ObjectStorageException {
+ try {
+ HeadObjectRequest objectRequest =
+ HeadObjectRequest.builder().key(osUri.getKey()).bucket(osUri.getBucket()).build();
+ s3Client.headObject(objectRequest);
+ } catch (NoSuchKeyException e) {
+ return false;
+ } catch (S3Exception e) {
+ throw new ObjectStorageException(e);
+ }
+ return true;
+ }
+
@Override
public void write(String sourceFile, String containerName, String targetFileName)
throws ObjectStorageException {
@@ -51,9 +67,7 @@ public class S3ObjectStorageWriter implements ObjectStorageWriter {
.bucket(AWSS3Config.getBucketName())
.key(targetFileName)
.build();
-
s3Client.putObject(putOb, RequestBody.fromFile(new File(sourceFile)));
-
} catch (S3Exception e) {
throw new ObjectStorageException(e);
}
diff --git a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheManger.java b/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java
similarity index 87%
copy from object-storage/src/main/java/org/apache/iotdb/os/cache/CacheManger.java
copy to object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java
index 405516bcaf6..553a1badd8e 100644
--- a/object-storage/src/main/java/org/apache/iotdb/os/cache/CacheManger.java
+++ b/object-storage/src/main/java/org/apache/iotdb/os/utils/ObjectStorageConstant.java
@@ -16,6 +16,8 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.iotdb.os.cache;
+package org.apache.iotdb.os.utils;
-public class CacheManger {}
+public class ObjectStorageConstant {
+ public static String FILE_SEPARATOR = "/";
+}