Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/16 11:15:51 UTC

[incubator-doris] branch master updated: [feature] Support hive on s3 (#10128)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f1c9105af1 [feature] Support hive on s3 (#10128)
f1c9105af1 is described below

commit f1c9105af1939dd695268498593f98a10ef21217
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Thu Jun 16 19:15:46 2022 +0800

    [feature] Support hive on s3 (#10128)
    
    Support querying Hive tables stored on S3. The AK/SK, region, and S3 endpoint are passed as table properties when creating the external table.
    
    Example CREATE TABLE SQL:
    ```
    CREATE TABLE `region_s3` (
    `r_regionkey` integer NOT NULL,
    `r_name` char(25) NOT NULL,
    `r_comment` varchar(152) )
    engine=hive
    properties
    ("database"="default",
    "table"="region_s3",
    "hive.metastore.uris"="thrift://127.0.0.1:9083",
    "AWS_ACCESS_KEY"="YOUR_ACCESS_KEY",
    "AWS_SECRET_KEY"="YOUR_SECRET_KEY",
    "AWS_ENDPOINT"="s3.us-east-1.amazonaws.com",
    "AWS_REGION"="us-east-1");
    ```
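    
    On the FE side, any property whose key starts with AWS is kept in the Hive table's properties and translated into Hadoop fs.s3a.* settings before the table's files are listed through the S3A filesystem; the file paths handed to the backends then keep the full s3://bucket/... prefix so the data can be read directly from S3. Below is a minimal, illustrative sketch of that mapping (the property keys come from this patch; the standalone helper class and method name are hypothetical):
    ```
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;

    // Illustrative helper mirroring the mapping done in HiveMetaStoreClientHelper.
    public final class S3PropertyMapper {
        // Keys accepted in CREATE TABLE ... PROPERTIES, as introduced by this patch.
        private static final String S3_AK = "AWS_ACCESS_KEY";
        private static final String S3_SK = "AWS_SECRET_KEY";
        private static final String S3_ENDPOINT = "AWS_ENDPOINT";

        /** Copy the AWS_* table properties into the Hadoop conf used to list files on S3. */
        public static Configuration toS3aConf(Map<String, String> props) {
            Configuration conf = new Configuration(false);
            if (props.containsKey(S3_AK)) {
                conf.set("fs.s3a.access.key", props.get(S3_AK));
            }
            if (props.containsKey(S3_SK)) {
                conf.set("fs.s3a.secret.key", props.get(S3_SK));
            }
            if (props.containsKey(S3_ENDPOINT)) {
                conf.set("fs.s3a.endpoint", props.get(S3_ENDPOINT));
            }
            // Route s3:// locations through the S3A filesystem and keep retries short.
            conf.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
            conf.set("fs.s3.impl.disable.cache", "true");
            conf.set("fs.s3a.attempts.maximum", "2");
            return conf;
        }
    }
    ```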
---
 .../docs/ecosystem/external-table/hive-of-doris.md | 24 +++++++++++-
 .../docs/ecosystem/external-table/hive-of-doris.md | 24 +++++++++++-
 .../doris/catalog/HiveMetaStoreClientHelper.java   | 43 +++++++++++++++++++---
 .../java/org/apache/doris/catalog/HiveTable.java   | 19 ++++++----
 .../org/apache/doris/planner/HiveScanNode.java     | 21 +++++++++--
 5 files changed, 114 insertions(+), 17 deletions(-)
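
The planner side of the change is in HiveScanNode: instead of always assuming HDFS, the storage backend is now inferred from the scheme of the Hive table's location and passed through to BrokerDesc and getHiveDataFiles. A rough sketch of that detection follows (an illustrative standalone form; the committed version lives in HiveScanNode.setStorageType and throws UserException for unsupported schemes):
```
import org.apache.commons.lang3.StringUtils;

// Illustrative sketch of the scheme-based backend detection; StorageType stands in for
// org.apache.doris.analysis.StorageBackend.StorageType.
enum StorageType { HDFS, S3 }

final class LocationScheme {
    static StorageType detect(String location) {
        // "s3://bucket/db/table"  -> ["s3:", "bucket", ...] -> "s3"
        // "hdfs://ns1/warehouse/t" -> "hdfs"
        String[] parts = StringUtils.split(location, "/");
        String scheme = parts[0].split(":")[0];
        if ("s3".equalsIgnoreCase(scheme)) {
            return StorageType.S3;
        } else if ("hdfs".equalsIgnoreCase(scheme)) {
            return StorageType.HDFS;
        }
        throw new IllegalArgumentException("Not supported storage type: " + scheme);
    }
}
```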

diff --git a/docs/en/docs/ecosystem/external-table/hive-of-doris.md b/docs/en/docs/ecosystem/external-table/hive-of-doris.md
index 1159bb2a61..ce0bd3579b 100644
--- a/docs/en/docs/ecosystem/external-table/hive-of-doris.md
+++ b/docs/en/docs/ecosystem/external-table/hive-of-doris.md
@@ -91,7 +91,6 @@ PROPERTIES (
 'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
 );
 
-
 -- Example 3: Create the hive external table under hive_db in a Hive cluster with HDFS HA and Kerberos authentication enabled.
 CREATE TABLE `t_hive` (
   `k1` int NOT NULL COMMENT "",
@@ -116,6 +115,25 @@ PROPERTIES (
 'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
 );
 
+-- Example 4: Create the hive_table table under hive_db in a Hive cluster with data stored on S3
+CREATE TABLE `t_hive` (
+  `k1` int NOT NULL COMMENT "",
+  `k2` char(10) NOT NULL COMMENT "",
+  `k3` datetime NOT NULL COMMENT "",
+  `k5` varchar(20) NOT NULL COMMENT "",
+  `k6` double NOT NULL COMMENT ""
+) ENGINE=HIVE
+COMMENT "HIVE"
+PROPERTIES (
+'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
+'database' = 'hive_db',
+'table' = 'hive_table',
+'AWS_ACCESS_KEY'='your_access_key',
+'AWS_SECRET_KEY'='your_secret_key',
+'AWS_ENDPOINT'='s3.us-east-1.amazonaws.com',
+'AWS_REGION'='us-east-1'
+);
+
 ```
 
 #### Parameter Description
@@ -139,6 +157,10 @@ PROPERTIES (
     - `hadoop.security.authentication`: HDFS authentication type; set it to kerberos (the default is simple)
     - `hadoop.kerberos.principal`: The Kerberos principal that Doris will use when connecting to HDFS.
     - `hadoop.kerberos.keytab`: HDFS client keytab location.
+    - `AWS_ACCESS_KEY`: AWS access key id.
+    - `AWS_SECRET_KEY`: AWS secret access key.
+    - `AWS_ENDPOINT`: S3 endpoint. e.g. s3.us-east-1.amazonaws.com
+    - `AWS_REGION`: AWS region. e.g. us-east-1
 
 **Note:**
 - To enable Doris to access the hadoop cluster with kerberos authentication enabled, you need to deploy the Kerberos client kinit on all Doris FE and BE nodes, configure krb5.conf, and fill in the KDC service information.
diff --git a/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md b/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md
index 70a92333fe..e5441d1315 100644
--- a/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md
+++ b/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md
@@ -91,7 +91,6 @@ PROPERTIES (
 'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
 );
 
-
 -- Example 3: Create the hive_table table under hive_db in a Hive cluster, with HDFS HA configured and Kerberos authentication enabled
 CREATE TABLE `t_hive` (
   `k1` int NOT NULL COMMENT "",
@@ -116,6 +115,25 @@ PROPERTIES (
 'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
 );
 
+-- Example 4: Create the hive_table table under hive_db in a Hive cluster, with Hive data stored on S3
+CREATE TABLE `t_hive` (
+  `k1` int NOT NULL COMMENT "",
+  `k2` char(10) NOT NULL COMMENT "",
+  `k3` datetime NOT NULL COMMENT "",
+  `k5` varchar(20) NOT NULL COMMENT "",
+  `k6` double NOT NULL COMMENT ""
+) ENGINE=HIVE
+COMMENT "HIVE"
+PROPERTIES (
+'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
+'database' = 'hive_db',
+'table' = 'hive_table',
+'AWS_ACCESS_KEY' = 'your_access_key',
+'AWS_SECRET_KEY' = 'your_secret_key',
+'AWS_ENDPOINT' = 's3.us-east-1.amazonaws.com',
+'AWS_REGION' = 'us-east-1'
+);
+
 ```
 
 #### Parameter Description
@@ -140,6 +158,10 @@ PROPERTIES (
     - `dfs.namenode.kerberos.principal`: the Kerberos principal of the HDFS namenode service
     - `hadoop.kerberos.principal`: the Kerberos principal Doris uses when connecting to HDFS
     - `hadoop.kerberos.keytab`: local path of the keytab file
+    - `AWS_ACCESS_KEY`: the access key id of the AWS account.
+    - `AWS_SECRET_KEY`: the secret access key of the AWS account.
+    - `AWS_ENDPOINT`: S3 endpoint, e.g. s3.us-east-1.amazonaws.com
+    - `AWS_REGION`: AWS region, e.g. us-east-1
 
 **Note:**
 - To allow Doris to access a Hadoop cluster with Kerberos authentication enabled, you need to deploy the Kerberos client kinit on all running nodes of the Doris cluster, configure krb5.conf, and fill in the KDC service information.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 88a7fd1826..946657b7e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -29,6 +29,7 @@ import org.apache.doris.analysis.IntLiteral;
 import org.apache.doris.analysis.LiteralExpr;
 import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotRef;
+import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.util.BrokerUtil;
@@ -171,16 +172,17 @@ public class HiveMetaStoreClientHelper {
      */
     public static String getHiveDataFiles(HiveTable hiveTable, ExprNodeGenericFuncDesc hivePartitionPredicate,
                                           List<TBrokerFileStatus> fileStatuses,
-                                          Table remoteHiveTbl) throws DdlException {
+                                          Table remoteHiveTbl, StorageBackend.StorageType type) throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> remoteIterators;
+        Boolean onS3 = type.equals(StorageBackend.StorageType.S3);
         if (remoteHiveTbl.getPartitionKeys().size() > 0) {
             String metaStoreUris = hiveTable.getHiveProperties().get(HiveTable.HIVE_METASTORE_URIS);
             // hive partitioned table, get file iterator from table partition sd info
             List<Partition> hivePartitions = getHivePartitions(metaStoreUris, remoteHiveTbl, hivePartitionPredicate);
-            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties());
+            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties(), onS3);
         } else {
             // hive non-partitioned table, get file iterator from table sd info
-            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties());
+            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties(), onS3);
         }
 
         String hdfsUrl = "";
@@ -195,6 +197,12 @@ public class HiveMetaStoreClientHelper {
                     // path = "/path/to/partition/file_name"
                     // eg: /home/work/dev/hive/apache-hive-2.3.7-bin/data/warehouse/dae.db/customer/state=CA/city=SanJose/000000_0
                     String path = fileStatus.getPath().toUri().getPath();
+                    if (onS3) {
+                        // Backend needs the full s3 path (with s3://bucket at the beginning) to read the data on s3.
+                        // path = "s3://bucket/path/to/partition/file_name"
+                        // eg: s3://hive-s3-test/region/region.tbl
+                        path = fileStatus.getPath().toString();
+                    }
                     brokerFileStatus.setPath(path);
                     fileStatuses.add(brokerFileStatus);
                     if (StringUtils.isEmpty(hdfsUrl)) {
@@ -239,7 +247,24 @@ public class HiveMetaStoreClientHelper {
         return hivePartitions;
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions, Map<String, String> properties) throws DdlException {
+    private static void setS3Configuration(Configuration configuration, Map<String, String> properties) {
+        if (properties.containsKey(HiveTable.S3_AK)) {
+            configuration.set("fs.s3a.access.key", properties.get(HiveTable.S3_AK));
+        }
+        if (properties.containsKey(HiveTable.S3_SK)) {
+            configuration.set("fs.s3a.secret.key", properties.get(HiveTable.S3_SK));
+        }
+        if (properties.containsKey(HiveTable.S3_ENDPOINT)) {
+            configuration.set("fs.s3a.endpoint", properties.get(HiveTable.S3_ENDPOINT));
+        }
+        configuration.set("fs.s3.impl.disable.cache", "true");
+        configuration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+        configuration.set("fs.s3a.attempts.maximum", "2");
+    }
+
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
+            List<Partition> partitions, Map<String, String> properties, boolean onS3)
+            throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
         for (Map.Entry<String, String> entry : properties.entrySet()) {
@@ -247,6 +272,9 @@ public class HiveMetaStoreClientHelper {
                 configuration.set(entry.getKey(), entry.getValue());
             }
         }
+        if (onS3) {
+            setS3Configuration(configuration, properties);
+        }
         for (Partition p : partitions) {
             String location = p.getSd().getLocation();
             org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
@@ -261,7 +289,9 @@ public class HiveMetaStoreClientHelper {
         return iterators;
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Map<String, String> properties) throws DdlException {
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(
+            Table table, Map<String, String> properties, boolean onS3)
+            throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
         boolean isSecurityEnabled = false;
@@ -274,6 +304,9 @@ public class HiveMetaStoreClientHelper {
                 isSecurityEnabled = true;
             }
         }
+        if (onS3) {
+            setS3Configuration(configuration, properties);
+        }
         String location = table.getSd().getLocation();
         org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index 259e743f0b..3736548948 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -42,15 +42,19 @@ public class HiveTable extends Table {
     private static final String PROPERTY_MISSING_MSG = "Hive %s is null. Please add properties('%s'='xxx') when create table";
     private static final String PROPERTY_ERROR_MSG = "Hive table properties('%s'='%s') is illegal or not supported. Please check it";
 
-    private static final String HIVE_DB = "database";
-    private static final String HIVE_TABLE = "table";
-    public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
-    public static final String HIVE_HDFS_PREFIX = "dfs";
-
     private String hiveDb;
     private String hiveTable;
     private Map<String, String> hiveProperties = Maps.newHashMap();
 
+    public static final String HIVE_DB = "database";
+    public static final String HIVE_TABLE = "table";
+    public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
+    public static final String HIVE_HDFS_PREFIX = "dfs";
+    public static final String S3_PROPERTIES_PREFIX = "AWS";
+    public static final String S3_AK = "AWS_ACCESS_KEY";
+    public static final String S3_SK = "AWS_SECRET_KEY";
+    public static final String S3_ENDPOINT = "AWS_ENDPOINT";
+
     public HiveTable() {
         super(TableType.HIVE);
     }
@@ -142,8 +146,9 @@ public class HiveTable extends Table {
             Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator();
             while (iter.hasNext()) {
                 Map.Entry<String, String> entry = iter.next();
-                if (entry.getKey().startsWith(HIVE_HDFS_PREFIX)) {
-                    hiveProperties.put(entry.getKey(), entry.getValue());
+                String key = entry.getKey();
+                if (key.startsWith(HIVE_HDFS_PREFIX) || key.startsWith(S3_PROPERTIES_PREFIX)) {
+                    hiveProperties.put(key, entry.getValue());
                     iter.remove();
                 }
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index 4fda0515e7..de4f3b5261 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.planner;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.doris.analysis.Analyzer;
 import org.apache.doris.analysis.BrokerDesc;
 import org.apache.doris.analysis.Expr;
@@ -68,6 +69,7 @@ public class HiveScanNode extends BrokerScanNode {
     private String fileFormat;
     private String path;
     private List<String> partitionKeys = new ArrayList<>();
+    private StorageBackend.StorageType storageType;
     /* hive table properties */
 
     public String getHostUri() {
@@ -123,13 +125,26 @@ public class HiveScanNode extends BrokerScanNode {
                         getFileFormat(),
                         getPartitionKeys(),
                         getParsedColumnExprList()));
-        brokerDesc = new BrokerDesc("HiveTableDesc", StorageBackend.StorageType.HDFS, hiveTable.getHiveProperties());
+        brokerDesc = new BrokerDesc("HiveTableDesc", storageType, hiveTable.getHiveProperties());
         targetTable = hiveTable;
     }
 
-    private void initHiveTblProperties() throws DdlException {
+    private void setStorageType(String location) throws UserException {
+        String[] strings = StringUtils.split(location, "/");
+        String storagePrefix = strings[0].split(":")[0];
+        if (storagePrefix.equalsIgnoreCase("s3")) {
+            this.storageType = StorageBackend.StorageType.S3;
+        } else if (storagePrefix.equalsIgnoreCase("hdfs")) {
+            this.storageType = StorageBackend.StorageType.HDFS;
+        } else {
+            throw new UserException("Not supported storage type: " + storagePrefix);
+        }
+    }
+
+    private void initHiveTblProperties() throws UserException {
         this.remoteHiveTable = HiveMetaStoreClientHelper.getTable(hiveTable);
         this.fileFormat = HiveMetaStoreClientHelper.HiveFileFormat.getFormat(remoteHiveTable.getSd().getInputFormat());
+        this.setStorageType(remoteHiveTable.getSd().getLocation());
 
         Map<String, String> serDeInfoParams = remoteHiveTable.getSd().getSerdeInfo().getParameters();
         this.columnSeparator = Strings.isNullOrEmpty(serDeInfoParams.get("field.delim"))
@@ -179,7 +194,7 @@ public class HiveScanNode extends BrokerScanNode {
         }
         List<TBrokerFileStatus> fileStatuses = new ArrayList<>();
         this.hdfsUri = HiveMetaStoreClientHelper.getHiveDataFiles(hiveTable, hivePartitionPredicate,
-            fileStatuses, remoteHiveTable);
+                fileStatuses, remoteHiveTable, storageType);
         fileStatusesList.add(fileStatuses);
         filesAdded += fileStatuses.size();
         for (TBrokerFileStatus fstatus : fileStatuses) {

