Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/15 08:56:39 UTC

[doris] branch master updated: [fix](resource) HdfsStorage can get default.Fs from path or configuration (#15079)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6625e650c4 [fix](resource) HdfsStorage can get default.Fs from path or configuration (#15079)
6625e650c4 is described below

commit 6625e650c4bf5df57b3612dd56b38cec76c16110
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Thu Dec 15 16:56:32 2022 +0800

    [fix](resource) HdfsStorage can get default.Fs from path or configuration (#15079)
---
 .../org/apache/doris/analysis/OutFileClause.java   |  5 ++-
 .../java/org/apache/doris/backup/HdfsStorage.java  | 36 ++++++++---------
 .../org/apache/doris/catalog/HdfsResource.java     |  8 ----
 .../external_catalog_p0/hive/test_hive_other.out   |  8 ++++
 .../hive/test_hive_other.groovy                    | 47 +++++++++++++++++++++-
 5 files changed, 75 insertions(+), 29 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 31bd346b06..3d3f82b74b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -624,7 +624,6 @@ public class OutFileClause {
         } else if (filePath.toUpperCase().startsWith(HDFS_FILE_PREFIX.toUpperCase())) {
             brokerName = StorageBackend.StorageType.HDFS.name();
             storageType = StorageBackend.StorageType.HDFS;
-            filePath = filePath.substring(HDFS_FILE_PREFIX.length() - 1);
         } else {
             return;
         }
@@ -651,7 +650,9 @@ public class OutFileClause {
         if (storageType == StorageBackend.StorageType.S3) {
             S3Storage.checkS3(brokerProps);
         } else if (storageType == StorageBackend.StorageType.HDFS) {
-            HdfsStorage.checkHDFS(brokerProps);
+            if (!brokerProps.containsKey(HdfsResource.HADOOP_FS_NAME)) {
+                brokerProps.put(HdfsResource.HADOOP_FS_NAME, HdfsStorage.getFsName(filePath));
+            }
         }
 
         brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps);
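
With this change, an HDFS `INTO OUTFILE` no longer strips the "hdfs://" prefix from the file path and no longer requires fs.defaultFS in the outfile properties: when the key is absent, it is derived from the path itself. A minimal self-contained sketch of that fallback, assuming hadoop-common on the classpath (class and method names here are illustrative; the real logic sits in OutFileClause and HdfsStorage as the hunks show):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.fs.Path;

    public class OutFileFsNameFallback {
        // HdfsResource.HADOOP_FS_NAME in the real code.
        static final String HADOOP_FS_NAME = "fs.defaultFS";

        // Same extraction as the new HdfsStorage.getFsName below:
        // strip the URI's path component, keeping scheme and authority.
        static String getFsName(String path) {
            java.net.URI uri = new Path(path).toUri();
            return uri.toString().replace(uri.getPath(), "");
        }

        public static void main(String[] args) {
            String filePath = "hdfs://127.0.0.1:8020/user/test/csv_";
            Map<String, String> brokerProps = new HashMap<>();
            if (!brokerProps.containsKey(HADOOP_FS_NAME)) {
                brokerProps.put(HADOOP_FS_NAME, getFsName(filePath));
            }
            System.out.println(brokerProps); // {fs.defaultFS=hdfs://127.0.0.1:8020}
        }
    }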
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java b/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java
index 6344a5ec75..e245ad6377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.HdfsResource;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.URI;
 
-import org.apache.commons.collections.map.CaseInsensitiveMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -49,6 +48,7 @@ import java.nio.file.FileVisitOption;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -58,7 +58,7 @@ import java.util.Map;
  */
 public class HdfsStorage extends BlobStorage {
     private static final Logger LOG = LogManager.getLogger(HdfsStorage.class);
-    private final Map<String, String> caseInsensitiveProperties;
+    private final Map<String, String> hdfsProperties;
 
     private final int readBufferSize = 128 << 10; // 128k
     private final int writeBufferSize = 128 << 10; // 128k
@@ -71,30 +71,26 @@ public class HdfsStorage extends BlobStorage {
      * @param properties parameters to access HDFS.
      */
     public HdfsStorage(Map<String, String> properties) {
-        caseInsensitiveProperties = new CaseInsensitiveMap();
+        hdfsProperties = new HashMap<>();
         setProperties(properties);
         setType(StorageBackend.StorageType.HDFS);
         setName(StorageBackend.StorageType.HDFS.name());
     }
 
-    public static void checkHDFS(Map<String, String> properties) throws UserException {
-        for (String field : HdfsResource.REQUIRED_FIELDS) {
-            if (!properties.containsKey(field)) {
-                throw new UserException(
-                        String.format("The properties of hdfs is invalid. %s are needed", field));
-            }
-        }
+    public static String getFsName(String path) {
+        Path hdfsPath = new Path(path);
+        String fullPath = hdfsPath.toUri().toString();
+        String filePath = hdfsPath.toUri().getPath();
+        return fullPath.replace(filePath, "");
     }
 
     @Override
     public FileSystem getFileSystem(String remotePath) throws UserException {
         if (dfsFileSystem == null) {
-            checkHDFS(caseInsensitiveProperties);
-            String hdfsFsName = caseInsensitiveProperties.get(HdfsResource.HADOOP_FS_NAME);
-            String username = caseInsensitiveProperties.get(HdfsResource.HADOOP_USER_NAME);
+            String username = hdfsProperties.get(HdfsResource.HADOOP_USER_NAME);
             Configuration conf = new HdfsConfiguration();
             boolean isSecurityEnabled = false;
-            for (Map.Entry<String, String> propEntry : caseInsensitiveProperties.entrySet()) {
+            for (Map.Entry<String, String> propEntry : hdfsProperties.entrySet()) {
                 conf.set(propEntry.getKey(), propEntry.getValue());
                 if (propEntry.getKey().equals(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
                         && propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
@@ -106,10 +102,14 @@ public class HdfsStorage extends BlobStorage {
                 if (isSecurityEnabled) {
                     UserGroupInformation.setConfiguration(conf);
                     UserGroupInformation.loginUserFromKeytab(
-                            caseInsensitiveProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL),
-                            caseInsensitiveProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
+                            hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL),
+                            hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
+                }
+                if (username == null) {
+                    dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf);
+                } else {
+                    dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf, username);
                 }
-                dfsFileSystem = FileSystem.get(java.net.URI.create(hdfsFsName), conf, username);
             } catch (Exception e) {
                 LOG.error("errors while connect to " + remotePath, e);
                 throw new UserException("errors while connect to " + remotePath, e);
@@ -121,7 +121,7 @@ public class HdfsStorage extends BlobStorage {
     @Override
     public void setProperties(Map<String, String> properties) {
         super.setProperties(properties);
-        caseInsensitiveProperties.putAll(properties);
+        hdfsProperties.putAll(properties);
     }
 
     @Override
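
Two behavior changes in getFileSystem are worth spelling out: the FileSystem is now built from the remotePath URI instead of a mandatory fs.defaultFS property, and hadoop.user.name has become optional (the FileSystem.get overload is chosen based on whether a username was supplied). A compact sketch of those connection rules, assuming the Hadoop client libraries on the classpath and omitting the Kerberos branch shown in the hunk:

    import java.net.URI;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class HdfsConnectSketch {
        // Kerberos login is omitted; see the hunk above for that branch.
        static FileSystem connect(String remotePath, Map<String, String> props)
                throws Exception {
            Configuration conf = new HdfsConfiguration();
            props.forEach(conf::set); // user-supplied properties take effect
            String username = props.get("hadoop.user.name"); // HdfsResource.HADOOP_USER_NAME
            // The fs name now comes from remotePath itself, so fs.defaultFS is optional.
            return username == null
                    ? FileSystem.get(URI.create(remotePath), conf)
                    : FileSystem.get(URI.create(remotePath), conf, username);
        }
    }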
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index 868f032a44..5e0a5fe874 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -27,8 +27,6 @@ import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -56,7 +54,6 @@ public class HdfsResource extends Resource {
     public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
     public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
     public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
-    public static List<String> REQUIRED_FIELDS = Collections.singletonList(HADOOP_FS_NAME);
 
     @SerializedName(value = "properties")
     private Map<String, String> properties;
@@ -75,11 +72,6 @@ public class HdfsResource extends Resource {
 
     @Override
     protected void setProperties(Map<String, String> properties) throws DdlException {
-        for (String field : REQUIRED_FIELDS) {
-            if (!properties.containsKey(field)) {
-                throw new DdlException("Missing [" + field + "] in properties.");
-            }
-        }
         // `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` should be both set to enable short circuit read.
         // We should disable short circuit read if they are not both set because it will cause performance down.
         if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) || !properties.containsKey(HADOOP_SOCKET_PATH)) {
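
The hunk is truncated at the short circuit guard, so the body below is an assumption rather than a quote from the commit: a sketch of what the preserved comment describes, with "false" as the assumed safe fallback.

    import java.util.Map;

    final class ShortCircuitGuard {
        static final String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
        static final String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";

        // Hypothetical body: short circuit read needs both keys, so disable it
        // when either one is missing instead of risking degraded reads.
        static void apply(Map<String, String> properties) {
            if (!properties.containsKey(HADOOP_SHORT_CIRCUIT)
                    || !properties.containsKey(HADOOP_SOCKET_PATH)) {
                properties.put(HADOOP_SHORT_CIRCUIT, "false"); // assumed fallback value
                properties.remove(HADOOP_SOCKET_PATH);
            }
        }
    }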
diff --git a/regression-test/data/external_catalog_p0/hive/test_hive_other.out b/regression-test/data/external_catalog_p0/hive/test_hive_other.out
index 86af4c14ec..8d44514ed7 100644
--- a/regression-test/data/external_catalog_p0/hive/test_hive_other.out
+++ b/regression-test/data/external_catalog_p0/hive/test_hive_other.out
@@ -671,3 +671,11 @@ zyLjAtVdXV	GrJRf8WvRR
 2022-11-25	2022-11-25	zj9uWRywHa	5F8hzYcY8G	2022-11-25
 2022-11-25	2022-11-25	zvs3b72ERY	zorbigHkYB	2022-11-25
 
+-- !student --
+124	lisi	13	f	abcdefh	13056781234
+123	zhangsan	12	m	abcdefg	13012345678
+
+-- !tvf_student --
+124	lisi	13	f	abcdefh	13056781234
+123	zhangsan	12	m	abcdefg	13012345678
+
diff --git a/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy b/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
index 82b6bb1cfc..f8401d70e9 100644
--- a/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
+++ b/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
@@ -77,6 +77,7 @@ suite("test_hive_other", "p0") {
     String enabled = context.config.otherConfigs.get("enableHiveTest")
     if (enabled != null && enabled.equalsIgnoreCase("true")) {
         String hms_port = context.config.otherConfigs.get("hms_port")
+        String hdfs_port = context.config.otherConfigs.get("hdfs_port")
         String catalog_name = "hive_test_other"
         set_be_config.call()
 
@@ -100,7 +101,51 @@ suite("test_hive_other", "p0") {
         // order_qt_show_tables2 """show tables"""
         q01()
         sql """refresh table `default`.table_with_vertical_line"""
-        order_qt_after_refresh """ select dt, dt, k2, k5, dt from table_with_vertical_line where dt in ('2022-11-25') or dt in ('2022-11-24') order by k2 desc limit 10;"""        
+        order_qt_after_refresh """ select dt, dt, k2, k5, dt from table_with_vertical_line where dt in ('2022-11-25') or dt in ('2022-11-24') order by k2 desc limit 10;"""
+
+        // external table
+        sql """switch internal"""
+        sql """drop database if exists external_hive_table_test"""
+        sql """create database external_hive_table_test"""
+        sql """use external_hive_table_test"""
+        sql """drop table if exists external_hive_student"""
+
+        sql """
+            create external table `external_hive_student` (
+                `id` varchar(100),
+                `name` varchar(100),
+                `age` int,
+                `gender` varchar(100),
+                `addr` varchar(100),
+                `phone` varchar(100)
+            ) ENGINE=HIVE
+            PROPERTIES
+            (
+                'hive.metastore.uris' = 'thrift://127.0.0.1:${hms_port}',
+                'database' = 'default',
+                'table' = 'student'
+            );
+        """
+        qt_student """select * from external_hive_student order by name;"""
+
+        // read external table
+        String csv_output_dir = UUID.randomUUID().toString()
+        sql """
+            select * from external_hive_student
+            into outfile "hdfs://127.0.0.1:${hdfs_port}/user/test/student/${csv_output_dir}/csv_"
+            format as csv_with_names
+            properties (
+                "column_separator" = ",",
+                "line_delimiter" = "\n"
+            );
+        """
+        qt_tvf_student """
+            select * from hdfs (
+                "format" = "csv_with_names",
+                "fs.defaultFS" = "hdfs://127.0.0.1:${hdfs_port}",
+                "uri" = "hdfs://127.0.0.1:${hdfs_port}/user/test/student/${csv_output_dir}/csv_*"
+            ) order by name;
+        """
     }
 }
 

