Posted to commits@doris.apache.org by zh...@apache.org on 2020/03/01 04:53:57 UTC

[incubator-doris] branch master updated: Support Amazon S3 data source in Broker Load (#3004)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 078e35a  Support Amazon S3 data source in Broker Load  (#3004)
078e35a is described below

commit 078e35a62e07d54236a96e3f1043ddfe0a6c6382
Author: frwrdt <38...@users.noreply.github.com>
AuthorDate: Sun Mar 1 12:53:50 2020 +0800

    Support Amazon S3 data source in Broker Load  (#3004)
---
 .../Data Manipulation/BROKER LOAD.md               |  26 +++++-
 fs_brokers/apache_hdfs_broker/pom.xml              |   7 ++
 .../doris/broker/hdfs/FileSystemManager.java       | 101 +++++++++++++++++++--
 .../org/apache/doris/common/BrokerPerfMonitor.java |   2 +-
 .../doris/broker/hdfs/TestFileSystemManager.java   |  11 +++
 5 files changed, 138 insertions(+), 9 deletions(-)

diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md
index a89f26c..08061b2 100644
--- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md	
+++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md	
@@ -28,6 +28,7 @@ under the License.
    2. Baidu AFS: Baidu's internal afs, for use inside Baidu only.
    3. Baidu Object Storage (BOS): Baidu object storage, available only to internal Baidu users, public cloud users, and other users with access to BOS.
    4. Apache HDFS: the community edition of hdfs.
+    5. Amazon S3: Amazon object storage.
 
Syntax:
 
@@ -145,6 +146,13 @@ under the License.
            dfs.namenode.rpc-address.xxx.nn: specifies the rpc address of a namenode, where nn is the name of a namenode configured in dfs.ha.namenodes.xxx, e.g. "dfs.namenode.rpc-address.my_ha.my_nn" = "host:port"
            dfs.client.failover.proxy.provider: specifies the provider the client uses to connect to the namenode, default: org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
 
+        4. Amazon S3
+
+            Required:
+            fs.s3a.access.key: the Amazon S3 access key
+            fs.s3a.secret.key: the Amazon S3 secret key
+            fs.s3a.endpoint: the Amazon S3 endpoint
+
     4. opt_properties
 
        Used to specify some special parameters.
@@ -375,7 +383,23 @@ under the License.
         INTO TABLE `my_table`
         where k1 > k2
         )
-     
+
+    11. Load data from a Parquet file on Amazon S3, specifying FORMAT as "parquet"; by default the format is inferred from the file extension:
+        
+        LOAD LABEL example_db.label11
+        (
+        DATA INFILE("s3a://my_bucket/input/file")
+        INTO TABLE `my_table`
+        FORMAT AS "parquet"
+        (k1, k2, k3)
+        )
+        WITH BROKER my_s3a_broker
+        (
+        "fs.s3a.access.key" = "xxxxxxxxxxxxxxxxxxxxxxxxxx",
+        "fs.s3a.secret.key" = "yyyyyyyyyyyyyyyyyyyy",
+        "fs.s3a.endpoint" = "s3.amazonaws.com"
+        )
+         
 ## keyword
 
     BROKER,LOAD
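
    For context, the WITH BROKER properties in example 11 are passed to the broker as a plain key/value map, which is what the FileSystemManager changes below receive. A minimal sketch in Java of that map (the class name is hypothetical and the values are placeholders):

        import java.util.HashMap;
        import java.util.Map;

        class BrokerPropsSketch {
            // Sketch: builds the same Map<String, String> that
            // FileSystemManager.getFileSystem(path, properties) is handed.
            static Map<String, String> s3aProperties() {
                Map<String, String> props = new HashMap<>();
                props.put("fs.s3a.access.key", "AKIAxxxxxxxxxxxx"); // placeholder
                props.put("fs.s3a.secret.key", "yyyyyyyyyyyyyyyy"); // placeholder
                props.put("fs.s3a.endpoint", "s3.amazonaws.com");
                return props;
            }
        }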
diff --git a/fs_brokers/apache_hdfs_broker/pom.xml b/fs_brokers/apache_hdfs_broker/pom.xml
index cc95619..1db9c68 100644
--- a/fs_brokers/apache_hdfs_broker/pom.xml
+++ b/fs_brokers/apache_hdfs_broker/pom.xml
@@ -266,6 +266,13 @@ under the License.
             <version>1.3.2</version>
         </dependency>
 
+        <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+            <version>2.7.3</version>
+        </dependency>
+
     </dependencies>
 
     <build>
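
    Note: hadoop-aws 2.7.3 declares a dependency on the matching aws-java-sdk (1.7.4), which supplies the S3 client classes that S3AFileSystem needs, so no explicit SDK entry appears here. Whether the broker's packaging picks up transitive jars is an assumption about the build that this commit does not touch.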
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
index 5e0d90f..8b0274a 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/broker/hdfs/FileSystemManager.java
@@ -61,7 +61,10 @@ public class FileSystemManager {
 
     private static Logger logger = Logger
             .getLogger(FileSystemManager.class.getName());
-    private static final String HDFS_SCHEME = "hdfs://";
+    // supported schemes
+    private static final String HDFS_SCHEME = "hdfs";
+    private static final String S3A_SCHEME = "s3a";
+
     private static final String HDFS_UGI_CONF = "hadoop.job.ugi";
     private static final String USER_NAME_KEY = "username";
     private static final String PASSWORD_KEY = "password";
@@ -81,7 +84,12 @@ public class FileSystemManager {
     private static final String DEFAULT_DFS_CLIENT_FAILOVER_PROXY_PROVIDER =
             "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider";
     private static final String FS_DEFAULTFS_KEY = "fs.defaultFS";
-    
+
+    // arguments for s3a
+    private static final String FS_S3A_ACCESS_KEY = "fs.s3a.access.key";
+    private static final String FS_S3A_SECRET_KEY = "fs.s3a.secret.key";
+    private static final String FS_S3A_ENDPOINT = "fs.s3a.endpoint";
+
     private ScheduledExecutorService handleManagementPool = Executors.newScheduledThreadPool(2);
     
     private int readBufferSize = 128 << 10; // 128k
@@ -120,18 +128,46 @@ public class FileSystemManager {
     /**
      * visible for test
      * 
+     * @param path
+     * @param properties
+     * @return BrokerFileSystem with different FileSystem based on scheme
+     * @throws URISyntaxException 
+     * @throws Exception 
+     */
+    public BrokerFileSystem getFileSystem(String path, Map<String, String> properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String scheme = pathUri.getUri().getScheme();
+        if (Strings.isNullOrEmpty(scheme)) {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
+                "invalid path. scheme is null");
+        }
+        BrokerFileSystem brokerFileSystem = null;
+        if (scheme.equals(HDFS_SCHEME)) {
+            brokerFileSystem = getDistributedFileSystem(path, properties);
+        } else if (scheme.equals(S3A_SCHEME)) {
+            brokerFileSystem = getS3AFileSystem(path, properties);
+        } else {
+            throw new BrokerException(TBrokerOperationStatusCode.INVALID_INPUT_FILE_PATH,
+                "invalid path. scheme is not supported");
+        }
+        return brokerFileSystem;
+    }
+
+    /**
+     * visible for test
+     *
     * file system handle is cached, the identity is host + username_password
     * caching by hostname alone would be unsafe: one user may specify a username and password
     * to access hdfs, while another user who specifies neither could reuse the handle and access the same data
      * @param path
      * @param properties
      * @return
-     * @throws URISyntaxException 
-     * @throws Exception 
+     * @throws URISyntaxException
+     * @throws Exception
      */
-    public BrokerFileSystem getFileSystem(String path, Map<String, String> properties) {
+    public BrokerFileSystem getDistributedFileSystem(String path, Map<String, String> properties) {
         WildcardURI pathUri = new WildcardURI(path);
-        String host = HDFS_SCHEME + pathUri.getAuthority();
+        String host = HDFS_SCHEME + "://" + pathUri.getAuthority();
         if (Strings.isNullOrEmpty(pathUri.getAuthority())) {
             if (properties.containsKey(FS_DEFAULTFS_KEY)) {
                 host = properties.get(FS_DEFAULTFS_KEY);
@@ -307,7 +343,58 @@ public class FileSystemManager {
             fileSystem.getLock().unlock();
         }
     }
-    
+
+    /**
+     * visible for test
+     *
+     * file system handle is cached, the identity is host + accessKey_secretKey
+     * @param path
+     * @param properties
+     * @return
+     * @throws URISyntaxException
+     * @throws Exception
+     */
+    public BrokerFileSystem getS3AFileSystem(String path, Map<String, String> properties) {
+        WildcardURI pathUri = new WildcardURI(path);
+        String accessKey = properties.getOrDefault(FS_S3A_ACCESS_KEY, "");
+        String secretKey = properties.getOrDefault(FS_S3A_SECRET_KEY, "");
+        String endpoint = properties.getOrDefault(FS_S3A_ENDPOINT, "");
+        String host = S3A_SCHEME + "://" + endpoint;
+        String s3aUgi = accessKey + "," + secretKey;
+        FileSystemIdentity fileSystemIdentity = new FileSystemIdentity(host, s3aUgi);
+        BrokerFileSystem fileSystem = null;
+        cachedFileSystem.putIfAbsent(fileSystemIdentity, new BrokerFileSystem(fileSystemIdentity));
+        fileSystem = cachedFileSystem.get(fileSystemIdentity);
+        if (fileSystem == null) {
+            // it means the handle was removed concurrently by the checker thread
+            return null;
+        }
+        fileSystem.getLock().lock();
+        try {
+            if (!cachedFileSystem.containsKey(fileSystemIdentity)) {
+                // this means the file system is closed by file system checker thread
+                // it is a corner case
+                return null;
+            }
+            if (fileSystem.getDFSFileSystem() == null) {
+                logger.info("could not find file system for path " + path + " create a new one");
+                // create a new filesystem
+                Configuration conf = new Configuration();
+                conf.set(FS_S3A_ACCESS_KEY, accessKey);
+                conf.set(FS_S3A_SECRET_KEY, secretKey);
+                conf.set(FS_S3A_ENDPOINT, endpoint);
+                FileSystem s3AFileSystem = FileSystem.get(pathUri.getUri(), conf);
+                fileSystem.setFileSystem(s3AFileSystem);
+            }
+            return fileSystem;
+        } catch (Exception e) {
+            logger.error("errors while connect to " + path, e);
+            throw new BrokerException(TBrokerOperationStatusCode.NOT_AUTHORIZED, e);
+        } finally {
+            fileSystem.getLock().unlock();
+        }
+    }
+
     public List<TBrokerFileStatus> listPath(String path, boolean fileNameOnly, Map<String, String> properties) {
         List<TBrokerFileStatus> resultFileStatus = null;
         WildcardURI pathUri = new WildcardURI(path);
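
    The heart of getS3AFileSystem is standard hadoop-aws wiring: copy the three fs.s3a.* properties onto a Hadoop Configuration and let FileSystem.get resolve the s3a scheme. A minimal standalone sketch of the same technique, assuming hadoop-aws 2.7.x on the classpath (bucket, credentials, and endpoint are placeholders):

        import java.net.URI;

        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;

        public class S3ASketch {
            public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                // the same three keys the broker copies out of the load properties
                conf.set("fs.s3a.access.key", "AKIAxxxxxxxxxxxx"); // placeholder
                conf.set("fs.s3a.secret.key", "yyyyyyyyyyyyyyyy"); // placeholder
                conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
                // the URI scheme selects S3AFileSystem, provided by hadoop-aws
                FileSystem fs = FileSystem.get(new URI("s3a://my_bucket/input/file"), conf);
                System.out.println(fs.exists(new Path("s3a://my_bucket/input/file")));
                fs.close();
            }
        }

    One design note: Hadoop's own FileSystem.get already caches instances per scheme, authority, and user, but the broker keeps its own cache keyed on host + accessKey_secretKey (the FileSystemIdentity above) so that two users presenting different credentials never share a handle.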
diff --git a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/BrokerPerfMonitor.java b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/BrokerPerfMonitor.java
index 5780405..346472b 100644
--- a/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/BrokerPerfMonitor.java
+++ b/fs_brokers/apache_hdfs_broker/src/main/java/org/apache/doris/common/BrokerPerfMonitor.java
@@ -22,7 +22,7 @@ import com.google.common.base.Stopwatch;
 public class BrokerPerfMonitor {
 
     public static Stopwatch startWatch() {
-        Stopwatch stopwatch = new Stopwatch();
+        Stopwatch stopwatch = Stopwatch.createUnstarted();
         stopwatch.start();
         return stopwatch;
     }
diff --git a/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java b/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java
index 3eaafec..7c89b72 100644
--- a/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java
+++ b/fs_brokers/apache_hdfs_broker/src/test/java/org/apache/doris/broker/hdfs/TestFileSystemManager.java
@@ -200,4 +200,15 @@ public class TestFileSystemManager extends TestCase {
         isPathExist = fileSystemManager.checkPathExist(tempFile2, properties);
         assertFalse(isPathExist);
     }
+
+    @Test
+    public void testGetFileSystemForS3aScheme() throws IOException {
+        Map<String, String> properties = new HashMap<String, String>();
+        properties.put("fs.s3a.access.key", "accessKey");
+        properties.put("fs.s3a.secret.key", "secretKey");
+        properties.put("fs.s3a.endpoint", "s3.test.com");
+        BrokerFileSystem fs = fileSystemManager.getFileSystem("s3a://testbucket/data/abc/logs", properties);
+        assertNotNull(fs);
+        fs.getDFSFileSystem().close();
+    }
 }
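
    The new test only checks that getFileSystem dispatches an s3a:// path to a BrokerFileSystem handle; "s3.test.com" is a placeholder endpoint, not a live bucket. Assuming the module's default surefire setup, it can be run on its own with mvn test -Dtest=TestFileSystemManager from fs_brokers/apache_hdfs_broker.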

