You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2021/12/26 03:10:59 UTC

[incubator-doris] branch master updated: [feature](broker) support ks3 for kmr in ksyun (#7484)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 755e069  [feature](broker) support ks3 for kmr in ksyun (#7484)
755e069 is described below

commit 755e0693b931e81380b22cc8a4472fe14cb6823e
Author: xuzifu666 <12...@qq.com>
AuthorDate: Sun Dec 26 11:10:47 2021 +0800

    [feature](broker) support ks3 for kmr in ksyun (#7484)
---
 .../doris/broker/hdfs/FileSystemManager.java       | 70 +++++++++++++++++++++-
 1 file changed, 69 insertions(+), 1 deletion(-)

diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 6cedd70..f3cfb8d 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -66,6 +66,7 @@ public class FileSystemManager {
     // supported scheme
     private static final String HDFS_SCHEME = "hdfs";
     private static final String S3A_SCHEME = "s3a";
+    private static final String KS3_SCHEME = "ks3";
 
     private static final String USER_NAME_KEY = "username";
     private static final String PASSWORD_KEY = "password";
@@ -96,6 +97,14 @@ public class FileSystemManager {
     // This property is used like 'fs.hdfs.impl.disable.cache'
     private static final String FS_S3A_IMPL_DISABLE_CACHE = "fs.s3a.impl.disable.cache";
 
+    // arguments for ks3
+    private static final String FS_KS3_ACCESS_KEY = "fs.ks3.AccessKey";
+    private static final String FS_KS3_SECRET_KEY = "fs.ks3.AccessSecret";
+    private static final String FS_KS3_ENDPOINT = "fs.ks3.endpoint";
+    private static final String FS_KS3_IMPL = "fs.ks3.impl";
+    // Same role as 'fs.hdfs.impl.disable.cache', for the ks3 scheme (getKS3FileSystem defaults it to "true")
+    private static final String FS_KS3_IMPL_DISABLE_CACHE = "fs.ks3.impl.disable.cache";
+
     private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);
     
     private int readBufferSize = 128 << 10; // 128k
@@ -152,7 +161,9 @@ public class FileSystemManager {
             brokerFileSystem = getDistributedFileSystem(path, properties);
         } else if (scheme.equals(S3A_SCHEME)) {
             brokerFileSystem = getS3AFileSystem(path, properties);
-        } else {
+        } else if (scheme.equals(KS3_SCHEME)) {
+            brokerFileSystem = getKS3FileSystem(path, properties);
+        }else {
             throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                 "invalid path. scheme is not supported");
         }
@@ -421,6 +432,63 @@ public class FileSystemManager {
         }
     }
 
+    /**
+     * Visible for test. Returns the cached broker file system handle for a ks3 path.
+     * <p>
+     * The handle is cached; its identity is endpoint + bucket + "accessKey,secretKey".
+     *
+     * @param path ks3 path; the host part of its URI is treated as the bucket
+     * @param properties read for fs.ks3.AccessKey, fs.ks3.AccessSecret, fs.ks3.endpoint
+     *                   (each defaults to "") and fs.ks3.impl.disable.cache (defaults to "true")
+     * @return the cached BrokerFileSystem, or null if its entry is no longer in the cache
+     * @throws BrokerException NOT_AUTHORIZED, wrapping any exception raised after the lock is taken
+     */
+    public BrokerFileSystem getKS3FileSystem(String path, Map<String, String> properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String accessKey = properties.getOrDefault(FS_KS3_ACCESS_KEY, "");
+        String secretKey = properties.getOrDefault(FS_KS3_SECRET_KEY, "");
+        String endpoint = properties.getOrDefault(FS_KS3_ENDPOINT, "");
+        String disableCache = properties.getOrDefault(FS_KS3_IMPL_DISABLE_CACHE, "true");
+        // endpoint is the server host, pathUri.getUri().getHost() is the bucket.
+        // Both go into the host identity, because FileSystem will cache both.
+        String host = KS3_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
+        String ks3aUgi = accessKey + "," + secretKey;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ks3aUgi);
+        BrokerFileSystem fileSystem = null;
+        cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
+        fileSystem = cachedFileSystem.get(fileSystemIdentity);
+        if (fileSystem == null) {
+            // the entry was removed concurrently by the checker thread; caller gets null
+            return null;
+        }
+        fileSystem.getLock().lock();
+        try {
+            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
+                // corner case: the file system was closed by the file system checker thread
+                // between the lookup above and taking the lock
+                return null;
+            }
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("could not find file system for path " + path + " create a new one");
+                // no underlying FileSystem yet: build one from the ks3 settings and attach it
+                Configuration conf = new Configuration();
+                conf.set(FS_KS3_ACCESS_KEY, accessKey);
+                conf.set(FS_KS3_SECRET_KEY, secretKey);
+                conf.set(FS_KS3_ENDPOINT, endpoint);
+                conf.set(FS_KS3_IMPL, "com.ksyun.kmr.hadoop.fs.ks3.Ks3FileSystem");
+                conf.set(FS_KS3_IMPL_DISABLE_CACHE, disableCache);
+                FileSystem ks3FileSystem = FileSystem.get(pathUri.getUri(), conf);
+                fileSystem.setFileSystem(ks3FileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
         WildcardURI pathUri = new WildcardURI(path);

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org