You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/12/26 03:10:59 UTC
[incubator-doris] branch master updated: [feature](broker) support ks3 for kmr in ksyun (#7484)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 755e069 [feature](broker) support ks3 for kmr in ksyun (#7484)
755e069 is described below
commit 755e0693b931e81380b22cc8a4472fe14cb6823e
Author: xuzifu666 <12...@qq.com>
AuthorDate: Sun Dec 26 11:10:47 2021 +0800
[feature](broker) support ks3 for kmr in ksyun (#7484)
---
.../doris/broker/hdfs/FileSystemManager.java | 70 +++++++++++++++++++++-
1 file changed, 69 insertions(+), 1 deletion(-)
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 6cedd70..f3cfb8d 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -66,6 +66,7 @@ public class FileSystemManager {
// supported scheme
private static final String HDFS_SCHEME = "hdfs";
private static final String S3A_SCHEME = "s3a";
+ private static final String KS3_SCHEME = "ks3";
private static final String USER_NAME_KEY = "username";
private static final String PASSWORD_KEY = "password";
@@ -96,6 +97,14 @@ public class FileSystemManager {
// This property is used like 'fs.hdfs.impl.disable.cache'
private static final String FS_S3A_IMPL_DISABLE_CACHE = "fs.s3a.impl.disable.cache";
+ // arguments for ks3
+ private static final String FS_KS3_ACCESS_KEY = "fs.ks3.AccessKey";
+ private static final String FS_KS3_SECRET_KEY = "fs.ks3.AccessSecret";
+ private static final String FS_KS3_ENDPOINT = "fs.ks3.endpoint";
+ private static final String FS_KS3_IMPL = "fs.ks3.impl";
+ // This property is used like 'fs.hdfs.impl.disable.cache'
+ private static final String FS_KS3_IMPL_DISABLE_CACHE = "fs.ks3.impl.disable.cache";
+
private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);
private int readBufferSize = 128 << 10; // 128k
@@ -152,7 +161,9 @@ public class FileSystemManager {
brokerFileSystem = getDistributedFileSystem(path, properties);
} else if (scheme.equals(S3A_SCHEME)) {
brokerFileSystem = getS3AFileSystem(path, properties);
- } else {
+ } else if (scheme.equals(KS3_SCHEME)) {
+ brokerFileSystem = getKS3FileSystem(path, properties);
+ }else {
throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
"invalid path. scheme is not supported");
}
@@ -421,6 +432,63 @@ public class FileSystemManager {
}
}
+ /**
+ * Returns the cached {@link BrokerFileSystem} for a KS3 path, creating the underlying
+ * Hadoop file system the first time the cache entry is used. Visible for test.
+ * <p>
+ * The file system handle is cached. The cache identity is made of the scheme, the endpoint
+ * and the bucket (the host part of the path URI), plus the access key and secret key
+ * joined by a comma.
+ *
+ * @param path the KS3 path; the host of its URI is used as the bucket name
+ * @param properties broker properties; reads {@code fs.ks3.AccessKey},
+ * {@code fs.ks3.AccessSecret} and {@code fs.ks3.endpoint} (each defaulting to an
+ * empty string) and {@code fs.ks3.impl.disable.cache} (defaulting to {@code true})
+ * @return the cached broker file system, or {@code null} when the cache entry is no longer
+ * present, either right after insertion or once the lock has been acquired
+ * @throws BrokerException with status {@code NOT_AUTHORIZED}, wrapping any exception raised
+ * inside the locked section (the exception is logged first)
+ */
+ public BrokerFileSystem getKS3FileSystem(String path, Map<String, String> properties) {
+ WildcardURI pathUri = new WildcardURI(path);
+ String accessKey = properties.getOrDefault(FS_KS3_ACCESS_KEY, "");
+ String secretKey = properties.getOrDefault(FS_KS3_SECRET_KEY, "");
+ String endpoint = properties.getOrDefault(FS_KS3_ENDPOINT, "");
+ String disableCache = properties.getOrDefault(FS_KS3_IMPL_DISABLE_CACHE, "true");
+ // The endpoint is the server host; pathUri.getUri().getHost() is the bucket.
+ // Both are used as the host identity, because FileSystem will cache both.
+ String host = KS3_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
+ String ks3aUgi = accessKey + "," + secretKey;
+ FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ks3aUgi);
+ BrokerFileSystem fileSystem = null;
+ cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
+ fileSystem = cachedFileSystem.get(fileSystemIdentity);
+ if (fileSystem == null) {
+ // The entry was removed concurrently by the checker thread.
+ return null;
+ }
+ fileSystem.getLock().lock();
+ try {
+ if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
+ // The file system was closed by the file system checker thread while this
+ // thread was waiting for the lock; this is a corner case.
+ return null;
+ }
+ if (fileSystem.getDFSFileSystem() == null) {
+ logger.info("could not find file system for path " + path + " create a new one");
+ // Create a new file system and attach it to the cached entry.
+ Configuration conf = new Configuration();
+ conf.set(FS_KS3_ACCESS_KEY, accessKey);
+ conf.set(FS_KS3_SECRET_KEY, secretKey);
+ conf.set(FS_KS3_ENDPOINT, endpoint);
+ conf.set(FS_KS3_IMPL, "com.ksyun.kmr.hadoop.fs.ks3.Ks3FileSystem");
+ conf.set(FS_KS3_IMPL_DISABLE_CACHE, disableCache);
+ FileSystem ks3FileSystem = FileSystem.get(pathUri.getUri(), conf);
+ fileSystem.setFileSystem(ks3FileSystem);
+ }
+ return fileSystem;
+ } catch (Exception e) {
+ logger.error("errors while connect to " + path, e);
+ throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+ } finally {
+ fileSystem.getLock().unlock();
+ }
+ }
+
public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
List<TBrokerFileStatus> resultFileStatus = null;
WildcardURI pathUri = new WildcardURI(path);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org