You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/04/12 07:37:36 UTC

[incubator-doris] branch branch-0.15 updated: [Feature] add support for tencent chdfs (#8964)

This is an automated email from the ASF dual-hosted git repository.

lide pushed a commit to branch branch-0.15
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/branch-0.15 by this push:
     new a90302b549 [Feature] add support for tencent chdfs (#8964)
a90302b549 is described below

commit a90302b549617d854f6d9792b465b0b31c7b723f
Author: wucheng <wu...@foxmail.com>
AuthorDate: Tue Apr 12 15:37:31 2022 +0800

    [Feature] add support for tencent chdfs (#8964)
    
    Co-authored-by: chengwu <ch...@tencent.com>
---
 .../java/org/apache/doris/analysis/ExportStmt.java |   5 +-
 .../org/apache/doris/analysis/StorageBackend.java  |   3 +
 .../doris/broker/hdfs/FileSystemManager.java       | 135 ++++++++++++++++++++-
 gensrc/thrift/Types.thrift                         |   1 +
 4 files changed, 141 insertions(+), 3 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 275dc51bf1..095dbbd68a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -238,8 +238,9 @@ public class ExportStmt extends StatementBase {
             String schema = uri.getScheme();
             if (type == StorageBackend.StorageType.BROKER) {
                 if (schema == null || (!schema.equalsIgnoreCase("bos") && !schema.equalsIgnoreCase("afs")
-                    && !schema.equalsIgnoreCase("hdfs"))) {
-                    throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' or 'BOS://' path.");
+                        && !schema.equalsIgnoreCase("hdfs") && !schema.equalsIgnoreCase("ofs"))) {
+                    throw new AnalysisException("Invalid export path. please use valid 'HDFS://', 'AFS://' , 'BOS://', "
+                            + "or 'ofs://' path.");
                 }
             } else if (type == StorageBackend.StorageType.S3) {
                 if (schema == null || !schema.equalsIgnoreCase("s3")) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index 933153b728..70b10b4f06 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -111,6 +111,7 @@ public class StorageBackend extends StorageDesc implements ParseNode {
     public enum StorageType {
         BROKER("Doris Broker"),
         S3("Amazon S3 Simple Storage Service"),
+        OFS("Tencent CHDFS"),
         // the following is not used currently
         HDFS("Hadoop Distributed File System"),
         LOCAL("Local file system");
@@ -132,6 +133,8 @@ public class StorageBackend extends StorageDesc implements ParseNode {
                     return TStorageBackendType.S3;
                 case HDFS:
                     return TStorageBackendType.HDFS;
+                case OFS:
+                    return TStorageBackendType.OFS;
                 case LOCAL:
                     return TStorageBackendType.LOCAL;
                 default:
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 6cedd70449..526c914c7e 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -66,6 +66,7 @@ public class FileSystemManager {
     // supported scheme
     private static final String HDFS_SCHEME = "hdfs";
     private static final String S3A_SCHEME = "s3a";
+    private static final String CHDFS_SCHEME = "ofs";
 
     private static final String USER_NAME_KEY = "username";
     private static final String PASSWORD_KEY = "password";
@@ -152,7 +153,9 @@ public class FileSystemManager {
             brokerFileSystem = getDistributedFileSystem(path, properties);
         } else if (scheme.equals(S3A_SCHEME)) {
             brokerFileSystem = getS3AFileSystem(path, properties);
-        } else {
+        } else if (scheme.equals(CHDFS_SCHEME)) {
+            brokerFileSystem = getChdfsFileSystem(path, properties);
+        }else {
             throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                 "invalid path. scheme is not supported");
         }
@@ -421,6 +424,136 @@ public class FileSystemManager {
         }
     }
 
+    /**
+     * visible for test
+     *
+     * file system handle is cached, the identity is for all chdfs.
+     * @param path
+     * @param properties
+     * @return
+     * @throws URISyntaxException
+     * @throws Exception
+     */
+    public BrokerFileSystem getChdfsFileSystem(String path, Map<String, String> properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String host = CHDFS_SCHEME;
+        String authentication = properties.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+                AUTHENTICATION_SIMPLE);
+        if (Strings.isNullOrEmpty(authentication) || (!authentication.equals(AUTHENTICATION_SIMPLE)
+                && !authentication.equals(AUTHENTICATION_KERBEROS))) {
+            logger.warn("invalid authentication:" + authentication);
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    "invalid authentication:" + authentication);
+        }
+
+        FileSystemIdentity fileSystemIdentity = null;
+        if (authentication.equals(AUTHENTICATION_SIMPLE)) {
+            fileSystemIdentity = new FileSystemIdentity(host, "");
+        } else {
+            // for kerberos, use host + principal + keytab as filesystemindentity
+            String kerberosContent = "";
+            if (properties.containsKey(KERBEROS_KEYTAB)) {
+                kerberosContent = properties.get(KERBEROS_KEYTAB);
+            } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+                kerberosContent = properties.get(KERBEROS_KEYTAB_CONTENT);
+            } else {
+                throw  new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                        "keytab is required for kerberos authentication");
+            }
+            if (!properties.containsKey(KERBEROS_PRINCIPAL)) {
+                throw  new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                        "principal is required for kerberos authentication");
+            } else {
+                kerberosContent = kerberosContent + properties.get(KERBEROS_PRINCIPAL);
+            }
+            try {
+                MessageDigest digest = MessageDigest.getInstance("md5");
+                byte[] result = digest.digest(kerberosContent.getBytes());
+                String kerberosUgi = new String(result);
+                fileSystemIdentity = new FileSystemIdentity(host, kerberosUgi);
+            } catch (NoSuchAlgorithmException e) {
+                throw  new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                        e.getMessage());
+            }
+        }
+
+
+        BrokerFileSystem fileSystem = null;
+        cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
+        fileSystem = cachedFileSystem.get(fileSystemIdentity);
+        if (fileSystem == null) {
+            // it means it is removed concurrently by checker thread
+            return null;
+        }
+        fileSystem.getLock().lock();
+
+        try {
+            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
+                // this means the file system is closed by file system checker thread
+                // it is a corner case
+                return null;
+            }
+            // create a new filesystem
+            Configuration conf = new Configuration();
+            for (Map.Entry<String, String> propElement : properties.entrySet()) {
+                conf.set(propElement.getKey(), propElement.getValue());
+            }
+
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("create file system for new path " + path);
+                String tmpFilePath = null;
+                if (authentication.equals(AUTHENTICATION_KERBEROS)){
+                    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+                            AUTHENTICATION_KERBEROS);
+
+                    String principal = preparePrincipal(properties.get(KERBEROS_PRINCIPAL));
+                    String keytab = "";
+                    if (properties.containsKey(KERBEROS_KEYTAB)) {
+                        keytab = properties.get(KERBEROS_KEYTAB);
+                    } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+                        // pass kerberos keytab content use base64 encoding
+                        // so decode it and write it to tmp path under /tmp
+                        // because ugi api only accept a local path as argument
+                        String keytab_content = properties.get(KERBEROS_KEYTAB_CONTENT);
+                        byte[] base64decodedBytes = Base64.getDecoder().decode(keytab_content);
+                        long currentTime = System.currentTimeMillis();
+                        Random random = new Random(currentTime);
+                        int randNumber = random.nextInt(10000);
+                        tmpFilePath = "/tmp/." + Long.toString(currentTime) + "_" + Integer.toString(randNumber);
+                        FileOutputStream fileOutputStream = new FileOutputStream(tmpFilePath);
+                        fileOutputStream.write(base64decodedBytes);
+                        fileOutputStream.close();
+                        keytab = tmpFilePath;
+                    } else {
+                        throw  new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                                "keytab is required for kerberos authentication");
+                    }
+                    UserGroupInformation.setConfiguration(conf);
+                    UserGroupInformation.loginUserFromKeytab(principal, keytab);
+                    if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+                        try {
+                            File file = new File(tmpFilePath);
+                            if(!file.delete()){
+                                logger.warn("delete tmp file:" +  tmpFilePath + " failed");
+                            }
+                        } catch (Exception e) {
+                            throw new  BrokerException(TBrokerOperationStatusCode.FILE_NOT_FOUND,
+                                    e.getMessage());
+                        }
+                    }
+                }
+                FileSystem chdfsFileSystem = FileSystem.get(pathUri.getUri(), conf);
+                fileSystem.setFileSystem(chdfsFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
         WildcardURI pathUri = new WildcardURI(path);
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 8c9a46ff48..ac0838b4db 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -91,6 +91,7 @@ enum TTypeNodeType {
 enum TStorageBackendType {
     BROKER,
     S3,
     HDFS,
-    LOCAL
+    LOCAL,
+    OFS // appended last so HDFS and LOCAL keep the implicit values they had before this change
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org