You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2022/12/30 05:01:59 UTC

[doris] branch branch-1.2-lts updated: [Broker](bos) support baidu bos object storage for broker (#15448)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new 7ccd504932 [Broker](bos) support baidu bos object storage for broker (#15448)
7ccd504932 is described below

commit 7ccd5049325515d21489eaee506084f47d3a4101
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Fri Dec 30 12:39:10 2022 +0800

    [Broker](bos) support baidu bos object storage for broker (#15448)
---
 .../java/org/apache/doris/analysis/ExportStmt.java |   8 +-
 .../doris/broker/hdfs/FileSystemIdentity.java      |  17 +++
 .../doris/broker/hdfs/FileSystemManager.java       | 119 +++++++++++++++++++--
 3 files changed, 135 insertions(+), 9 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
index 077729c81e..62fb2c9977 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ExportStmt.java
@@ -236,14 +236,16 @@ public class ExportStmt extends StatementBase {
         URI uri = URI.create(path);
         String schema = uri.getScheme();
         if (type == StorageBackend.StorageType.BROKER) {
-            if (schema == null || (!schema.equalsIgnoreCase("hdfs")
+            if (schema == null || (!schema.equalsIgnoreCase("bos")
+                    && !schema.equalsIgnoreCase("afs")
+                    && !schema.equalsIgnoreCase("hdfs")
                     && !schema.equalsIgnoreCase("ofs")
                     && !schema.equalsIgnoreCase("obs")
                     && !schema.equalsIgnoreCase("oss")
                     && !schema.equalsIgnoreCase("s3a")
                     && !schema.equalsIgnoreCase("cosn"))) {
-                throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'ofs://', 'obs://',"
-                    + "'oss://', 's3a://' or 'cosn://' path.");
+                throw new AnalysisException("Invalid broker path. please use valid 'hdfs://', 'afs://' , 'bos://',"
+                        + " 'ofs://', 'obs://', 'oss://', 's3a://' or 'cosn://' path.");
             }
         } else if (type == StorageBackend.StorageType.S3) {
             if (schema == null || !schema.equalsIgnoreCase("s3")) {
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java
index 9446638a52..2885ba1a58 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemIdentity.java
@@ -21,10 +21,19 @@ public class FileSystemIdentity {
 
     private final String hostName;
     private final String ugiInfo;
+
+    private final String extraInfo;
     
     public FileSystemIdentity(String hostName, String ugiInfo) {
         this.hostName = hostName;
         this.ugiInfo = ugiInfo;
+        this.extraInfo = null;
+    }
+
+    public FileSystemIdentity(String hostName, String ugiInfo, String extraInfo) {
+        this.hostName = hostName;
+        this.ugiInfo = ugiInfo;
+        this.extraInfo = extraInfo;
     }
 
     @Override
@@ -34,6 +43,7 @@ public class FileSystemIdentity {
         result = prime * result
                 + ((hostName == null) ? 0 : hostName.hashCode());
         result = prime * result + ((ugiInfo == null) ? 0 : ugiInfo.hashCode());
+        result = prime * result + ((extraInfo == null) ? 0 : extraInfo.hashCode());
         return result;
     }
 
@@ -63,6 +73,13 @@ public class FileSystemIdentity {
         } else if (!ugiInfo.equals(other.ugiInfo)) {
             return false;
         }
+        if (extraInfo == null) {
+            if (other.extraInfo != null) {
+                return false;
+            }
+        } else if (!extraInfo.equals(other.extraInfo)) {
+            return false;
+        }
         return true;
     }
 }
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 42ff97123b..27b548af8c 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -42,6 +42,7 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.InetAddress;
+import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
@@ -71,6 +72,8 @@ public class FileSystemManager {
     private static final String OBS_SCHEME = "obs";
     private static final String OSS_SCHEME = "oss";
     private static final String COS_SCHEME = "cosn";
+    private static final String BOS_SCHEME = "bos";
+    private static final String AFS_SCHEME = "afs";
 
     private static final String USER_NAME_KEY = "username";
     private static final String PASSWORD_KEY = "password";
@@ -132,6 +135,23 @@ public class FileSystemManager {
     private static final String FS_COS_IMPL = "fs.cosn.impl";
     private static final String FS_COS_IMPL_DISABLE_CACHE = "fs.cosn.impl.disable.cache";
 
+    // arguments for bos
+    private static final String FS_BOS_ACCESS_KEY = "fs.bos.access.key";
+    private static final String FS_BOS_SECRET_KEY = "fs.bos.secret.access.key";
+    private static final String FS_BOS_ENDPOINT = "fs.bos.endpoint";
+    private static final String FS_BOS_IMPL = "fs.bos.impl";
+    private static final String FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE = "fs.bos.multipart.uploads.block.size";
+
+
+    // arguments for afs
+    private static final String HADOOP_JOB_GROUP_NAME = "hadoop.job.group.name";
+    private static final String HADOOP_JOB_UGI = "hadoop.job.ugi";
+    private static final String FS_DEFAULT_NAME = "fs.default.name";
+    private static final String FS_AFS_IMPL = "fs.afs.impl";
+    private static final String DFS_AGENT_PORT = "dfs.agent.port";
+    private static final String DFS_CLIENT_AUTH_METHOD = "dfs.client.auth.method";
+    private static final String DFS_RPC_TIMEOUT = "dfs.rpc.timeout";
+
     private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);
 
     private int readBufferSize = 128 << 10; // 128k
@@ -197,6 +217,8 @@ public class FileSystemManager {
             brokerFileSystem = getOSSFileSystem(path, properties);
         } else if (scheme.equals(COS_SCHEME)) {
             brokerFileSystem = getCOSFileSystem(path, properties);
+        } else if (scheme.equals(BOS_SCHEME)) {
+            brokerFileSystem = getBOSFileSystem(path, properties);
         } else {
             throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
                 "invalid path. scheme is not supported");
@@ -548,8 +570,8 @@ public class FileSystemManager {
         String endpoint = properties.getOrDefault(FS_OSS_ENDPOINT, "");
         String disableCache = properties.getOrDefault(FS_OSS_IMPL_DISABLE_CACHE, "true");
         String host = OSS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
-        String obsUgi = accessKey + "," + secretKey;
-        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, obsUgi);
+        String ossUgi = accessKey + "," + secretKey;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, ossUgi);
         cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
         BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
         fileSystem.getLock().lock();
@@ -608,11 +630,11 @@ public class FileSystemManager {
             } else if (properties.containsKey(KERBEROS_KEYTAB_CONTENT)) {
                 kerberosContent = properties.get(KERBEROS_KEYTAB_CONTENT);
             } else {
-                throw  new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
                         "keytab is required for kerberos authentication");
             }
             if (!properties.containsKey(KERBEROS_PRINCIPAL)) {
-                throw  new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
+                throw new BrokerException(TBrokerOperationStatusCode.INVALID_ARGUMENT,
                         "principal is required for kerberos authentication");
             } else {
                 kerberosContent = kerberosContent + properties.get(KERBEROS_PRINCIPAL);
@@ -651,8 +673,8 @@ public class FileSystemManager {
                         // pass kerberos keytab content use base64 encoding
                         // so decode it and write it to tmp path under /tmp
                         // because ugi api only accept a local path as argument
-                        String keytab_content = properties.get(KERBEROS_KEYTAB_CONTENT);
-                        byte[] base64decodedBytes = Base64.getDecoder().decode(keytab_content);
+                        String keytabContent = properties.get(KERBEROS_KEYTAB_CONTENT);
+                        byte[] base64decodedBytes = Base64.getDecoder().decode(keytabContent);
                         long currentTime = System.currentTimeMillis();
                         Random random = new Random(currentTime);
                         int randNumber = random.nextInt(10000);
@@ -737,6 +759,91 @@ public class FileSystemManager {
         }
     }
 
+    /**
+     * visible for test
+     * <p>
+     * file system handle is cached, the identity is endpoint + bucket + accessKey_secretKey
+     *
+     * @param path
+     * @param properties
+     * @return
+     * @throws URISyntaxException
+     * @throws Exception
+     */
+    public BrokerFileSystem getBOSFileSystem(String path, Map<String, String> properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String accessKey = properties.getOrDefault(FS_BOS_ACCESS_KEY, "");
+        String secretKey = properties.getOrDefault(FS_BOS_SECRET_KEY, "");
+        String endpoint = properties.getOrDefault(FS_BOS_ENDPOINT, "");
+        String multiPartUploadBlockSize = properties.getOrDefault(FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE, "9437184");
+        // endpoint is the server host, pathUri.getUri().getHost() is the bucket
+        // we should use these two params as the host identity, because FileSystem will cache both.
+        String host = BOS_SCHEME + "://" + endpoint + "/" + pathUri.getUri().getHost();
+        String bosUgi = accessKey + "," + secretKey;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, bosUgi);
+        BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+        fileSystem.getLock().lock();
+        try {
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("could not find file system for path " + path + " create a new one");
+                // create a new filesystem
+                Configuration conf = new Configuration();
+                conf.set(FS_BOS_ACCESS_KEY, accessKey);
+                conf.set(FS_BOS_SECRET_KEY, secretKey);
+                conf.set(FS_BOS_ENDPOINT, endpoint);
+                conf.set(FS_BOS_IMPL, "org.apache.hadoop.fs.bos.BaiduBosFileSystem");
+                conf.set(FS_BOS_MULTIPART_UPLOADS_BLOCK_SIZE, multiPartUploadBlockSize);
+                FileSystem bosFileSystem = FileSystem.get(pathUri.getUri(), conf);
+                fileSystem.setFileSystem(bosFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
+    private BrokerFileSystem getAfsFileSystem(String path, Map<String, String> properties) {
+        URI pathUri = new WildcardURI(path).getUri();
+        String host = pathUri.getScheme() + "://" + pathUri.getAuthority();
+        String username = properties.containsKey(USER_NAME_KEY) ? properties.get(USER_NAME_KEY) : "";
+        String password = properties.containsKey(PASSWORD_KEY) ? properties.get(PASSWORD_KEY) : "";
+        String group = properties.containsKey(HADOOP_JOB_GROUP_NAME) ? properties.get(HADOOP_JOB_GROUP_NAME) : "";
+        String afsUgi = username + "," + password;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, afsUgi, group);
+        cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
+        BrokerFileSystem fileSystem = updateCachedFileSystem(fileSystemIdentity, properties);
+        fileSystem.getLock().lock();
+        try {
+            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
+                // this means the file system is closed by file system checker thread
+                // it is a corner case
+                return null;
+            }
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("could not find file system for path " + path + " create a new one");
+                // create a new file system
+                Configuration conf = new Configuration();
+                conf.set(HADOOP_JOB_UGI, afsUgi);
+                conf.set(HADOOP_JOB_GROUP_NAME, group);
+                conf.set(FS_DEFAULT_NAME, host);
+                conf.set(FS_AFS_IMPL, "org.apache.hadoop.fs.DFileSystem");
+                conf.set(DFS_CLIENT_AUTH_METHOD, properties.getOrDefault(DFS_CLIENT_AUTH_METHOD, "3"));
+                conf.set(DFS_AGENT_PORT, properties.getOrDefault(DFS_AGENT_PORT, "20001"));
+                conf.set(DFS_RPC_TIMEOUT, properties.getOrDefault(DFS_RPC_TIMEOUT, "300000"));
+                FileSystem dfsFileSystem = FileSystem.get(URI.create(host), conf);
+                fileSystem.setFileSystem(dfsFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e, e.getMessage());
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
 
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org