Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/23 04:11:51 UTC

[doris] 02/11: [Feature](multi-catalog)Add support for JuiceFS (#15969)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c9e68b7481f26e28f8077ebc7fd35fa95db53019
Author: Liqf <10...@users.noreply.github.com>
AuthorDate: Thu Jan 19 08:54:16 2023 +0800

    [Feature](multi-catalog)Add support for JuiceFS (#15969)
    
    The broker implements the interface to JuiceFS, so data can be loaded from JuiceFS into Doris through the broker.
    It also extends the multi-catalog feature to read Hive data stored on JuiceFS.
---
 docs/en/docs/advanced/broker.md                    |  15 ++-
 docs/zh-CN/docs/advanced/broker.md                 |  13 ++
 .../java/org/apache/doris/analysis/BrokerDesc.java |   1 +
 .../java/org/apache/doris/analysis/ExportStmt.java |   5 +-
 .../org/apache/doris/analysis/StorageBackend.java  |   3 +
 .../java/org/apache/doris/backup/BlobStorage.java  |   6 +-
 .../doris/catalog/HiveMetaStoreClientHelper.java   |   3 +-
 .../java/org/apache/doris/common/FeConstants.java  |   1 +
 .../org/apache/doris/planner/BrokerScanNode.java   |   3 +-
 .../org/apache/doris/planner/HiveScanNode.java     |   2 +
 .../doris/planner/external/HiveScanProvider.java   |   4 +-
 .../doris/broker/hdfs/FileSystemManager.java       | 132 ++++++++++++++++++++-
 gensrc/thrift/Types.thrift                         |   1 +
 13 files changed, 179 insertions(+), 10 deletions(-)

diff --git a/docs/en/docs/advanced/broker.md b/docs/en/docs/advanced/broker.md
index dd7ac2cf86..69ebe7af65 100644
--- a/docs/en/docs/advanced/broker.md
+++ b/docs/en/docs/advanced/broker.md
@@ -32,7 +32,8 @@ Broker is an optional process in the Doris cluster. It is mainly used to support
 - Aliyun OSS
 - Tencent Cloud CHDFS
 - Huawei Cloud OBS (since 1.2.0)
-- Amazon S3
+- Amazon S3 
+- JuiceFS (since master)
 
 Broker provides services through an RPC service port. It is a stateless JVM process responsible for encapsulating POSIX-like file operations, such as open, pread, and pwrite, for reading and writing remote storage.
 In addition, the Broker records no other information, so connection information, file information, permission information, and so on for the remote storage must all be passed to the Broker process as parameters of the RPC call for the Broker to read and write files correctly.
@@ -234,3 +235,15 @@ Same as Apache HDFS
     "fs.s3a.endpoint" = "xx"
 )
 ```
+
+#### JuiceFS
+
+```
+(
+    "fs.defaultFS" = "jfs://xxx/",
+    "fs.jfs.impl" = "io.juicefs.JuiceFileSystem",
+    "fs.AbstractFileSystem.jfs.impl" = "io.juicefs.JuiceFS",
+    "juicefs.meta" = "xxx",
+    "juicefs.access-log" = "xxx"
+)
+```
\ No newline at end of file
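
The properties above are handed to the broker, which loads them into a Hadoop Configuration before opening the file system. As a minimal sketch (not part of this commit, and assuming the JuiceFS Hadoop SDK jar is on the classpath and the "xxx" placeholders are filled in), the same property keys resolve to a Hadoop FileSystem like this:

```
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class JfsConfigSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // the same keys as in the broker properties above
        conf.set("fs.defaultFS", "jfs://xxx/");
        conf.set("fs.jfs.impl", "io.juicefs.JuiceFileSystem");
        conf.set("fs.AbstractFileSystem.jfs.impl", "io.juicefs.JuiceFS");
        conf.set("juicefs.meta", "xxx");       // metadata engine address
        conf.set("juicefs.access-log", "xxx"); // optional access log path

        // Hadoop resolves the jfs:// scheme through fs.jfs.impl
        try (FileSystem fs = FileSystem.get(new URI("jfs://xxx/"), conf)) {
            System.out.println(fs.exists(new Path("/")));
        }
    }
}
```
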
diff --git a/docs/zh-CN/docs/advanced/broker.md b/docs/zh-CN/docs/advanced/broker.md
index 8b95a5f497..59a7f2b2ca 100644
--- a/docs/zh-CN/docs/advanced/broker.md
+++ b/docs/zh-CN/docs/advanced/broker.md
@@ -33,6 +33,7 @@ Broker is an optional process in the Doris cluster, mainly used to support Doris reading and writing
 - Tencent Cloud CHDFS
 - Huawei Cloud OBS (supported since 1.2.0)
 - Amazon S3
+- JuiceFS (supported since master)
 
 Broker provides services through an RPC service port. It is a stateless Java process responsible for encapsulating POSIX-like file operations, such as open, pread, and pwrite, for reading and writing remote storage. Beyond that, the Broker records no other information, so connection information, file information, permission information, and so on for the remote storage must all be passed to the Broker process as parameters of the RPC call for the Broker to read and write files correctly.
 
@@ -227,3 +228,15 @@ WITH BROKER "broker_name"
     "fs.s3a.endpoint" = "xx"
 )
 ```
+
+#### JuiceFS
+
+```
+(
+    "fs.defaultFS" = "jfs://xxx/",
+    "fs.jfs.impl" = "io.juicefs.JuiceFileSystem",
+    "fs.AbstractFileSystem.jfs.impl" = "io.juicefs.JuiceFS",
+    "juicefs.meta" = "xxx",
+    "juicefs.access-log" = "xxx"
+)
+```
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
index bac9e80b08..8f1fc5dfe4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/BrokerDesc.java
@@ -118,6 +118,7 @@ public class BrokerDesc extends StorageDesc implements Writable {
                 return TFileType.FILE_STREAM;
             case BROKER:
             case OFS:
+            case JFS:
             default:
                 return TFileType.FILE_BROKER;
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 62fb2c9977..eb35fe4c0d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -243,9 +243,10 @@ public class ExportStmt extends StatementBase {
                     && !schema.equalsIgnoreCase("obs")
                     && !schema.equalsIgnoreCase("oss")
                     && !schema.equalsIgnoreCase("s3a")
-                    && !schema.equalsIgnoreCase("cosn"))) {
+                    && !schema.equalsIgnoreCase("cosn")
+                    && !schema.equalsIgnoreCase("jfs"))) {
                 throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://',"
-                        + " 'ofs://', 'obs://', 'oss://', 's3a://' or 'cosn://' path.");
+                        + " 'ofs://', 'obs://', 'oss://', 's3a://', 'cosn://' or 'jfs://' path.");
             }
         } else if (type == StorageBackend.StorageType.S3) {
             if (schema == null || !schema.equalsIgnoreCase("s3")) {
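
The scheme check above grows one equalsIgnoreCase clause, plus one error-message fragment, for every new backend. A set-based allowlist is one possible alternative; the following is an illustrative sketch with hypothetical names, not Doris code:

```
import java.util.Locale;
import java.util.Set;

final class BrokerSchemeCheck {
    // one place to extend when a new scheme such as "jfs" is added
    private static final Set<String> ALLOWED = Set.of(
            "hdfs", "afs", "bos", "ofs", "obs", "oss", "s3a", "cosn", "jfs");

    static void check(String scheme) {
        if (scheme == null || !ALLOWED.contains(scheme.toLowerCase(Locale.ROOT))) {
            throw new IllegalArgumentException(
                    "Invalid broker path. please use one of: " + ALLOWED);
        }
    }
}
```
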
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
index dfdbd4f8a2..dc212a481b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StorageBackend.java
@@ -115,6 +115,7 @@ public class StorageBackend extends StorageDesc implements ParseNode {
         HDFS("Hadoop Distributed File System"),
         LOCAL("Local file system"),
         OFS("Tencent CHDFS"),
+        JFS("Juicefs"),
         STREAM("Stream load pipe");
 
         private final String description;
@@ -136,6 +137,8 @@ public class StorageBackend extends StorageDesc implements ParseNode {
                     return TStorageBackendType.HDFS;
                 case OFS:
                     return TStorageBackendType.OFS;
+                case JFS:
+                    return TStorageBackendType.JFS;
                 case LOCAL:
                     return TStorageBackendType.LOCAL;
                 default:
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java b/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java
index e4a3362490..be02be0690 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/BlobStorage.java
@@ -50,10 +50,12 @@ public abstract class BlobStorage implements Writable {
     public static BlobStorage create(String name, StorageBackend.StorageType type, Map<String, String> properties) {
         if (type == StorageBackend.StorageType.S3) {
             return new S3Storage(properties);
-        } else if (type == StorageBackend.StorageType.HDFS || type == StorageBackend.StorageType.OFS) {
+        } else if (type == StorageBackend.StorageType.HDFS
+                || type == StorageBackend.StorageType.OFS
+                || type == StorageBackend.StorageType.JFS) {
             BlobStorage storage = new HdfsStorage(properties);
             // for ofs and jfs files, use hdfs storage, but the type should stay ofs/jfs
-            if (type == StorageBackend.StorageType.OFS) {
+            if (type == StorageBackend.StorageType.OFS || type == StorageBackend.StorageType.JFS) {
                 storage.setType(type);
                 storage.setName(type.name());
             }
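
With this change, a JFS location reuses HdfsStorage but keeps StorageType.JFS, so downstream code such as the planners can still tell the backends apart. An illustrative call into the factory above (placeholder values; compiles only against the Doris FE codebase):

```
import java.util.HashMap;
import java.util.Map;
import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.BlobStorage;

final class JfsStorageSketch {
    static BlobStorage createJfsStorage() {
        Map<String, String> props = new HashMap<>();
        props.put("fs.defaultFS", "jfs://xxx/");
        props.put("fs.jfs.impl", "io.juicefs.JuiceFileSystem");
        // returns an HdfsStorage instance whose type is reset to JFS
        return BlobStorage.create(StorageBackend.StorageType.JFS.name(),
                StorageBackend.StorageType.JFS, props);
    }
}
```
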
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index fd7c27760b..adf5dccc0a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -213,7 +213,8 @@ public class HiveMetaStoreClientHelper {
     private static String getAllFileStatus(List<TBrokerFileStatus> fileStatuses,
             List<RemoteIterator<LocatedFileStatus>> remoteIterators, BlobStorage storage) throws UserException {
         boolean needFullPath = storage.getStorageType() == StorageBackend.StorageType.S3
-                || storage.getStorageType() == StorageBackend.StorageType.OFS;
+                || storage.getStorageType() == StorageBackend.StorageType.OFS
+                || storage.getStorageType() == StorageBackend.StorageType.JFS;
         String hdfsUrl = "";
         Queue<RemoteIterator<LocatedFileStatus>> queue = Queues.newArrayDeque(remoteIterators);
         while (queue.peek() != null) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index bff471f039..dfde23e0a0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -73,6 +73,7 @@ public class FeConstants {
     public static String FS_PREFIX_COS = "cos";
     public static String FS_PREFIX_OBS = "obs";
     public static String FS_PREFIX_OFS = "ofs";
+    public static String FS_PREFIX_JFS = "jfs";
     public static String FS_PREFIX_HDFS = "hdfs";
     public static String FS_PREFIX_FILE = "file";
     public static final String INTERNAL_DB_NAME = "__internal_schema";
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 38284b097e..1f9ddee238 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -326,7 +326,8 @@ public class BrokerScanNode extends LoadScanNode {
                 throw new UserException(e.getMessage());
             }
             brokerScanRange.addToBrokerAddresses(new TNetworkAddress(broker.ip, broker.port));
-        } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.OFS) {
+        } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.OFS
+                || brokerDesc.getStorageType() == StorageBackend.StorageType.JFS) {
             FsBroker broker = Env.getCurrentEnv().getBrokerMgr().getAnyAliveBroker();
             if (broker == null) {
                 throw new UserException("No alive broker.");
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index 121e2df54d..0b932eb976 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -135,6 +135,8 @@ public class HiveScanNode extends BrokerScanNode {
             this.storageType = StorageBackend.StorageType.HDFS;
         } else if (storagePrefix.equalsIgnoreCase(FeConstants.FS_PREFIX_OFS)) {
             this.storageType = StorageBackend.StorageType.OFS;
+        } else if (storagePrefix.equalsIgnoreCase(FeConstants.FS_PREFIX_JFS)) {
+            this.storageType = StorageBackend.StorageType.JFS;
         } else {
             throw new UserException("Not supported storage type: " + storagePrefix);
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
index 340597f6da..daf238ee8c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/HiveScanProvider.java
@@ -124,7 +124,9 @@ public class HiveScanProvider extends HMSTableScanProvider {
                 return TFileType.FILE_HDFS;
             } else if (location.startsWith(FeConstants.FS_PREFIX_FILE)) {
                 return TFileType.FILE_LOCAL;
-            }  else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
+            } else if (location.startsWith(FeConstants.FS_PREFIX_OFS)) {
+                return TFileType.FILE_BROKER;
+            } else if (location.startsWith(FeConstants.FS_PREFIX_JFS)) {
                 return TFileType.FILE_BROKER;
             }
         }
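
Both new branches return TFileType.FILE_BROKER; a merged condition would keep the dispatch shorter as more broker-backed schemes arrive. A self-contained sketch of the same mapping (the local enum stands in for the generated thrift type):

```
enum TFileType { FILE_HDFS, FILE_LOCAL, FILE_BROKER }

final class LocationDispatch {
    static TFileType fileTypeFor(String location) {
        if (location.startsWith("hdfs")) {
            return TFileType.FILE_HDFS;
        } else if (location.startsWith("file")) {
            return TFileType.FILE_LOCAL;
        } else if (location.startsWith("ofs") || location.startsWith("jfs")) {
            // both ofs and jfs paths are read through the broker
            return TFileType.FILE_BROKER;
        }
        throw new IllegalArgumentException("unsupported location: " + location);
    }
}
```
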
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 27b548af8c..f66872a10a 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -73,6 +73,7 @@ public class FileSystemManager {
     private static final String OSS_SCHEME = "oss";
     private static final String COS_SCHEME = "cosn";
     private static final String BOS_SCHEME = "bos";
+    private static final String JFS_SCHEME = "jfs";
     private static final String AFS_SCHEME = "afs";
 
     private static final String USER_NAME_KEY = "username";
@@ -142,7 +143,6 @@ public class FileSystemManager {
     private static final String FS_BOS_IMPL = "fs.bos.impl";
     private static final String FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE = "fs.bos.multipart.uploads.block.size";
 
-
     // arguments for afs
     private static final String HADOOP_JOB_GROUP_NAME = "hadoop.job.group.name";
     private static final String HADOOP_JOB_UGI = "hadoop.job.ugi";
@@ -219,7 +219,9 @@ public class FileSystemManager {
             brokerFileSystem = getCOSFileSystem(path, properties);
         } else if (scheme.equals(BOS_SCHEME)) {
             brokerFileSystem = getBOSFileSystem(path, properties);
-        } else {
+        } else if (scheme.equals(JFS_SCHEME)) {
+            brokerFileSystem = getJuiceFileSystem(path, properties);
+        } else {
             throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                 "invalid path. scheme is not supported");
         }
@@ -805,6 +807,132 @@ public class FileSystemManager {
         }
     }
 
+    /**
+     * visible for test
+     *
+     * The file system handle is cached; a single identity is shared by all JuiceFS file systems.
+     * @param path the jfs path to read or write
+     * @param properties broker properties carrying fs.defaultFS and authentication settings
+     * @return a cached or newly created BrokerFileSystem
+     */
+    public BrokerFileSystem getJuiceFileSystem(String path, Map<String, String> properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String host = JFS_SCHEME;
+        if (Strings.isNullOrEmpty(pathUri.getAuthority())) {
+            if (properties.containsKey(FS_DEFAULTFS_KEY)) {
+                host = properties.get(FS_DEFAULTFS_KEY);
+                logger.info("no schema and authority in path. use fs.defaultFs");
+            } else {
+                logger.warn("invalid jfs path. authority is null,path:" + path);
+                throw  new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    "invalid jfs path. authority is null");
+            }
+        }
+        String authentication = properties.getOrDefault(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+            AUTHENTICATION_SIMPLE);
+        if (Strings.isNullOrEmpty(authentication) || (!authentication.equals(AUTHENTICATION_SIMPLE)
+            && !authentication.equals(AUTHENTICATION_KERBEROS))) {
+            logger.warn("invalid authentication:" + authentication);
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                "invalid authentication:" + authentication);
+        }
+
+        FileSystemIdentity fileSystemIdentity = null;
+        if (authentication.equals(AUTHENTICATION_SIMPLE)) {
+            fileSystemIdentity = new FileSystemIdentity(host, "");
+        } else {
+            // for kerberos, use host + principal + keytab as the filesystem identity
+            String kerberosContent = "";
+            if (properties.containsKey(KERBEROS_KEYTAB)) {
+                kerberosContent = properties.get(KERBEROS_KEYTAB);
+            } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+                kerberosContent = properties.get(KERBEROS_KEYTAB_CONTENT);
+            } else {
+                throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    "keytab is required for kerberos authentication");
+            }
+            if (!properties.containsKey(KERBEROS_PRINCIPAL)) {
+                throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    "principal is required for kerberos authentication");
+            } else {
+                kerberosContent = kerberosContent + properties.get(KERBEROS_PRINCIPAL);
+            }
+            try {
+                MessageDigest digest = MessageDigest.getInstance("md5");
+                byte[] result = digest.digest(kerberosContent.getBytes());
+                String kerberosUgi = new String(result);
+                fileSystemIdentity = new FileSystemIdentity(host, kerberosUgi);
+            } catch (NoSuchAlgorithmException e) {
+                throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                    e.getMessage());
+            }
+        }
+        BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+        fileSystem.getLock().lock();
+        try {
+            // create a new filesystem
+            Configuration conf = new Configuration();
+            for (Map.Entry<String, String> propElement : properties.entrySet()) {
+                conf.set(propElement.getKey(), propElement.getValue());
+            }
+
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("create file system for new path " + path);
+                String tmpFilePath = null;
+                if (authentication.equals(AUTHENTICATION_KERBEROS)) {
+                    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+                        AUTHENTICATION_KERBEROS);
+
+                    String principal = preparePrincipal(properties.get(KERBEROS_PRINCIPAL));
+                    String keytab = "";
+                    if (properties.containsKey(KERBEROS_KEYTAB)) {
+                        keytab = properties.get(KERBEROS_KEYTAB);
+                    } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+                        // pass kerberos keytab content use base64 encoding
+                        // so decode it and write it to tmp path under /tmp
+                        // because ugi api only accept a local path as argument
+                        String keytabContent = properties.get(KERBEROS_KEYTAB_CONTENT);
+                        byte[] base64decodedBytes = Base64.getDecoder().decode(keytabContent);
+                        long currentTime = System.currentTimeMillis();
+                        Random random = new Random(currentTime);
+                        int randNumber = random.nextInt(10000);
+                        tmpFilePath = "/tmp/." + Long.toString(currentTime) + "_" + Integer.toString(randNumber);
+                        FileOutputStream fileOutputStream = new FileOutputStream(tmpFilePath);
+                        fileOutputStream.write(base64decodedBytes);
+                        fileOutputStream.close();
+                        keytab = tmpFilePath;
+                    } else {
+                        throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                            "keytab is required for kerberos authentication");
+                    }
+                    UserGroupInformation.setConfiguration(conf);
+                    UserGroupInformation.loginUserFromKeytab(principal, keytab);
+                    if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
+                        try {
+                            File file = new File(tmpFilePath);
+                            if (!file.delete()) {
+                                logger.warn("delete tmp file: " + tmpFilePath + " failed");
+                            }
+                        } catch (Exception e) {
+                            throw new BrokerException(TBrokerOperationStatusCode.FILE_NOT_FOUND,
+                                e.getMessage());
+                        }
+                    }
+                }
+                FileSystem jfsFileSystem = FileSystem.get(pathUri.getUri(), conf);
+                fileSystem.setFileSystem(jfsFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
     private BrokerFileSystem getAfsFileSystem(String path, Map<String, String> properties) {
         URI pathUri = new WildcardURI(path).getUri();
         String host = pathUri.getScheme() + "://" + pathUri.getAuthority();
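
Two details of getJuiceFileSystem invite hardening: the keytab content is written to a predictable "/tmp/.<time>_<rand>" path with no permission restrictions, and the kerberos cache key is built with new String(md5bytes), which is not charset-safe. A possible alternative sketch (illustrative only, standard JDK APIs, not part of this commit):

```
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermissions;
import java.security.MessageDigest;
import java.util.Base64;

final class KeytabSketch {
    // write the base64 keytab to an owner-only temp file with a random name
    static Path writeTempKeytab(String base64Content) throws Exception {
        Path tmp = Files.createTempFile("doris_keytab_", ".kt",
                PosixFilePermissions.asFileAttribute(
                        PosixFilePermissions.fromString("rw-------")));
        try {
            Files.write(tmp, Base64.getDecoder().decode(base64Content));
            return tmp; // caller logs in from this path, then deletes it
        } catch (Exception e) {
            Files.deleteIfExists(tmp);
            throw e;
        }
    }

    // hex-encode the digest instead of new String(bytes) so the cache key
    // survives any default-charset round trip
    static String identityDigest(String principalAndKeytab) throws Exception {
        byte[] md5 = MessageDigest.getInstance("MD5")
                .digest(principalAndKeytab.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder();
        for (byte b : md5) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }
}
```
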
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 589b95f77c..ba3d28e1a2 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -105,6 +105,7 @@ enum TStorageBackendType {
     BROKER,
     S3,
     HDFS,
+    JFS,
     LOCAL,
     OFS
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org