You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the Apache mailing-list archive.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/15 08:56:39 UTC
[doris] branch master updated: [fix](resource) HdfsStorage can get default.Fs from path or configuration (#15079)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6625e650c4 [fix](resource) HdfsStorage can get default.Fs from path or configuration (#15079)
6625e650c4 is described below
commit 6625e650c4bf5df57b3612dd56b38cec76c16110
Author: Ashin Gau <As...@users.noreply.github.com>
AuthorDate: Thu Dec 15 16:56:32 2022 +0800
[fix](resource) HdfsStorage can get default.Fs from path or configuration (#15079)
---
.../org/apache/doris/analysis/OutFileClause.java | 5 ++-
.../java/org/apache/doris/backup/HdfsStorage.java | 36 ++++++++---------
.../org/apache/doris/catalog/HdfsResource.java | 8 ----
.../external_catalog_p0/hive/test_hive_other.out | 8 ++++
.../hive/test_hive_other.groovy | 47 +++++++++++++++++++++-
5 files changed, 75 insertions(+), 29 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 31bd346b06..3d3f82b74b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -624,7 +624,6 @@ public class OutFileClause {
} else if (filePath.toUpperCase().startsWith(HDFS_FILE_PREFIX.toUpperCase())) {
brokerName = StorageBackend.StorageType.HDFS.name();
storageType = StorageBackend.StorageType.HDFS;
- filePath = filePath.substring(HDFS_FILE_PREFIX.length() - 1);
} else {
return;
}
@@ -651,7 +650,9 @@ public class OutFileClause {
if (storageType == StorageBackend.StorageType.S3) {
S3Storage.checkS3(brokerProps);
} else if (storageType == StorageBackend.StorageType.HDFS) {
- HdfsStorage.checkHDFS(brokerProps);
+ if (!brokerProps.containsKey(HdfsResource.HADOOP_FS_NAME)) {
+ brokerProps.put(HdfsResource.HADOOP_FS_NAME, HdfsStorage.getFsName(filePath));
+ }
}
brokerDesc = new BrokerDesc(brokerName, storageType, brokerProps);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java b/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java
index 6344a5ec75..e245ad6377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/backup/HdfsStorage.java
@@ -23,7 +23,6 @@ import org.apache.doris.catalog.HdfsResource;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.URI;
-import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -49,6 +48,7 @@ import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -58,7 +58,7 @@ import java.util.Map;
*/
public class HdfsStorage extends BlobStorage {
private static final Logger LOG = LogManager.getLogger(HdfsStorage.class);
- private final Map<String, String> caseInsensitiveProperties;
+ private final Map<String, String> hdfsProperties;
private final int readBufferSize = 128 << 10; // 128k
private final int writeBufferSize = 128 << 10; // 128k
@@ -71,30 +71,26 @@ public class HdfsStorage extends BlobStorage {
* @param properties parameters to access HDFS.
*/
public HdfsStorage(Map<String, String> properties) {
- caseInsensitiveProperties = new CaseInsensitiveMap();
+ hdfsProperties = new HashMap<>();
setProperties(properties);
setType(StorageBackend.StorageType.HDFS);
setName(StorageBackend.StorageType.HDFS.name());
}
- public static void checkHDFS(Map<String, String> properties) throws UserException {
- for (String field : HdfsResource.REQUIRED_FIELDS) {
- if (!properties.containsKey(field)) {
- throw new UserException(
- String.format("The properties of hdfs is invalid. %s are needed", field));
- }
- }
+ public static String getFsName(String path) {
+ Path hdfsPath = new Path(path);
+ String fullPath = hdfsPath.toUri().toString();
+ String filePath = hdfsPath.toUri().getPath();
+ return fullPath.replace(filePath, "");
}
@Override
public FileSystem getFileSystem(String remotePath) throws UserException {
if (dfsFileSystem == null) {
- checkHDFS(caseInsensitiveProperties);
- String hdfsFsName = caseInsensitiveProperties.get(HdfsResource.HADOOP_FS_NAME);
- String username = caseInsensitiveProperties.get(HdfsResource.HADOOP_USER_NAME);
+ String username = hdfsProperties.get(HdfsResource.HADOOP_USER_NAME);
Configuration conf = new HdfsConfiguration();
boolean isSecurityEnabled = false;
- for (Map.Entry<String, String> propEntry : caseInsensitiveProperties.entrySet()) {
+ for (Map.Entry<String, String> propEntry : hdfsProperties.entrySet()) {
conf.set(propEntry.getKey(), propEntry.getValue());
if (propEntry.getKey().equals(HdfsResource.HADOOP_SECURITY_AUTHENTICATION)
&& propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
@@ -106,10 +102,14 @@ public class HdfsStorage extends BlobStorage {
if (isSecurityEnabled) {
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytab(
- caseInsensitiveProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL),
- caseInsensitiveProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
+ hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_PRINCIPAL),
+ hdfsProperties.get(HdfsResource.HADOOP_KERBEROS_KEYTAB));
+ }
+ if (username == null) {
+ dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf);
+ } else {
+ dfsFileSystem = FileSystem.get(java.net.URI.create(remotePath), conf, username);
}
- dfsFileSystem = FileSystem.get(java.net.URI.create(hdfsFsName), conf, username);
} catch (Exception e) {
LOG.error("errors while connect to " + remotePath, e);
throw new UserException("errors while connect to " + remotePath, e);
@@ -121,7 +121,7 @@ public class HdfsStorage extends BlobStorage {
@Override
public void setProperties(Map<String, String> properties) {
super.setProperties(properties);
- caseInsensitiveProperties.putAll(properties);
+ hdfsProperties.putAll(properties);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
index 868f032a44..5e0a5fe874 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HdfsResource.java
@@ -27,8 +27,6 @@ import com.google.common.collect.Maps;
import com.google.gson.annotations.SerializedName;
import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
import java.util.Map;
/**
@@ -56,7 +54,6 @@ public class HdfsResource extends Resource {
public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
public static String HADOOP_SHORT_CIRCUIT = "dfs.client.read.shortcircuit";
public static String HADOOP_SOCKET_PATH = "dfs.domain.socket.path";
- public static List<String> REQUIRED_FIELDS = Collections.singletonList(HADOOP_FS_NAME);
@SerializedName(value = "properties")
private Map<String, String> properties;
@@ -75,11 +72,6 @@ public class HdfsResource extends Resource {
@Override
protected void setProperties(Map<String, String> properties) throws DdlException {
- for (String field : REQUIRED_FIELDS) {
- if (!properties.containsKey(field)) {
- throw new DdlException("Missing [" + field + "] in properties.");
- }
- }
// `dfs.client.read.shortcircuit` and `dfs.domain.socket.path` should be both set to enable short circuit read.
// We should disable short circuit read if they are not both set because it will cause performance down.
if (!properties.containsKey(HADOOP_SHORT_CIRCUIT) || !properties.containsKey(HADOOP_SOCKET_PATH)) {
diff --git a/regression-test/data/external_catalog_p0/hive/test_hive_other.out b/regression-test/data/external_catalog_p0/hive/test_hive_other.out
index 86af4c14ec..8d44514ed7 100644
--- a/regression-test/data/external_catalog_p0/hive/test_hive_other.out
+++ b/regression-test/data/external_catalog_p0/hive/test_hive_other.out
@@ -671,3 +671,11 @@ zyLjAtVdXV GrJRf8WvRR
2022-11-25 2022-11-25 zj9uWRywHa 5F8hzYcY8G 2022-11-25
2022-11-25 2022-11-25 zvs3b72ERY zorbigHkYB 2022-11-25
+-- !student --
+124 lisi 13 f abcdefh 13056781234
+123 zhangsan 12 m abcdefg 13012345678
+
+-- !tvf_student --
+124 lisi 13 f abcdefh 13056781234
+123 zhangsan 12 m abcdefg 13012345678
+
diff --git a/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy b/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
index 82b6bb1cfc..f8401d70e9 100644
--- a/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
+++ b/regression-test/suites/external_catalog_p0/hive/test_hive_other.groovy
@@ -77,6 +77,7 @@ suite("test_hive_other", "p0") {
String enabled = context.config.otherConfigs.get("enableHiveTest")
if (enabled != null && enabled.equalsIgnoreCase("true")) {
String hms_port = context.config.otherConfigs.get("hms_port")
+ String hdfs_port = context.config.otherConfigs.get("hdfs_port")
String catalog_name = "hive_test_other"
set_be_config.call()
@@ -100,7 +101,51 @@ suite("test_hive_other", "p0") {
// order_qt_show_tables2 """show tables"""
q01()
sql """refresh table `default`.table_with_vertical_line"""
- order_qt_after_refresh """ select dt, dt, k2, k5, dt from table_with_vertical_line where dt in ('2022-11-25') or dt in ('2022-11-24') order by k2 desc limit 10;"""
+ order_qt_after_refresh """ select dt, dt, k2, k5, dt from table_with_vertical_line where dt in ('2022-11-25') or dt in ('2022-11-24') order by k2 desc limit 10;"""
+
+ // external table
+ sql """switch internal"""
+ sql """drop database if exists external_hive_table_test"""
+ sql """create database external_hive_table_test"""
+ sql """use external_hive_table_test"""
+ sql """drop table if exists external_hive_student"""
+
+ sql """
+ create external table `external_hive_student` (
+ `id` varchar(100),
+ `name` varchar(100),
+ `age` int,
+ `gender` varchar(100),
+ `addr` varchar(100),
+ `phone` varchar(100)
+ ) ENGINE=HIVE
+ PROPERTIES
+ (
+ 'hive.metastore.uris' = 'thrift://127.0.0.1:${hms_port}',
+ 'database' = 'default',
+ 'table' = 'student'
+ );
+ """
+ qt_student """select * from external_hive_student order by name;"""
+
+ // read external table
+ String csv_output_dir = UUID.randomUUID().toString()
+ sql """
+ select * from external_hive_student
+ into outfile "hdfs://127.0.0.1:${hdfs_port}/user/test/student/${csv_output_dir}/csv_"
+ format as csv_with_names
+ properties (
+ "column_separator" = ",",
+ "line_delimiter" = "\n"
+ );
+ """
+ qt_tvf_student """
+ select * from hdfs (
+ "format" = "csv_with_names",
+ "fs.defaultFS" = "hdfs://127.0.0.1:${hdfs_port}",
+ "uri" = "hdfs://127.0.0.1:${hdfs_port}/user/test/student/${csv_output_dir}/csv_*"
+ ) order by name;
+ """
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org