Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/05 15:43:17 UTC

[incubator-doris] branch master updated: [improvement](hive) Support hive with HA HDFS. Pass ha configuration through hive create table properties. (#9151)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a5f9031c89 [improvement](hive) Support hive with HA HDFS. Pass ha configuration through hive create table properties. (#9151)
a5f9031c89 is described below

commit a5f9031c892017e8f7ec09e47ab103e9325c2c6e
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Thu May 5 23:43:11 2022 +0800

    [improvement](hive) Support hive with HA HDFS. Pass ha configuration through hive create table properties. (#9151)
    
    Doris could not resolve the defaultFS of an HDFS cluster with HA configuration, so it could not query Hive tables stored on HA HDFS.
    This was because there was no way to pass the HA configuration to the Hive external table.
    
    Overview of changes: pass the HA configuration to the Hive external table through create table properties.
    
    Usage:
    Example of creating a Hive external table with HA configuration properties:
    
    CREATE TABLE region (
        r_regionkey integer NOT NULL,
        r_name char(25) NOT NULL,
        r_comment varchar(152)
    ) engine=hive properties (
        "database"="default",
        "table"="region",
        "hive.metastore.uris"="thrift://172.21.16.11:7004",
        "dfs.nameservices"="hacluster",
        "dfs.ha.namenodes.hacluster"="3,4",
        "dfs.namenode.rpc-address.hacluster.3"="192.168.0.93:8020",
        "dfs.namenode.rpc-address.hacluster.4"="172.21.16.11:8020",
        "dfs.client.failover.proxy.provider.hacluster"="org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
---
 docs/en/ecosystem/external-table/hive-of-doris.md  | 28 ++++++++++++++++---
 .../ecosystem/external-table/hive-of-doris.md      | 31 +++++++++++++++++-----
 .../doris/catalog/HiveMetaStoreClientHelper.java   | 19 ++++++++++---
 .../java/org/apache/doris/catalog/HiveTable.java   | 13 +++++++++
 .../java/org/apache/doris/common/FeConstants.java  |  2 ++
 .../org/apache/doris/planner/BrokerScanNode.java   | 18 ++++++++++---
 6 files changed, 95 insertions(+), 16 deletions(-)

diff --git a/docs/en/ecosystem/external-table/hive-of-doris.md b/docs/en/ecosystem/external-table/hive-of-doris.md
index e7b531295d..741e167c16 100644
--- a/docs/en/ecosystem/external-table/hive-of-doris.md
+++ b/docs/en/ecosystem/external-table/hive-of-doris.md
@@ -44,8 +44,6 @@ This document introduces how to use this feature and the considerations.
 
 ### Create Hive External Table 
 
-Refer to the specific table syntax:[CREATE TABLE](../../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.html)
-
 ```sql
 -- Syntax
 CREATE [EXTERNAL] TABLE table_name (
@@ -57,7 +55,7 @@ PROPERTIES (
   ...
 );
 
--- Example: Create the hive_table table under hive_db in a Hive cluster
+-- Example 1: Create the hive_table table under hive_db in a Hive cluster
 CREATE TABLE `t_hive` (
   `k1` int NOT NULL COMMENT "",
   `k2` char(10) NOT NULL COMMENT "",
@@ -71,6 +69,26 @@ PROPERTIES (
 'database' = 'hive_db',
 'table' = 'hive_table'
 );
+
+-- Example 2: Create the hive_table table under hive_db in a Hive cluster with HDFS HA configuration.
+CREATE TABLE `t_hive` (
+  `k1` int NOT NULL COMMENT "",
+  `k2` char(10) NOT NULL COMMENT "",
+  `k3` datetime NOT NULL COMMENT "",
+  `k5` varchar(20) NOT NULL COMMENT "",
+  `k6` double NOT NULL COMMENT ""
+) ENGINE=HIVE
+COMMENT "HIVE"
+PROPERTIES (
+'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
+'database' = 'hive_db',
+'table' = 'hive_table',
+'dfs.nameservices'='hacluster',
+'dfs.ha.namenodes.hacluster'='3,4',
+'dfs.namenode.rpc-address.hacluster.3'='192.168.0.93:8020',
+'dfs.namenode.rpc-address.hacluster.4'='172.21.16.11:8020',
+'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
+);
 ```
 
 #### Parameter Description
@@ -85,6 +103,10 @@ PROPERTIES (
     - `hive.metastore.uris`: Hive Metastore service address
     - `database`: the name of the database to which Hive is mounted
     - `table`: the name of the table to which Hive is mounted
+    - `dfs.nameservices`: the logical name of the nameservice. See hdfs-site.xml
+    - `dfs.ha.namenodes.[nameservice ID]`: unique identifiers for each NameNode in the nameservice. See hdfs-site.xml
+    - `dfs.namenode.rpc-address.[nameservice ID].[name node ID]`: the fully-qualified RPC address for each NameNode to listen on. See hdfs-site.xml
+    - `dfs.client.failover.proxy.provider.[nameservice ID]`: the Java class that HDFS clients use to contact the Active NameNode, usually org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
     
 ## Data Type Matching
 
diff --git a/docs/zh-CN/ecosystem/external-table/hive-of-doris.md b/docs/zh-CN/ecosystem/external-table/hive-of-doris.md
index 558f9ea249..f5c7a017e4 100644
--- a/docs/zh-CN/ecosystem/external-table/hive-of-doris.md
+++ b/docs/zh-CN/ecosystem/external-table/hive-of-doris.md
@@ -44,8 +44,6 @@ Hive External Table of Doris 提供了 Doris 直接访问 Hive 外部表的能
 
 ### Create Hive External Table in Doris
 
-Refer to the specific table creation syntax: [CREATE TABLE](../../sql-manual/sql-reference/Data-Definition-Statements/Create/CREATE-TABLE.html)
-
 ```sql
 -- Syntax
 CREATE [EXTERNAL] TABLE table_name (
@@ -57,7 +55,7 @@ PROPERTIES (
   ...
 );
 
--- Example: Create the hive_table table under hive_db in a Hive cluster
+-- Example 1: Create the hive_table table under hive_db in a Hive cluster
 CREATE TABLE `t_hive` (
   `k1` int NOT NULL COMMENT "",
   `k2` char(10) NOT NULL COMMENT "",
@@ -71,6 +69,26 @@ PROPERTIES (
 'database' = 'hive_db',
 'table' = 'hive_table'
 );
+
+-- Example 2: Create the hive_table table under hive_db in a Hive cluster, with HDFS HA configuration
+CREATE TABLE `t_hive` (
+  `k1` int NOT NULL COMMENT "",
+  `k2` char(10) NOT NULL COMMENT "",
+  `k3` datetime NOT NULL COMMENT "",
+  `k5` varchar(20) NOT NULL COMMENT "",
+  `k6` double NOT NULL COMMENT ""
+) ENGINE=HIVE
+COMMENT "HIVE"
+PROPERTIES (
+'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
+'database' = 'hive_db',
+'table' = 'hive_table',
+'dfs.nameservices'='hacluster',
+'dfs.ha.namenodes.hacluster'='3,4',
+'dfs.namenode.rpc-address.hacluster.3'='192.168.0.93:8020',
+'dfs.namenode.rpc-address.hacluster.4'='172.21.16.11:8020',
+'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
+);
 ```
 
 #### Parameter Description:
@@ -85,6 +103,10 @@ PROPERTIES (
     - `hive.metastore.uris`: Hive Metastore service address
     - `database`: the name of the mounted Hive database
     - `table`: the name of the mounted Hive table
+    - `dfs.nameservices`: the name of the nameservice, consistent with hdfs-site.xml
+    - `dfs.ha.namenodes.[nameservice ID]`: the list of NameNode IDs in the nameservice, consistent with hdfs-site.xml
+    - `dfs.namenode.rpc-address.[nameservice ID].[name node ID]`: the RPC address of each NameNode, one entry per NameNode, consistent with hdfs-site.xml
+    - `dfs.client.failover.proxy.provider.[nameservice ID]`: the Java class that HDFS clients use to contact the active NameNode, usually "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
 
 ## Type Matching
 
@@ -117,6 +139,3 @@ PROPERTIES (
 ```sql
 select * from t_hive where k1 > 1000 and k3 ='term' or k4 like '%doris';
 ```
-
-
-
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 8ca9d7b9cf..366e720ec3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -66,6 +66,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.Stack;
 
 /**
@@ -183,10 +184,10 @@ public class HiveMetaStoreClientHelper {
             } finally {
                 client.close();
             }
-            remoteIterators = getRemoteIterator(hivePartitions);
+            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties());
         } else {
             // hive non-partitioned table, get file iterator from table sd info
-            remoteIterators = getRemoteIterator(remoteHiveTbl);
+            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties());
         }
 
         String hdfsUrl = "";
@@ -219,9 +220,14 @@ public class HiveMetaStoreClientHelper {
         return hdfsUrl;
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions) throws DdlException {
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions, Map<String, String> properties) throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
+                configuration.set(entry.getKey(), entry.getValue());
+            }
+        }
         for (Partition p : partitions) {
             String location = p.getSd().getLocation();
             org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
@@ -236,9 +242,14 @@ public class HiveMetaStoreClientHelper {
         return iterators;
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table) throws DdlException {
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Map<String, String> properties) throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
+                configuration.set(entry.getKey(), entry.getValue());
+            }
+        }
         String location = table.getSd().getLocation();
         org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
         try {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index ff4adc001f..d418e47307 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -42,6 +43,7 @@ public class HiveTable extends Table {
     private static final String HIVE_DB = "database";
     private static final String HIVE_TABLE = "table";
     public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
+    public static final String HIVE_HDFS_PREFIX = "dfs";
 
     private String hiveDb;
     private String hiveTable;
@@ -100,6 +102,17 @@ public class HiveTable extends Table {
         copiedProps.remove(HIVE_METASTORE_URIS);
         hiveProperties.put(HIVE_METASTORE_URIS, hiveMetastoreUris);
 
+        if (!copiedProps.isEmpty()) {
+            Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator();
+            while(iter.hasNext()) {
+                Map.Entry<String, String> entry = iter.next();
+                if (entry.getKey().startsWith(HIVE_HDFS_PREFIX)) {
+                    hiveProperties.put(entry.getKey(), entry.getValue());
+                    iter.remove();
+                }
+            }
+        }
+
         if (!copiedProps.isEmpty()) {
             throw new DdlException("Unknown table properties: " + copiedProps.toString());
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 20ff3551b9..27ffda6d87 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -60,4 +60,6 @@ public class FeConstants {
     public static String csv = "csv";
     public static String csv_with_names = "csv_with_names";
     public static String csv_with_names_and_types = "csv_with_names_and_types";
+
+    public static String text = "text";
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 73aa1fb04d..a7b9493b88 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -420,7 +420,9 @@ public class BrokerScanNode extends LoadScanNode {
                 // csv/csv_with_name/csv_with_names_and_types treat as csv format
             } else if (fileFormat.toLowerCase().equals(FeConstants.csv)
                     || fileFormat.toLowerCase().equals(FeConstants.csv_with_names)
-                    || fileFormat.toLowerCase().equals(FeConstants.csv_with_names_and_types)) {
+                    || fileFormat.toLowerCase().equals(FeConstants.csv_with_names_and_types)
+                    // TODO: Add TEXTFILE to TFileFormatType to support hive text file format.
+                    || fileFormat.toLowerCase().equals(FeConstants.text)) {
                 return TFileFormatType.FORMAT_CSV_PLAIN;
             } else {
                 throw new UserException("Not supported file format: " + fileFormat);
@@ -506,7 +508,12 @@ public class BrokerScanNode extends LoadScanNode {
                 } else {
                     TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType,
                             leftBytes, columnsFromPath, numberOfColumnsFromFile, brokerDesc, header_type);
-                    rangeDesc.setHdfsParams(tHdfsParams);
+                    if (rangeDesc.hdfs_params != null && rangeDesc.hdfs_params.getFsName() == null) {
+                        rangeDesc.hdfs_params.setFsName(fsName);
+                    } else if (rangeDesc.hdfs_params == null) {
+                        rangeDesc.setHdfsParams(tHdfsParams);
+                    }
+
                     rangeDesc.setReadByColumnDef(true);
                     brokerScanRange(curLocations).addToRanges(rangeDesc);
                     curFileOffset = 0;
@@ -529,7 +536,12 @@ public class BrokerScanNode extends LoadScanNode {
                     rangeDesc.setNumAsString(context.fileGroup.isNumAsString());
                     rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine());
                 }
-                rangeDesc.setHdfsParams(tHdfsParams);
+                if (rangeDesc.hdfs_params != null && rangeDesc.hdfs_params.getFsName() == null) {
+                    rangeDesc.hdfs_params.setFsName(fsName);
+                } else if (rangeDesc.hdfs_params == null) {
+                    rangeDesc.setHdfsParams(tHdfsParams);
+                }
+
                 rangeDesc.setReadByColumnDef(true);
                 brokerScanRange(curLocations).addToRanges(rangeDesc);
                 curFileOffset = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org