Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/15 08:09:09 UTC
[incubator-doris] 01/02: [feature] Support read hive external table and outfile into HDFS that authenticated by kerberos (#9579)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit a0256f3ebd8412a51004d5beba4dfa228c1658c9
Author: morningman <mo...@163.com>
AuthorDate: Wed Jun 15 16:06:29 2022 +0800
[feature] Support read hive external table and outfile into HDFS that authenticated by kerberos (#9579)
At present, Doris can only access a Hadoop cluster with Kerberos authentication enabled through a broker; Doris BE itself
does not support direct access to Kerberos-authenticated HDFS files.
This PR solves that problem.
When creating a Hive external table, users only need to specify the following properties to access HDFS data with Kerberos authentication enabled:
```sql
CREATE EXTERNAL TABLE t_hive (
k1 int NOT NULL COMMENT "",
k2 char(10) NOT NULL COMMENT "",
k3 datetime NOT NULL COMMENT "",
k5 varchar(20) NOT NULL COMMENT "",
k6 double NOT NULL COMMENT ""
) ENGINE=HIVE
COMMENT "HIVE"
PROPERTIES (
'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
'database' = 'hive_db',
'table' = 'hive_table',
'dfs.nameservices'='hacluster',
'dfs.ha.namenodes.hacluster'='n1,n2',
'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
'dfs.namenode.kerberos.principal'='hadoop/_HOST@REALM.COM',
'hadoop.security.authentication'='kerberos',
'hadoop.kerberos.principal'='doris_test@REALM.COM',
'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
);
```
If you want to `select into outfile` to an HDFS cluster with Kerberos authentication enabled, you can refer to the following SQL statement:
```sql
select * from test into outfile "hdfs://hacluster/tmp/outfile1"
format as csv
properties
(
'fs.defaultFS'='hdfs://hacluster/',
'dfs.nameservices'='hacluster',
'dfs.ha.namenodes.hacluster'='n1,n2',
'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
'dfs.namenode.kerberos.principal'='hadoop/_HOST@REALM.COM',
'hadoop.security.authentication'='kerberos',
'hadoop.kerberos.principal'='doris_test@REALM.COM',
'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
);
```
---
be/src/exec/CMakeLists.txt | 1 +
be/src/exec/hdfs_builder.cpp | 78 ++++++++++++++++++++
be/src/exec/hdfs_builder.h | 47 ++++++++++++
be/src/exec/hdfs_file_reader.cpp | 31 ++------
be/src/exec/hdfs_file_reader.h | 6 +-
be/src/exec/hdfs_writer.cpp | 85 +++++++++-------------
be/src/exec/hdfs_writer.h | 12 ++-
.../org/apache/doris/analysis/OutFileClause.java | 14 +++-
.../java/org/apache/doris/catalog/AuthType.java | 60 +++++++++++++++
.../doris/catalog/HiveMetaStoreClientHelper.java | 13 ++++
.../java/org/apache/doris/catalog/HiveTable.java | 48 ++++++++++--
.../org/apache/doris/common/util/BrokerUtil.java | 57 +++++++++------
.../org/apache/doris/planner/BrokerScanNode.java | 3 +
.../org/apache/doris/planner/HiveScanNode.java | 2 +-
.../org/apache/doris/catalog/HiveTableTest.java | 4 +-
gensrc/thrift/PlanNodes.thrift | 7 +-
16 files changed, 340 insertions(+), 128 deletions(-)
diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 7940c8ac61..e09169c000 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -105,6 +105,7 @@ set(EXEC_FILES
hdfs_reader_writer.cpp
hdfs_file_reader.cpp
hdfs_writer.cpp
+ hdfs_builder.cpp
)
if (WITH_MYSQL)
diff --git a/be/src/exec/hdfs_builder.cpp b/be/src/exec/hdfs_builder.cpp
new file mode 100644
index 0000000000..5940958050
--- /dev/null
+++ b/be/src/exec/hdfs_builder.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs_builder.h"
+
+#include <fmt/format.h>
+
+#include <fstream>
+
+#include "agent/utils.h"
+#include "common/logging.h"
+#include "util/uid_util.h"
+#include "util/url_coding.h"
+namespace doris {
+
+const std::string TICKET_CACHE_PATH = "/tmp/krb5cc_doris_";
+
+Status HDFSCommonBuilder::run_kinit() {
+ if (hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty()) {
+ return Status::InvalidArgument("Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab");
+ }
+ std::string ticket_path = TICKET_CACHE_PATH + generate_uuid_string();
+ fmt::memory_buffer kinit_command;
+ fmt::format_to(kinit_command, "kinit -c {} -R -t {} -k {}", ticket_path, hdfs_kerberos_keytab,
+ hdfs_kerberos_principal);
+ VLOG_NOTICE << "kinit command: " << fmt::to_string(kinit_command);
+ std::string msg;
+ AgentUtils util;
+ bool rc = util.exec_cmd(fmt::to_string(kinit_command), &msg);
+ if (!rc) {
+ return Status::InternalError("Kinit failed, errMsg: " + msg);
+ }
+ hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_path.c_str());
+ return Status::OK();
+}
+
+HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams) {
+ HDFSCommonBuilder builder;
+ hdfsBuilderSetNameNode(builder.get(), hdfsParams.fs_name.c_str());
+ // set hdfs user
+ if (hdfsParams.__isset.user) {
+ hdfsBuilderSetUserName(builder.get(), hdfsParams.user.c_str());
+ }
+ // set kerberos conf
+ if (hdfsParams.__isset.hdfs_kerberos_principal) {
+ builder.need_kinit = true;
+ builder.hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
+ hdfsBuilderSetPrincipal(builder.get(), hdfsParams.hdfs_kerberos_principal.c_str());
+ }
+ if (hdfsParams.__isset.hdfs_kerberos_keytab) {
+ builder.need_kinit = true;
+ builder.hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
+ }
+ // set other conf
+ if (hdfsParams.__isset.hdfs_conf) {
+ for (const THdfsConf& conf : hdfsParams.hdfs_conf) {
+ hdfsBuilderConfSetStr(builder.get(), conf.key.c_str(), conf.value.c_str());
+ }
+ }
+
+ return builder;
+}
+
+} // namespace doris
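For reference, both call sites changed below (hdfs_file_reader.cpp and hdfs_writer.cpp) use this builder the same way. Here is a minimal sketch of that connect flow, assuming libhdfs3's `hdfsBuilderConnect()` and the Thrift-generated `THdfsParams` type (`connect_kerberized_hdfs` is a hypothetical helper name for illustration, not part of this patch):
```cpp
#include "exec/hdfs_builder.h"

#include "gen_cpp/PlanNodes_types.h"

namespace doris {

// Hypothetical helper: build an hdfsFS handle from Thrift params,
// running kinit first when a Kerberos principal/keytab is configured.
Status connect_kerberized_hdfs(const THdfsParams& params, hdfsFS* fs) {
    // Translate fs name, user, principal, keytab and extra conf
    // into an hdfsBuilder.
    HDFSCommonBuilder builder = createHDFSBuilder(params);
    if (builder.is_need_kinit()) {
        // Obtains a ticket cache under /tmp/krb5cc_doris_<uuid> via kinit.
        RETURN_IF_ERROR(builder.run_kinit());
    }
    *fs = hdfsBuilderConnect(builder.get());
    if (*fs == nullptr) {
        return Status::InternalError("connect failed: " + params.fs_name);
    }
    return Status::OK();
}

} // namespace doris
```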
diff --git a/be/src/exec/hdfs_builder.h b/be/src/exec/hdfs_builder.h
new file mode 100644
index 0000000000..70ac723f1c
--- /dev/null
+++ b/be/src/exec/hdfs_builder.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <hdfs/hdfs.h>
+
+#include "gen_cpp/PlanNodes_types.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+class HDFSCommonBuilder {
+ friend HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams);
+
+public:
+ HDFSCommonBuilder() : hdfs_builder(hdfsNewBuilder()) {};
+ ~HDFSCommonBuilder() { hdfsFreeBuilder(hdfs_builder); };
+
+ hdfsBuilder* get() { return hdfs_builder; };
+ bool is_need_kinit() { return need_kinit; };
+ Status run_kinit();
+
+private:
+ hdfsBuilder* hdfs_builder;
+ bool need_kinit {false};
+ std::string hdfs_kerberos_keytab;
+ std::string hdfs_kerberos_principal;
+};
+
+HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams);
+
+} // namespace doris
diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/exec/hdfs_file_reader.cpp
index a047df6236..b042018e55 100644
--- a/be/src/exec/hdfs_file_reader.cpp
+++ b/be/src/exec/hdfs_file_reader.cpp
@@ -31,7 +31,8 @@ HdfsFileReader::HdfsFileReader(const THdfsParams& hdfs_params, const std::string
_current_offset(start_offset),
_file_size(-1),
_hdfs_fs(nullptr),
- _hdfs_file(nullptr) {
+ _hdfs_file(nullptr),
+ _builder(createHDFSBuilder(_hdfs_params)) {
_namenode = _hdfs_params.fs_name;
}
@@ -40,32 +41,10 @@ HdfsFileReader::~HdfsFileReader() {
}
Status HdfsFileReader::connect() {
- hdfsBuilder* hdfs_builder = hdfsNewBuilder();
- hdfsBuilderSetNameNode(hdfs_builder, _namenode.c_str());
- // set hdfs user
- if (_hdfs_params.__isset.user) {
- hdfsBuilderSetUserName(hdfs_builder, _hdfs_params.user.c_str());
+ if (_builder.is_need_kinit()) {
+ RETURN_IF_ERROR(_builder.run_kinit());
}
- // set kerberos conf
- if (_hdfs_params.__isset.kerb_principal) {
- hdfsBuilderSetPrincipal(hdfs_builder, _hdfs_params.kerb_principal.c_str());
- }
- if (_hdfs_params.__isset.kerb_ticket_cache_path) {
- hdfsBuilderSetKerbTicketCachePath(hdfs_builder,
- _hdfs_params.kerb_ticket_cache_path.c_str());
- }
- // set token
- if (_hdfs_params.__isset.token) {
- hdfsBuilderSetToken(hdfs_builder, _hdfs_params.token.c_str());
- }
- // set other conf
- if (_hdfs_params.__isset.hdfs_conf) {
- for (const THdfsConf& conf : _hdfs_params.hdfs_conf) {
- hdfsBuilderConfSetStr(hdfs_builder, conf.key.c_str(), conf.value.c_str());
- }
- }
- _hdfs_fs = hdfsBuilderConnect(hdfs_builder);
- hdfsFreeBuilder(hdfs_builder);
+ _hdfs_fs = hdfsBuilderConnect(_builder.get());
if (_hdfs_fs == nullptr) {
std::stringstream ss;
ss << "connect failed. " << _namenode;
diff --git a/be/src/exec/hdfs_file_reader.h b/be/src/exec/hdfs_file_reader.h
index d4430de3e2..e81af8a186 100644
--- a/be/src/exec/hdfs_file_reader.h
+++ b/be/src/exec/hdfs_file_reader.h
@@ -17,10 +17,9 @@
#pragma once
-#include <hdfs/hdfs.h>
-
-#include "exec/file_reader.h"
#include "gen_cpp/PlanNodes_types.h"
+#include "exec/file_reader.h"
+#include "exec/hdfs_builder.h"
namespace doris {
@@ -56,6 +55,7 @@ private:
int64_t _file_size;
hdfsFS _hdfs_fs;
hdfsFile _hdfs_file;
+ HDFSCommonBuilder _builder;
};
} // namespace doris
diff --git a/be/src/exec/hdfs_writer.cpp b/be/src/exec/hdfs_writer.cpp
index 8aaae7e78e..b45fbe4449 100644
--- a/be/src/exec/hdfs_writer.cpp
+++ b/be/src/exec/hdfs_writer.cpp
@@ -24,23 +24,32 @@
namespace doris {
const static std::string FS_KEY = "fs.defaultFS";
-const static std::string USER = "hdfs_user";
-const static std::string KERBEROS_PRINCIPAL = "kerberos_principal";
-const static std::string KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
+const static std::string USER = "hadoop.username";
+const static std::string KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
+const static std::string KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
const static std::string TOKEN = "token";
HDFSWriter::HDFSWriter(std::map<std::string, std::string>& properties, const std::string& path)
: _properties(properties),
_path(path),
- _hdfs_fs(nullptr) {
- _parse_properties(_properties);
-}
+ _hdfs_fs(nullptr),
+ _hdfs_params(_parse_properties(_properties)),
+ _builder(createHDFSBuilder(_hdfs_params)) {}
HDFSWriter::~HDFSWriter() {
close();
}
Status HDFSWriter::open() {
+ if (_namenode.empty()) {
+ LOG(WARNING) << "hdfs properties is incorrect.";
+ return Status::InternalError("hdfs properties is incorrect");
+ }
+ // if the format of _path is hdfs://ip:port/path, replace it to /path.
+ // path like hdfs://ip:port/path can't be used by libhdfs3.
+ if (_path.find(_namenode) != _path.npos) {
+ _path = _path.substr(_namenode.size());
+ }
RETURN_IF_ERROR(_connect());
if (_hdfs_fs == nullptr) {
return Status::InternalError("HDFS writer open without client");
@@ -130,31 +139,10 @@ Status HDFSWriter::close() {
}
Status HDFSWriter::_connect() {
- hdfsBuilder* hdfs_builder = hdfsNewBuilder();
- hdfsBuilderSetNameNode(hdfs_builder, _namenode.c_str());
- // set hdfs user
- if (!_user.empty()) {
- hdfsBuilderSetUserName(hdfs_builder, _user.c_str());
- }
- // set kerberos conf
- if (!_kerb_principal.empty()) {
- hdfsBuilderSetPrincipal(hdfs_builder, _kerb_principal.c_str());
- }
- if (!_kerb_ticket_cache_path.empty()) {
- hdfsBuilderSetKerbTicketCachePath(hdfs_builder, _kerb_ticket_cache_path.c_str());
- }
- // set token
- if (!_token.empty()) {
- hdfsBuilderSetToken(hdfs_builder, _token.c_str());
- }
- // set other conf
- if (!_properties.empty()) {
- std::map<std::string, std::string>::iterator iter;
- for (iter = _properties.begin(); iter != _properties.end(); ++iter) {
- hdfsBuilderConfSetStr(hdfs_builder, iter->first.c_str(), iter->second.c_str());
- }
+ if (_builder.is_need_kinit()) {
+ RETURN_IF_ERROR(_builder.run_kinit());
}
- _hdfs_fs = hdfsBuilderConnect(hdfs_builder);
+ _hdfs_fs = hdfsBuilderConnect(_builder.get());
if (_hdfs_fs == nullptr) {
std::stringstream ss;
ss << "connect failed. namenode:" << _namenode;
@@ -163,41 +151,34 @@ Status HDFSWriter::_connect() {
return Status::OK();
}
-Status HDFSWriter::_parse_properties(std::map<std::string, std::string>& prop) {
+THdfsParams HDFSWriter::_parse_properties(std::map<std::string, std::string>& prop) {
std::map<std::string, std::string>::iterator iter;
+ std::vector<THdfsConf> hdfs_configs;
+ THdfsParams hdfsParams;
for (iter = prop.begin(); iter != prop.end();) {
if (iter->first.compare(FS_KEY) == 0) {
_namenode = iter->second;
+ hdfsParams.__set_fs_name(_namenode);
iter = prop.erase(iter);
} else if (iter->first.compare(USER) == 0) {
- _user = iter->second;
+ hdfsParams.__set_user(iter->second);
iter = prop.erase(iter);
} else if (iter->first.compare(KERBEROS_PRINCIPAL) == 0) {
- _kerb_principal = iter->second;
+ hdfsParams.__set_hdfs_kerberos_principal(iter->second);
iter = prop.erase(iter);
- } else if (iter->first.compare(KERB_TICKET_CACHE_PATH) == 0) {
- _kerb_ticket_cache_path = iter->second;
- iter = prop.erase(iter);
- } else if (iter->first.compare(TOKEN) == 0) {
- _token = iter->second;
+ } else if (iter->first.compare(KERBEROS_KEYTAB) == 0) {
+ hdfsParams.__set_hdfs_kerberos_keytab(iter->second);
iter = prop.erase(iter);
} else {
- ++iter;
+ THdfsConf item;
+ item.key = iter->first;
+ item.value = iter->second;
+ hdfs_configs.push_back(item);
+ iter = prop.erase(iter);
}
}
-
- if (_namenode.empty()) {
- LOG(WARNING) << "hdfs properties is incorrect.";
- return Status::InternalError("hdfs properties is incorrect");
- }
-
- // if the format of _path is hdfs://ip:port/path, replace it to /path.
- // path like hdfs://ip:port/path can't be used by libhdfs3.
- if (_path.find(_namenode) != _path.npos) {
- _path = _path.substr(_namenode.size());
- }
-
- return Status::OK();
+ hdfsParams.__set_hdfs_conf(hdfs_configs);
+ return hdfsParams;
}
}// end namespace doris
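To make the new key mapping concrete, here is an illustrative sketch (not part of the patch) of how the outfile properties from the SQL example above are split by `_parse_properties`: the reserved keys become first-class `THdfsParams` fields, and everything else is forwarded unchanged as `THdfsConf` entries in `hdfs_conf`:
```cpp
#include <map>
#include <string>
#include <vector>

#include "gen_cpp/PlanNodes_types.h"

// Illustrative only: mirrors the mapping done by HDFSWriter::_parse_properties.
doris::THdfsParams make_params_example() {
    std::map<std::string, std::string> props = {
            {"fs.defaultFS", "hdfs://hacluster/"},                     // -> fs_name
            {"hadoop.kerberos.principal", "doris_test@REALM.COM"},     // -> hdfs_kerberos_principal
            {"hadoop.kerberos.keytab", "/path/to/doris_test.keytab"},  // -> hdfs_kerberos_keytab
            {"dfs.nameservices", "hacluster"},                         // -> hdfs_conf entry
    };
    doris::THdfsParams params;
    std::vector<doris::THdfsConf> confs;
    for (const auto& [key, value] : props) {
        if (key == "fs.defaultFS") {
            params.__set_fs_name(value);
        } else if (key == "hadoop.kerberos.principal") {
            params.__set_hdfs_kerberos_principal(value);
        } else if (key == "hadoop.kerberos.keytab") {
            params.__set_hdfs_kerberos_keytab(value);
        } else {
            // Any remaining property is passed through to libhdfs3 as-is.
            doris::THdfsConf conf;
            conf.__set_key(key);
            conf.__set_value(value);
            confs.push_back(conf);
        }
    }
    params.__set_hdfs_conf(confs);
    return params;
}
```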
diff --git a/be/src/exec/hdfs_writer.h b/be/src/exec/hdfs_writer.h
index a3f17ec166..2b99bacb5e 100644
--- a/be/src/exec/hdfs_writer.h
+++ b/be/src/exec/hdfs_writer.h
@@ -17,12 +17,12 @@
#pragma once
-#include <hdfs/hdfs.h>
-
#include <map>
#include <string>
+#include "gen_cpp/PlanNodes_types.h"
#include "exec/file_writer.h"
+#include "exec/hdfs_builder.h"
namespace doris {
class HDFSWriter : public FileWriter {
@@ -40,18 +40,16 @@ public:
private:
Status _connect();
- Status _parse_properties(std::map<std::string, std::string>& prop);
+ THdfsParams _parse_properties(std::map<std::string, std::string>& prop);
std::map<std::string, std::string> _properties;
- std::string _user = "";
std::string _namenode = "";
std::string _path = "";
- std::string _kerb_principal = "";
- std::string _kerb_ticket_cache_path = "";
- std::string _token = "";
hdfsFS _hdfs_fs = nullptr;
hdfsFile _hdfs_file = nullptr;
bool _closed = false;
+ THdfsParams _hdfs_params;
+ HDFSCommonBuilder _builder;
};
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index c82cd1ab62..1817bf7d2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.Config;
import org.apache.doris.common.FeNameFormat;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.common.util.ParseUtil;
import org.apache.doris.common.util.PrintableMap;
import org.apache.doris.qe.ConnectContext;
@@ -87,7 +88,8 @@ public class OutFileClause {
public static final String LOCAL_FILE_PREFIX = "file:///";
private static final String S3_FILE_PREFIX = "S3://";
private static final String HDFS_FILE_PREFIX = "hdfs://";
- private static final String HDFS_PROP_PREFIX = "hdfs.";
+ private static final String HADOOP_FS_PROP_PREFIX = "dfs.";
+ private static final String HADOOP_PROP_PREFIX = "hadoop.";
private static final String BROKER_PROP_PREFIX = "broker.";
private static final String PROP_BROKER_NAME = "broker.name";
private static final String PROP_COLUMN_SEPARATOR = "column_separator";
@@ -416,9 +418,13 @@ public class OutFileClause {
} else if (entry.getKey().toUpperCase().startsWith(S3Storage.S3_PROPERTIES_PREFIX)) {
brokerProps.put(entry.getKey(), entry.getValue());
processedPropKeys.add(entry.getKey());
- } else if (entry.getKey().startsWith(HDFS_PROP_PREFIX)
- && storageType == StorageBackend.StorageType.HDFS) {
- brokerProps.put(entry.getKey().substring(HDFS_PROP_PREFIX.length()), entry.getValue());
+ } else if (entry.getKey().contains(BrokerUtil.HADOOP_FS_NAME)
+ && storageType == StorageBackend.StorageType.HDFS) {
+ brokerProps.put(entry.getKey(), entry.getValue());
+ processedPropKeys.add(entry.getKey());
+ } else if ((entry.getKey().startsWith(HADOOP_FS_PROP_PREFIX) || entry.getKey().startsWith(HADOOP_PROP_PREFIX))
+ && storageType == StorageBackend.StorageType.HDFS) {
+ brokerProps.put(entry.getKey(), entry.getValue());
processedPropKeys.add(entry.getKey());
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java
new file mode 100644
index 0000000000..c0c97530a0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+/**
+ * Defines the auth types for external tables such as Hive/Iceberg,
+ * so that BE can access the underlying secured file storage system (e.g. with Kerberos enabled).
+ */
+public enum AuthType {
+ SIMPLE(0, "simple"),
+ KERBEROS(1, "kerberos");
+
+ private int code;
+ private String desc;
+
+ AuthType(int code, String desc) {
+ this.code = code;
+ this.desc = desc;
+ }
+
+ public static boolean isSupportedAuthType(String authType) {
+ for (AuthType auth : values()) {
+ if (auth.getDesc().equals(authType)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public int getCode() {
+ return code;
+ }
+
+ public void setCode(int code) {
+ this.code = code;
+ }
+
+ public String getDesc() {
+ return desc;
+ }
+
+ public void setDesc(String desc) {
+ this.desc = desc;
+ }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index c378eecf24..f192df298b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -31,6 +31,7 @@ import org.apache.doris.analysis.NullLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.StringLiteral;
import org.apache.doris.common.DdlException;
+import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TExprOpcode;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -245,14 +247,25 @@ public class HiveMetaStoreClientHelper {
private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Map<String, String> properties) throws DdlException {
List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
Configuration configuration = new Configuration(false);
+ boolean isSecurityEnabled = false;
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
configuration.set(entry.getKey(), entry.getValue());
}
+ if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
+ && entry.getValue().equals(AuthType.KERBEROS.getDesc())) {
+ isSecurityEnabled = true;
+ }
}
String location = table.getSd().getLocation();
org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
try {
+ if (isSecurityEnabled) {
+ UserGroupInformation.setConfiguration(configuration);
+ // login user from keytab
+ UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+ properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+ }
FileSystem fileSystem = path.getFileSystem(configuration);
iterators.add(fileSystem.listLocatedStatus(path));
} catch (IOException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index d418e47307..19be317f12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -19,6 +19,7 @@ package org.apache.doris.catalog;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.BrokerUtil;
import org.apache.doris.thrift.THiveTable;
import org.apache.doris.thrift.TTableDescriptor;
import org.apache.doris.thrift.TTableType;
@@ -39,6 +40,7 @@ import java.util.Map;
*/
public class HiveTable extends Table {
private static final String PROPERTY_MISSING_MSG = "Hive %s is null. Please add properties('%s'='xxx') when create table";
+ private static final String PROPERTY_ERROR_MSG = "Hive table properties('%s'='%s') is illegal or not supported. Please check it";
private static final String HIVE_DB = "database";
private static final String HIVE_TABLE = "table";
@@ -77,7 +79,7 @@ public class HiveTable extends Table {
private void validate(Map<String, String> properties) throws DdlException {
if (properties == null) {
throw new DdlException("Please set properties of hive table, "
- + "they are: database, table and 'hive.metastore.uris'");
+ + "they are: database, table and 'hive.metastore.uris'");
}
Map<String, String> copiedProps = Maps.newHashMap(properties);
@@ -94,14 +96,48 @@ public class HiveTable extends Table {
copiedProps.remove(HIVE_TABLE);
// check hive properties
- // hive.metastore.uris
- String hiveMetastoreUris = copiedProps.get(HIVE_METASTORE_URIS);
- if (Strings.isNullOrEmpty(hiveMetastoreUris)) {
+ // hive.metastore.uris
+ String hiveMetaStoreUris = copiedProps.get(HIVE_METASTORE_URIS);
+ if (Strings.isNullOrEmpty(hiveMetaStoreUris)) {
throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_METASTORE_URIS, HIVE_METASTORE_URIS));
}
copiedProps.remove(HIVE_METASTORE_URIS);
- hiveProperties.put(HIVE_METASTORE_URIS, hiveMetastoreUris);
+ hiveProperties.put(HIVE_METASTORE_URIS, hiveMetaStoreUris);
+ // check auth type
+ String authType = copiedProps.get(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION);
+ if (Strings.isNullOrEmpty(authType)) {
+ authType = AuthType.SIMPLE.getDesc();
+ }
+ if (!AuthType.isSupportedAuthType(authType)) {
+ throw new DdlException(String.format(PROPERTY_ERROR_MSG, BrokerUtil.HADOOP_SECURITY_AUTHENTICATION, authType));
+ }
+ copiedProps.remove(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION);
+ hiveProperties.put(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION, authType);
+
+ if (AuthType.KERBEROS.getDesc().equals(authType)) {
+ // check principal
+ String principal = copiedProps.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL);
+ if (Strings.isNullOrEmpty(principal)) {
+ throw new DdlException(String.format(PROPERTY_MISSING_MSG, BrokerUtil.HADOOP_KERBEROS_PRINCIPAL, BrokerUtil.HADOOP_KERBEROS_PRINCIPAL));
+ }
+ hiveProperties.put(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL, principal);
+ copiedProps.remove(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL);
+ // check keytab
+ String keytabPath = copiedProps.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB);
+ if (Strings.isNullOrEmpty(keytabPath)) {
+ throw new DdlException(String.format(PROPERTY_MISSING_MSG, BrokerUtil.HADOOP_KERBEROS_KEYTAB, BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+ }
+ if (!Strings.isNullOrEmpty(keytabPath)) {
+ hiveProperties.put(BrokerUtil.HADOOP_KERBEROS_KEYTAB, keytabPath);
+ copiedProps.remove(BrokerUtil.HADOOP_KERBEROS_KEYTAB);
+ }
+ }
+ String HDFSUserName = copiedProps.get(BrokerUtil.HADOOP_USER_NAME);
+ if (!Strings.isNullOrEmpty(HDFSUserName)) {
+ hiveProperties.put(BrokerUtil.HADOOP_USER_NAME, HDFSUserName);
+ copiedProps.remove(BrokerUtil.HADOOP_USER_NAME);
+ }
if (!copiedProps.isEmpty()) {
Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator();
while(iter.hasNext()) {
@@ -148,7 +184,7 @@ public class HiveTable extends Table {
public TTableDescriptor toThrift() {
THiveTable tHiveTable = new THiveTable(getHiveDb(), getHiveTable(), getHiveProperties());
TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE,
- fullSchema.size(), 0, getName(), "");
+ fullSchema.size(), 0, getName(), "");
tTableDescriptor.setHiveTable(tHiveTable);
return tTableDescriptor;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 9e170e47b7..c10256e69b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.StorageBackend;
import org.apache.doris.backup.RemoteFile;
import org.apache.doris.backup.S3Storage;
import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.AuthType;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.FsBroker;
import org.apache.doris.common.AnalysisException;
@@ -64,6 +65,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.thrift.TException;
@@ -82,27 +84,26 @@ import java.util.Map;
public class BrokerUtil {
private static final Logger LOG = LogManager.getLogger(BrokerUtil.class);
- private static int READ_BUFFER_SIZE_B = 1024 * 1024;
- private static String HDFS_FS_KEY = "fs.defaultFS";
- private static String HDFS_USER_KEY = "hdfs_user";
- private static String HDFS_KERB_PRINCIPAL = "kerb_principal";
- private static String HDFS_KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
- private static String HDFS_KERB_TOKEN = "kerb_token";
+ private static final int READ_BUFFER_SIZE_B = 1024 * 1024;
+ public static String HADOOP_FS_NAME = "fs.defaultFS";
+ // simple or kerberos
+ public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
+ public static String HADOOP_USER_NAME = "hadoop.username";
+ public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
+ public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
public static void generateHdfsParam(Map<String, String> properties, TBrokerRangeDesc rangeDesc) {
rangeDesc.setHdfsParams(new THdfsParams());
rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>());
for (Map.Entry<String, String> property : properties.entrySet()) {
- if (property.getKey().equalsIgnoreCase(HDFS_FS_KEY)) {
+ if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
rangeDesc.hdfs_params.setFsName(property.getValue());
- } else if (property.getKey().equalsIgnoreCase(HDFS_USER_KEY)) {
+ } else if (property.getKey().equalsIgnoreCase(HADOOP_USER_NAME)) {
rangeDesc.hdfs_params.setUser(property.getValue());
- } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_PRINCIPAL)) {
- rangeDesc.hdfs_params.setKerbPrincipal(property.getValue());
- } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TICKET_CACHE_PATH)) {
- rangeDesc.hdfs_params.setKerbTicketCachePath(property.getValue());
- } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TOKEN)) {
- rangeDesc.hdfs_params.setToken(property.getValue());
+ } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_PRINCIPAL)) {
+ rangeDesc.hdfs_params.setHdfsKerberosPrincipal(property.getValue());
+ } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_KEYTAB)) {
+ rangeDesc.hdfs_params.setHdfsKerberosKeytab(property.getValue());
} else {
THdfsConf hdfsConf = new THdfsConf();
hdfsConf.setKey(property.getKey());
@@ -171,27 +172,35 @@ public class BrokerUtil {
}
}
} else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) {
- if (!brokerDesc.getProperties().containsKey(HDFS_FS_KEY)
- || !brokerDesc.getProperties().containsKey(HDFS_USER_KEY)) {
+ if (!brokerDesc.getProperties().containsKey(HADOOP_FS_NAME)
+ || !brokerDesc.getProperties().containsKey(HADOOP_USER_NAME)) {
throw new UserException(String.format(
- "The properties of hdfs is invalid. %s and %s are needed", HDFS_FS_KEY, HDFS_USER_KEY));
+ "The properties of hdfs is invalid. %s and %s are needed", HADOOP_FS_NAME, HADOOP_USER_NAME));
}
- String hdfsFsName = brokerDesc.getProperties().get(HDFS_FS_KEY);
- String user = brokerDesc.getProperties().get(HDFS_USER_KEY);
+ String fsName = brokerDesc.getProperties().get(HADOOP_FS_NAME);
+ String userName = brokerDesc.getProperties().get(HADOOP_USER_NAME);
Configuration conf = new Configuration();
+ boolean isSecurityEnabled = false;
for (Map.Entry<String, String> propEntry : brokerDesc.getProperties().entrySet()) {
- if (propEntry.getKey().equals(HDFS_FS_KEY) || propEntry.getKey().equals(HDFS_USER_KEY)) {
- continue;
- }
conf.set(propEntry.getKey(), propEntry.getValue());
+ if (propEntry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
+ && propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
+ isSecurityEnabled = true;
+ }
}
try {
- FileSystem fs = FileSystem.get(new URI(hdfsFsName), conf, user);
+ if (isSecurityEnabled) {
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation.loginUserFromKeytab(
+ brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+ brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+ }
+ FileSystem fs = FileSystem.get(new URI(fsName), conf, userName);
FileStatus[] statusList = fs.globStatus(new Path(path));
for (FileStatus status : statusList) {
if (status.isFile()) {
fileStatuses.add(new TBrokerFileStatus(status.getPath().toUri().getPath(),
- status.isDirectory(), status.getLen(), status.isFile()));
+ status.isDirectory(), status.getLen(), status.isFile()));
}
}
} catch (IOException | InterruptedException | URISyntaxException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 417b91e145..187646e1d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -68,6 +68,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index a67f5394ca..711e63e695 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -179,7 +179,7 @@ public class HiveScanNode extends BrokerScanNode {
}
List<TBrokerFileStatus> fileStatuses = new ArrayList<>();
this.hdfsUri = HiveMetaStoreClientHelper.getHiveDataFiles(hiveTable, hivePartitionPredicate,
- fileStatuses, remoteHiveTable);
+ fileStatuses, remoteHiveTable);
fileStatusesList.add(fileStatuses);
filesAdded += fileStatuses.size();
for (TBrokerFileStatus fstatus : fileStatuses) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java
index 00b42a4ec0..c7dad72037 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.DdlException;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -53,7 +54,8 @@ public class HiveTableTest {
public void testNormal() throws DdlException {
HiveTable table = new HiveTable(1000, "hive_table", columns, properties);
Assert.assertEquals(String.format("%s.%s", hiveDb, hiveTable), table.getHiveDbTable());
- Assert.assertEquals(1, table.getHiveProperties().size());
+ // HiveProperties={hadoop.security.authentication=simple, hive.metastore.uris=thrift://127.0.0.1:9083}
+ Assert.assertEquals(2, table.getHiveProperties().size());
}
@Test(expected = DdlException.class)
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index c46b0f3c24..9d9124b056 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -118,10 +118,9 @@ struct THdfsConf {
struct THdfsParams {
1: optional string fs_name
2: optional string user
- 3: optional string kerb_principal
- 4: optional string kerb_ticket_cache_path
- 5: optional string token
- 6: optional list<THdfsConf> hdfs_conf
+ 3: optional string hdfs_kerberos_principal
+ 4: optional string hdfs_kerberos_keytab
+ 5: optional list<THdfsConf> hdfs_conf
}
// One broker range information.