Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/06 09:20:41 UTC

[incubator-doris] 02/11: [improvement](hive) Support hive with HA HDFS. Pass ha configuration through hive create table properties. (#9151)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 80c74b029dddf69f47c7457e995a0f5e825d2dd4
Author: Jibing-Li <64...@users.noreply.github.com>
AuthorDate: Thu May 5 23:43:11 2022 +0800

    [improvement](hive) Support hive with HA HDFS. Pass ha configuration through hive create table properties. (#9151)
    
    Doris couldn't resolve the defaultFS of HDFS with an HA configuration, so it couldn't query Hive tables on HA HDFS.
    This is because there was no way to pass the HA configs to the Hive external table.
    
    Overview of changes:
    Pass the HA configs to the Hive external table through create table properties.
    
    Usage:
    Example of creating a Hive external table with HA configuration properties:
    
    CREATE TABLE region (
        r_regionkey integer NOT NULL,
        r_name char(25) NOT NULL,
        r_comment varchar(152)
    ) engine=hive properties (
        "database" = "default",
        "table" = "region",
        "hive.metastore.uris" = "thrift://172.21.16.11:7004",
        "dfs.nameservices" = "hacluster",
        "dfs.ha.namenodes.hacluster" = "3,4",
        "dfs.namenode.rpc-address.hacluster.3" = "192.168.0.93:8020",
        "dfs.namenode.rpc-address.hacluster.4" = "172.21.16.11:8020",
        "dfs.client.failover.proxy.provider.hacluster" = "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
    );
---
 .../doris/catalog/HiveMetaStoreClientHelper.java      | 19 +++++++++++++++----
 .../main/java/org/apache/doris/catalog/HiveTable.java | 13 +++++++++++++
 .../java/org/apache/doris/common/FeConstants.java     |  6 ++++++
 .../java/org/apache/doris/planner/BrokerScanNode.java | 18 +++++++++++++++---
 4 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 5dcfd1273b..c378eecf24 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -66,6 +66,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Map;
 import java.util.Stack;
 
 /**
@@ -183,10 +184,10 @@ public class HiveMetaStoreClientHelper {
             } finally {
                 client.close();
             }
-            remoteIterators = getRemoteIterator(hivePartitions);
+            remoteIterators = getRemoteIterator(hivePartitions, hiveTable.getHiveProperties());
         } else {
             // hive non-partitioned table, get file iterator from table sd info
-            remoteIterators = getRemoteIterator(remoteHiveTbl);
+            remoteIterators = getRemoteIterator(remoteHiveTbl, hiveTable.getHiveProperties());
         }
 
         String hdfsUrl = "";
@@ -219,9 +220,14 @@ public class HiveMetaStoreClientHelper {
         return hdfsUrl;
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions) throws DdlException {
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(List<Partition> partitions, Map<String, String> properties) throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
+                configuration.set(entry.getKey(), entry.getValue());
+            }
+        }
         for (Partition p : partitions) {
             String location = p.getSd().getLocation();
             org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
@@ -236,9 +242,14 @@ public class HiveMetaStoreClientHelper {
         return iterators;
     }
 
-    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table) throws DdlException {
+    private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Map<String, String> properties) throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
+        for (Map.Entry<String, String> entry : properties.entrySet()) {
+            if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
+                configuration.set(entry.getKey(), entry.getValue());
+            }
+        }
         String location = table.getSd().getLocation();
         org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
         try {
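
Sketch (not part of the patch): both getRemoteIterator overloads above repeat the
same property-copy step. Extracted as a hypothetical helper, with a tiny usage
example, it reads:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;

    public class HdfsConfSketch {
        // Copy table-level properties into the Configuration used to list
        // files, skipping hive.metastore.uris: that key is a Thrift endpoint
        // for the metastore, not an HDFS client setting.
        static Configuration buildConfiguration(Map<String, String> properties) {
            Configuration configuration = new Configuration(false);
            for (Map.Entry<String, String> entry : properties.entrySet()) {
                if (!entry.getKey().equals("hive.metastore.uris")) {
                    configuration.set(entry.getKey(), entry.getValue());
                }
            }
            return configuration;
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("hive.metastore.uris", "thrift://172.21.16.11:7004");
            props.put("dfs.nameservices", "hacluster");
            Configuration conf = buildConfiguration(props);
            System.out.println(conf.get("dfs.nameservices"));    // hacluster
            System.out.println(conf.get("hive.metastore.uris")); // null
        }
    }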
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index ff4adc001f..d418e47307 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -29,6 +29,7 @@ import com.google.common.collect.Maps;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -42,6 +43,7 @@ public class HiveTable extends Table {
     private static final String HIVE_DB = "database";
     private static final String HIVE_TABLE = "table";
     public static final String HIVE_METASTORE_URIS = "hive.metastore.uris";
+    public static final String HIVE_HDFS_PREFIX = "dfs";
 
     private String hiveDb;
     private String hiveTable;
@@ -100,6 +102,17 @@ public class HiveTable extends Table {
         copiedProps.remove(HIVE_METASTORE_URIS);
         hiveProperties.put(HIVE_METASTORE_URIS, hiveMetastoreUris);
 
+        if (!copiedProps.isEmpty()) {
+            Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator();
+            while(iter.hasNext()) {
+                Map.Entry<String, String> entry = iter.next();
+                if (entry.getKey().startsWith(HIVE_HDFS_PREFIX)) {
+                    hiveProperties.put(entry.getKey(), entry.getValue());
+                    iter.remove();
+                }
+            }
+        }
+
         if (!copiedProps.isEmpty()) {
             throw new DdlException("Unknown table properties: " + copiedProps.toString());
         }
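
Sketch (not part of the patch): the hunk above moves every property whose key
starts with the "dfs" prefix into hiveProperties and leaves anything unrecognized
behind to trigger the "Unknown table properties" error. The same partitioning
logic as a self-contained example:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;

    public class PropertySplitSketch {
        // Move all entries whose key starts with prefix from 'remaining' into
        // 'accepted'; whatever stays in 'remaining' is unknown and should fail
        // validation with a DdlException.
        static void moveByPrefix(Map<String, String> remaining,
                Map<String, String> accepted, String prefix) {
            Iterator<Map.Entry<String, String>> iter = remaining.entrySet().iterator();
            while (iter.hasNext()) {
                Map.Entry<String, String> entry = iter.next();
                if (entry.getKey().startsWith(prefix)) {
                    accepted.put(entry.getKey(), entry.getValue());
                    iter.remove();
                }
            }
        }

        public static void main(String[] args) {
            Map<String, String> props = new HashMap<>();
            props.put("dfs.nameservices", "hacluster");
            props.put("tpyo.key", "oops");
            Map<String, String> hiveProperties = new HashMap<>();
            moveByPrefix(props, hiveProperties, "dfs");
            System.out.println(hiveProperties); // {dfs.nameservices=hacluster}
            System.out.println(props);          // {tpyo.key=oops} -> unknown, rejected
        }
    }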
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
index 54b755c1e8..715eb0e6f9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java
@@ -57,4 +57,10 @@ public class FeConstants {
     public static String null_string = "\\N";
 
     public static long tablet_checker_interval_ms = 20 * 1000L;
+
+    public static String csv = "csv";
+    public static String csv_with_names = "csv_with_names";
+    public static String csv_with_names_and_types = "csv_with_names_and_types";
+
+    public static String text = "text";
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index bfab5124a9..499ef83728 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -36,6 +36,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.BrokerUtil;
+import org.apache.doris.common.FeConstants;
 import org.apache.doris.load.BrokerFileGroup;
 import org.apache.doris.load.Load;
 import org.apache.doris.load.loadv2.LoadTask;
@@ -409,7 +410,9 @@ public class BrokerScanNode extends LoadScanNode {
                 return TFileFormatType.FORMAT_ORC;
             } else if (fileFormat.toLowerCase().equals("json")) {
                 return TFileFormatType.FORMAT_JSON;
-            } else if (fileFormat.toLowerCase().equals("csv")) {
+            } else if (fileFormat.toLowerCase().equals(FeConstants.csv)
+                    // TODO: Add TEXTFILE to TFileFormatType to support the hive text file format.
+                    || fileFormat.toLowerCase().equals(FeConstants.text)) {
                 return TFileFormatType.FORMAT_CSV_PLAIN;
             } else {
                 throw new UserException("Not supported file format: " + fileFormat);
@@ -483,7 +486,11 @@ public class BrokerScanNode extends LoadScanNode {
                 } else {
                     TBrokerRangeDesc rangeDesc = createBrokerRangeDesc(curFileOffset, fileStatus, formatType,
                             leftBytes, columnsFromPath, numberOfColumnsFromFile, brokerDesc);
-                    rangeDesc.setHdfsParams(tHdfsParams);
+                    if (rangeDesc.hdfs_params != null && rangeDesc.hdfs_params.getFsName() == null) {
+                        rangeDesc.hdfs_params.setFsName(fsName);
+                    } else if (rangeDesc.hdfs_params == null) {
+                        rangeDesc.setHdfsParams(tHdfsParams);
+                    }
                     rangeDesc.setReadByColumnDef(true);
                     brokerScanRange(curLocations).addToRanges(rangeDesc);
                     curFileOffset = 0;
@@ -506,7 +513,12 @@ public class BrokerScanNode extends LoadScanNode {
                     rangeDesc.setNumAsString(context.fileGroup.isNumAsString());
                     rangeDesc.setReadJsonByLine(context.fileGroup.isReadJsonByLine());
                 }
-                rangeDesc.setHdfsParams(tHdfsParams);
+                if (rangeDesc.hdfs_params != null && rangeDesc.hdfs_params.getFsName() == null) {
+                    rangeDesc.hdfs_params.setFsName(fsName);
+                } else if (rangeDesc.hdfs_params == null) {
+                    rangeDesc.setHdfsParams(tHdfsParams);
+                }
+
                 rangeDesc.setReadByColumnDef(true);
                 brokerScanRange(curLocations).addToRanges(rangeDesc);
                 curFileOffset = 0;
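
Sketch (not part of the patch): the two hunks above replace an unconditional
setHdfsParams call with a guard, so hdfs params already attached to a range
(e.g. by the hive scan path) are not clobbered. With hypothetical minimal
stand-ins for the generated Thrift types, the three cases are:

    // Hypothetical stand-ins for the generated Thrift classes, only to make
    // the guard's cases explicit.
    class THdfsParamsStub { String fsName; }
    class TBrokerRangeDescStub { THdfsParamsStub hdfsParams; }

    class HdfsParamsGuard {
        static void apply(TBrokerRangeDescStub rangeDesc, THdfsParamsStub built,
                String fsName) {
            if (rangeDesc.hdfsParams != null && rangeDesc.hdfsParams.fsName == null) {
                // Params exist but lack a fs name: fill in only the fs name.
                rangeDesc.hdfsParams.fsName = fsName;
            } else if (rangeDesc.hdfsParams == null) {
                // No params at all: attach the ones built for this scan.
                rangeDesc.hdfsParams = built;
            }
            // Params with a fs name already set are left untouched.
        }
    }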

