Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/15 08:09:09 UTC

[incubator-doris] 01/02: [feature] Support reading Hive external tables and writing OUTFILE to HDFS authenticated by Kerberos (#9579)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit a0256f3ebd8412a51004d5beba4dfa228c1658c9
Author: morningman <mo...@163.com>
AuthorDate: Wed Jun 15 16:06:29 2022 +0800

    [feature] Support reading Hive external tables and writing OUTFILE to HDFS authenticated by Kerberos (#9579)
    
    At present, Doris can only access a Kerberos-enabled Hadoop cluster through a broker; Doris BE itself
    does not support reading Kerberos-authenticated HDFS files directly.
    
    This PR solves that problem.
    
    When creating a Hive external table, users only need to specify the following properties to access HDFS data with Kerberos authentication enabled:
    
    ```sql
    CREATE EXTERNAL TABLE t_hive (
    k1 int NOT NULL COMMENT "",
    k2 char(10) NOT NULL COMMENT "",
    k3 datetime NOT NULL COMMENT "",
    k5 varchar(20) NOT NULL COMMENT "",
    k6 double NOT NULL COMMENT ""
    ) ENGINE=HIVE
    COMMENT "HIVE"
    PROPERTIES (
    'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
    'database' = 'hive_db',
    'table' = 'hive_table',
    'dfs.nameservices'='hacluster',
    'dfs.ha.namenodes.hacluster'='n1,n2',
    'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
    'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
    'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
    'dfs.namenode.kerberos.principal'='hadoop/_HOST@REALM.COM',
    'hadoop.security.authentication'='kerberos',
    'hadoop.kerberos.principal'='doris_test@REALM.COM',
    'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
    );
    ```
    
    If you want to `select into outfile` to an HDFS cluster with Kerberos authentication enabled, you can refer to the following SQL statement:
    
    ```sql
    select * from test into outfile "hdfs://tmp/outfile1"
    format as csv
    properties
    (
    'fs.defaultFS'='hdfs://hacluster/',
    'dfs.nameservices'='hacluster',
    'dfs.ha.namenodes.hacluster'='n1,n2',
    'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
    'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
    'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
    'dfs.namenode.kerberos.principal'='hadoop/_HOST@REALM.COM',
    'hadoop.security.authentication'='kerberos',
    'hadoop.kerberos.principal'='doris_test@REALM.COM',
    'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
    );
    ```
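    
    Under the hood, the BE converts these user-facing properties into a `THdfsParams` struct and builds the libhdfs3 connection itself: if Kerberos is configured, it first runs `kinit` with the given keytab and principal to obtain a ticket cache, then points the HDFS builder at that cache. The following is a minimal sketch of that flow, condensed from the new `be/src/exec/hdfs_builder.cpp` in the diff below (error handling and ticket-cache path generation are simplified; in the real code the builder is wrapped in `HDFSCommonBuilder`):
    
    ```cpp
    // Sketch only: condensed from hdfs_builder.cpp / hdfs_file_reader.cpp in this patch.
    #include <hdfs/hdfs.h>
    
    #include <string>
    
    #include "gen_cpp/PlanNodes_types.h"
    
    hdfsFS connect_with_kerberos(const doris::THdfsParams& params, const std::string& ticket_cache) {
        hdfsBuilder* builder = hdfsNewBuilder();
        hdfsBuilderSetNameNode(builder, params.fs_name.c_str());
        if (params.__isset.user) {
            hdfsBuilderSetUserName(builder, params.user.c_str());
        }
        if (params.__isset.hdfs_kerberos_principal) {
            // The patch first executes "kinit -c <ticket_cache> -R -t <keytab> -k <principal>"
            // via AgentUtils::exec_cmd, then hands the resulting ticket cache to libhdfs3.
            hdfsBuilderSetPrincipal(builder, params.hdfs_kerberos_principal.c_str());
            hdfsBuilderSetKerbTicketCachePath(builder, ticket_cache.c_str());
        }
        // Remaining dfs.* / hadoop.* properties are passed through as HDFS configuration.
        for (const doris::THdfsConf& conf : params.hdfs_conf) {
            hdfsBuilderConfSetStr(builder, conf.key.c_str(), conf.value.c_str());
        }
        hdfsFS fs = hdfsBuilderConnect(builder); // nullptr on failure
        hdfsFreeBuilder(builder);                // freed by HDFSCommonBuilder's destructor in the real code
        return fs;
    }
    ```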
---
 be/src/exec/CMakeLists.txt                         |  1 +
 be/src/exec/hdfs_builder.cpp                       | 78 ++++++++++++++++++++
 be/src/exec/hdfs_builder.h                         | 47 ++++++++++++
 be/src/exec/hdfs_file_reader.cpp                   | 31 ++------
 be/src/exec/hdfs_file_reader.h                     |  6 +-
 be/src/exec/hdfs_writer.cpp                        | 85 +++++++++-------------
 be/src/exec/hdfs_writer.h                          | 12 ++-
 .../org/apache/doris/analysis/OutFileClause.java   | 14 +++-
 .../java/org/apache/doris/catalog/AuthType.java    | 60 +++++++++++++++
 .../doris/catalog/HiveMetaStoreClientHelper.java   | 13 ++++
 .../java/org/apache/doris/catalog/HiveTable.java   | 48 ++++++++++--
 .../org/apache/doris/common/util/BrokerUtil.java   | 57 +++++++++------
 .../org/apache/doris/planner/BrokerScanNode.java   |  3 +
 .../org/apache/doris/planner/HiveScanNode.java     |  2 +-
 .../org/apache/doris/catalog/HiveTableTest.java    |  4 +-
 gensrc/thrift/PlanNodes.thrift                     |  7 +-
 16 files changed, 340 insertions(+), 128 deletions(-)

diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index 7940c8ac61..e09169c000 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -105,6 +105,7 @@ set(EXEC_FILES
     hdfs_reader_writer.cpp
     hdfs_file_reader.cpp
     hdfs_writer.cpp
+    hdfs_builder.cpp
 )
 
 if (WITH_MYSQL)
diff --git a/be/src/exec/hdfs_builder.cpp b/be/src/exec/hdfs_builder.cpp
new file mode 100644
index 0000000000..5940958050
--- /dev/null
+++ b/be/src/exec/hdfs_builder.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/hdfs_builder.h"
+
+#include <fmt/format.h>
+
+#include <fstream>
+
+#include "agent/utils.h"
+#include "common/logging.h"
+#include "util/uid_util.h"
+#include "util/url_coding.h"
+namespace doris {
+
+const std::string TICKET_CACHE_PATH = "/tmp/krb5cc_doris_";
+
+Status HDFSCommonBuilder::run_kinit() {
+    if (hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty()) {
+        return Status::InvalidArgument("Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab");
+    }
+    std::string ticket_path = TICKET_CACHE_PATH + generate_uuid_string();
+    fmt::memory_buffer kinit_command;
+    fmt::format_to(kinit_command, "kinit -c {} -R -t {} -k {}", ticket_path, hdfs_kerberos_keytab,
+                   hdfs_kerberos_principal);
+    VLOG_NOTICE << "kinit command: " << fmt::to_string(kinit_command);
+    std::string msg;
+    AgentUtils util;
+    bool rc = util.exec_cmd(fmt::to_string(kinit_command), &msg);
+    if (!rc) {
+        return Status::InternalError("Kinit failed, errMsg: " + msg);
+    }
+    hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_path.c_str());
+    return Status::OK();
+}
+
+HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams) {
+    HDFSCommonBuilder builder;
+    hdfsBuilderSetNameNode(builder.get(), hdfsParams.fs_name.c_str());
+    // set hdfs user
+    if (hdfsParams.__isset.user) {
+        hdfsBuilderSetUserName(builder.get(), hdfsParams.user.c_str());
+    }
+    // set kerberos conf
+    if (hdfsParams.__isset.hdfs_kerberos_principal) {
+        builder.need_kinit = true;
+        builder.hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
+        hdfsBuilderSetPrincipal(builder.get(), hdfsParams.hdfs_kerberos_principal.c_str());
+    }
+    if (hdfsParams.__isset.hdfs_kerberos_keytab) {
+        builder.need_kinit = true;
+        builder.hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
+    }
+    // set other conf
+    if (hdfsParams.__isset.hdfs_conf) {
+        for (const THdfsConf& conf : hdfsParams.hdfs_conf) {
+            hdfsBuilderConfSetStr(builder.get(), conf.key.c_str(), conf.value.c_str());
+        }
+    }
+
+    return builder;
+}
+
+} // namespace doris
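
For orientation before the reader/writer diffs below, here is a condensed sketch of how this helper is consumed, mirroring the changes to hdfs_file_reader.cpp and hdfs_writer.cpp in this patch (`connect_hdfs` is a hypothetical wrapper name, not part of the patch):

```cpp
#include "common/status.h"
#include "exec/hdfs_builder.h"

namespace doris {

// Hypothetical helper illustrating the usage pattern adopted by the reader/writer below.
Status connect_hdfs(const THdfsParams& hdfs_params, hdfsFS* fs) {
    HDFSCommonBuilder builder = createHDFSBuilder(hdfs_params);
    if (builder.is_need_kinit()) {
        // Shells out to kinit and registers the resulting ticket cache on the builder.
        RETURN_IF_ERROR(builder.run_kinit());
    }
    *fs = hdfsBuilderConnect(builder.get());
    if (*fs == nullptr) {
        return Status::InternalError("connect failed. " + hdfs_params.fs_name);
    }
    return Status::OK();
}

} // namespace doris
```
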
diff --git a/be/src/exec/hdfs_builder.h b/be/src/exec/hdfs_builder.h
new file mode 100644
index 0000000000..70ac723f1c
--- /dev/null
+++ b/be/src/exec/hdfs_builder.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <hdfs/hdfs.h>
+
+#include "gen_cpp/PlanNodes_types.h"
+#include "exec/file_reader.h"
+
+namespace doris {
+
+class HDFSCommonBuilder {
+    friend HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams);
+
+public:
+    HDFSCommonBuilder() : hdfs_builder(hdfsNewBuilder()) {};
+    ~HDFSCommonBuilder() { hdfsFreeBuilder(hdfs_builder); };
+
+    hdfsBuilder* get() { return hdfs_builder; };
+    bool is_need_kinit() { return need_kinit; };
+    Status run_kinit();
+
+private:
+    hdfsBuilder* hdfs_builder;
+    bool need_kinit {false};
+    std::string hdfs_kerberos_keytab;
+    std::string hdfs_kerberos_principal;
+};
+
+HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams);
+
+} // namespace doris
diff --git a/be/src/exec/hdfs_file_reader.cpp b/be/src/exec/hdfs_file_reader.cpp
index a047df6236..b042018e55 100644
--- a/be/src/exec/hdfs_file_reader.cpp
+++ b/be/src/exec/hdfs_file_reader.cpp
@@ -31,7 +31,8 @@ HdfsFileReader::HdfsFileReader(const THdfsParams& hdfs_params, const std::string
           _current_offset(start_offset),
           _file_size(-1),
           _hdfs_fs(nullptr),
-          _hdfs_file(nullptr) {
+          _hdfs_file(nullptr),
+          _builder(createHDFSBuilder(_hdfs_params)) {
     _namenode = _hdfs_params.fs_name;
 }
 
@@ -40,32 +41,10 @@ HdfsFileReader::~HdfsFileReader() {
 }
 
 Status HdfsFileReader::connect() {
-    hdfsBuilder* hdfs_builder = hdfsNewBuilder();
-    hdfsBuilderSetNameNode(hdfs_builder, _namenode.c_str());
-    // set hdfs user
-    if (_hdfs_params.__isset.user) {
-        hdfsBuilderSetUserName(hdfs_builder, _hdfs_params.user.c_str());
+    if (_builder.is_need_kinit()) {
+        RETURN_IF_ERROR(_builder.run_kinit());
     }
-    // set kerberos conf
-    if (_hdfs_params.__isset.kerb_principal) {
-        hdfsBuilderSetPrincipal(hdfs_builder, _hdfs_params.kerb_principal.c_str());
-    }
-    if (_hdfs_params.__isset.kerb_ticket_cache_path) {
-        hdfsBuilderSetKerbTicketCachePath(hdfs_builder,
-                                          _hdfs_params.kerb_ticket_cache_path.c_str());
-    }
-    // set token
-    if (_hdfs_params.__isset.token) {
-        hdfsBuilderSetToken(hdfs_builder, _hdfs_params.token.c_str());
-    }
-    // set other conf
-    if (_hdfs_params.__isset.hdfs_conf) {
-        for (const THdfsConf& conf : _hdfs_params.hdfs_conf) {
-            hdfsBuilderConfSetStr(hdfs_builder, conf.key.c_str(), conf.value.c_str());
-        }
-    }
-    _hdfs_fs = hdfsBuilderConnect(hdfs_builder);
-    hdfsFreeBuilder(hdfs_builder);
+    _hdfs_fs = hdfsBuilderConnect(_builder.get());
     if (_hdfs_fs == nullptr) {
         std::stringstream ss;
         ss << "connect failed. " << _namenode;
diff --git a/be/src/exec/hdfs_file_reader.h b/be/src/exec/hdfs_file_reader.h
index d4430de3e2..e81af8a186 100644
--- a/be/src/exec/hdfs_file_reader.h
+++ b/be/src/exec/hdfs_file_reader.h
@@ -17,10 +17,9 @@
 
 #pragma once
 
-#include <hdfs/hdfs.h>
-
-#include "exec/file_reader.h"
 #include "gen_cpp/PlanNodes_types.h"
+#include "exec/file_reader.h"
+#include "exec/hdfs_builder.h"
 
 namespace doris {
 
@@ -56,6 +55,7 @@ private:
     int64_t _file_size;
     hdfsFS _hdfs_fs;
     hdfsFile _hdfs_file;
+    HDFSCommonBuilder _builder;
 };
 
 } // namespace doris
diff --git a/be/src/exec/hdfs_writer.cpp b/be/src/exec/hdfs_writer.cpp
index 8aaae7e78e..b45fbe4449 100644
--- a/be/src/exec/hdfs_writer.cpp
+++ b/be/src/exec/hdfs_writer.cpp
@@ -24,23 +24,32 @@
 
 namespace doris {
 const static std::string FS_KEY = "fs.defaultFS";
-const static std::string USER = "hdfs_user";
-const static std::string KERBEROS_PRINCIPAL = "kerberos_principal";
-const static std::string KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
+const static std::string USER = "hadoop.username";
+const static std::string KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
+const static std::string KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
 const static std::string TOKEN = "token";
 
 HDFSWriter::HDFSWriter(std::map<std::string, std::string>& properties, const std::string& path)
         : _properties(properties),
           _path(path),
-          _hdfs_fs(nullptr) {
-    _parse_properties(_properties);
-}
+          _hdfs_fs(nullptr),
+          _hdfs_params(_parse_properties(_properties)),
+          _builder(createHDFSBuilder(_hdfs_params)) {}
 
 HDFSWriter::~HDFSWriter() {
     close();
 }
 
 Status HDFSWriter::open() {
+    if (_namenode.empty()) {
+        LOG(WARNING) << "hdfs properties is incorrect.";
+        return Status::InternalError("hdfs properties is incorrect");
+    }
+    // if the format of _path is hdfs://ip:port/path, replace it to /path.
+    // path like hdfs://ip:port/path can't be used by libhdfs3.
+    if (_path.find(_namenode) != _path.npos) {
+        _path = _path.substr(_namenode.size());
+    }
     RETURN_IF_ERROR(_connect());
     if (_hdfs_fs == nullptr) {
         return Status::InternalError("HDFS writer open without client");
@@ -130,31 +139,10 @@ Status HDFSWriter::close() {
 }
 
 Status HDFSWriter::_connect() {
-    hdfsBuilder* hdfs_builder = hdfsNewBuilder();
-    hdfsBuilderSetNameNode(hdfs_builder, _namenode.c_str());
-    // set hdfs user
-    if (!_user.empty()) {
-        hdfsBuilderSetUserName(hdfs_builder, _user.c_str());
-    }
-    // set kerberos conf
-    if (!_kerb_principal.empty()) {
-        hdfsBuilderSetPrincipal(hdfs_builder, _kerb_principal.c_str());
-    }
-    if (!_kerb_ticket_cache_path.empty()) {
-        hdfsBuilderSetKerbTicketCachePath(hdfs_builder, _kerb_ticket_cache_path.c_str());
-    }
-    // set token
-    if (!_token.empty()) {
-        hdfsBuilderSetToken(hdfs_builder, _token.c_str());
-    }
-    // set other conf
-    if (!_properties.empty()) {
-        std::map<std::string, std::string>::iterator iter;
-        for (iter = _properties.begin(); iter != _properties.end(); ++iter) {
-            hdfsBuilderConfSetStr(hdfs_builder, iter->first.c_str(), iter->second.c_str());
-        }
+    if (_builder.is_need_kinit()) {
+        RETURN_IF_ERROR(_builder.run_kinit());
     }
-    _hdfs_fs = hdfsBuilderConnect(hdfs_builder);
+    _hdfs_fs = hdfsBuilderConnect(_builder.get());
     if (_hdfs_fs == nullptr) {
         std::stringstream ss;
         ss << "connect failed. namenode:" << _namenode;
@@ -163,41 +151,34 @@ Status HDFSWriter::_connect() {
     return Status::OK();
 }
 
-Status HDFSWriter::_parse_properties(std::map<std::string, std::string>& prop) {
+THdfsParams HDFSWriter::_parse_properties(std::map<std::string, std::string>& prop) {
     std::map<std::string, std::string>::iterator iter;
+    std::vector<THdfsConf> hdfs_configs;
+    THdfsParams hdfsParams;
     for (iter = prop.begin(); iter != prop.end();) {
         if (iter->first.compare(FS_KEY) == 0) {
             _namenode = iter->second;
+            hdfsParams.__set_fs_name(_namenode);
             iter = prop.erase(iter);
         } else if (iter->first.compare(USER) == 0) {
-            _user = iter->second;
+            hdfsParams.__set_user(iter->second);
             iter = prop.erase(iter);
         } else if (iter->first.compare(KERBEROS_PRINCIPAL) == 0) {
-            _kerb_principal = iter->second;
+            hdfsParams.__set_hdfs_kerberos_principal(iter->second);
             iter = prop.erase(iter);
-        } else if (iter->first.compare(KERB_TICKET_CACHE_PATH) == 0) {
-            _kerb_ticket_cache_path = iter->second;
-            iter = prop.erase(iter);
-        } else if (iter->first.compare(TOKEN) == 0) {
-            _token = iter->second;
+        } else if (iter->first.compare(KERBEROS_KEYTAB) == 0) {
+            hdfsParams.__set_hdfs_kerberos_keytab(iter->second);
             iter = prop.erase(iter);
         } else {
-            ++iter;
+            THdfsConf item;
+            item.key = iter->first;
+            item.value = iter->second;
+            hdfs_configs.push_back(item);
+            iter = prop.erase(iter);
         }
     }
-
-    if (_namenode.empty()) {
-        LOG(WARNING) << "hdfs properties is incorrect.";
-        return Status::InternalError("hdfs properties is incorrect");
-    }
-
-    // if the format of _path is hdfs://ip:port/path, replace it to /path.
-    // path like hdfs://ip:port/path can't be used by libhdfs3.
-    if (_path.find(_namenode) != _path.npos) {
-        _path = _path.substr(_namenode.size());
-    }
-
-    return Status::OK();
+    hdfsParams.__set_hdfs_conf(hdfs_configs);
+    return hdfsParams;
 }
 
 }// end namespace doris
diff --git a/be/src/exec/hdfs_writer.h b/be/src/exec/hdfs_writer.h
index a3f17ec166..2b99bacb5e 100644
--- a/be/src/exec/hdfs_writer.h
+++ b/be/src/exec/hdfs_writer.h
@@ -17,12 +17,12 @@
 
 #pragma once
 
-#include <hdfs/hdfs.h>
-
 #include <map>
 #include <string>
 
+#include "gen_cpp/PlanNodes_types.h"
 #include "exec/file_writer.h"
+#include "exec/hdfs_builder.h"
 
 namespace doris {
 class HDFSWriter : public FileWriter {
@@ -40,18 +40,16 @@ public:
 
 private:
     Status _connect();
-    Status _parse_properties(std::map<std::string, std::string>& prop);
+    THdfsParams _parse_properties(std::map<std::string, std::string>& prop);
 
     std::map<std::string, std::string> _properties;
-    std::string _user = "";
     std::string _namenode = "";
     std::string _path = "";
-    std::string _kerb_principal = "";
-    std::string _kerb_ticket_cache_path = "";
-    std::string _token = "";
     hdfsFS _hdfs_fs = nullptr;
     hdfsFile _hdfs_file = nullptr;
     bool _closed = false;
+    THdfsParams _hdfs_params;
+    HDFSCommonBuilder _builder;
 };
 
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index c82cd1ab62..1817bf7d2a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.qe.ConnectContext;
@@ -87,7 +88,8 @@ public class OutFileClause {
     public static final String LOCAL_FILE_PREFIX = "file:///";
     private static final String S3_FILE_PREFIX = "S3://";
     private static final String HDFS_FILE_PREFIX = "hdfs://";
-    private static final String HDFS_PROP_PREFIX = "hdfs.";
+    private static final String HADOOP_FS_PROP_PREFIX = "dfs.";
+    private static final String HADOOP_PROP_PREFIX = "hadoop.";
     private static final String BROKER_PROP_PREFIX = "broker.";
     private static final String PROP_BROKER_NAME = "broker.name";
     private static final String PROP_COLUMN_SEPARATOR = "column_separator";
@@ -416,9 +418,13 @@ public class OutFileClause {
             } else if (entry.getKey().toUpperCase().startsWith(S3Storage.S3_PROPERTIES_PREFIX)) {
                 brokerProps.put(entry.getKey(), entry.getValue());
                 processedPropKeys.add(entry.getKey());
-            } else if (entry.getKey().startsWith(HDFS_PROP_PREFIX)
-                    && storageType == StorageBackend.StorageType.HDFS) {
-                brokerProps.put(entry.getKey().substring(HDFS_PROP_PREFIX.length()), entry.getValue());
+            } else if (entry.getKey().contains(BrokerUtil.HADOOP_FS_NAME)
+                && storageType == StorageBackend.StorageType.HDFS) {
+                brokerProps.put(entry.getKey(), entry.getValue());
+                processedPropKeys.add(entry.getKey());
+            } else if ((entry.getKey().startsWith(HADOOP_FS_PROP_PREFIX) || entry.getKey().startsWith(HADOOP_PROP_PREFIX))
+                && storageType == StorageBackend.StorageType.HDFS) {
+                brokerProps.put(entry.getKey(), entry.getValue());
                 processedPropKeys.add(entry.getKey());
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java
new file mode 100644
index 0000000000..c0c97530a0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+/**
+ * Define different auth type for external table such as hive/iceberg,
+ * so that BE could call secured under fileStorageSystem (enable kerberos)
+ */
+public enum AuthType {
+    SIMPLE(0, "simple"),
+    KERBEROS(1, "kerberos");
+
+    private int code;
+    private String desc;
+
+    AuthType(int code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+
+    public static boolean isSupportedAuthType(String authType) {
+        for (AuthType auth : values()) {
+            if (auth.getDesc().equals(authType)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+
+    public void setDesc(String desc) {
+        this.desc = desc;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index c378eecf24..f192df298b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -31,6 +31,7 @@ import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExprOpcode;
 
@@ -54,6 +55,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -245,14 +247,25 @@ public class HiveMetaStoreClientHelper {
     private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Map<String, String> properties) throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
+        boolean isSecurityEnabled = false;
         for (Map.Entry<String, String> entry : properties.entrySet()) {
             if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
                 configuration.set(entry.getKey(), entry.getValue());
             }
+            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
+                && entry.getValue().equals(AuthType.KERBEROS.getDesc())) {
+                isSecurityEnabled = true;
+            }
         }
         String location = table.getSd().getLocation();
         org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
         try {
+            if (isSecurityEnabled) {
+                UserGroupInformation.setConfiguration(configuration);
+                // login user from keytab
+                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+                    properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+            }
             FileSystem fileSystem = path.getFileSystem(configuration);
             iterators.add(fileSystem.listLocatedStatus(path));
         } catch (IOException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index d418e47307..19be317f12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -19,6 +19,7 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
@@ -39,6 +40,7 @@ import java.util.Map;
  */
 public class HiveTable extends Table {
     private static final String PROPERTY_MISSING_MSG = "Hive %s is null. Please add properties('%s'='xxx') when create table";
+    private static final String PROPERTY_ERROR_MSG = "Hive table properties('%s'='%s') is illegal or not supported. Please check it";
 
     private static final String HIVE_DB = "database";
     private static final String HIVE_TABLE = "table";
@@ -77,7 +79,7 @@ public class HiveTable extends Table {
     private void validate(Map<String, String> properties) throws DdlException {
         if (properties == null) {
             throw new DdlException("Please set properties of hive table, "
-                    + "they are: database, table and 'hive.metastore.uris'");
+                + "they are: database, table and 'hive.metastore.uris'");
         }
 
         Map<String, String> copiedProps = Maps.newHashMap(properties);
@@ -94,14 +96,48 @@ public class HiveTable extends Table {
         copiedProps.remove(HIVE_TABLE);
 
         // check hive properties
-        // hive.metastore.uris
-        String hiveMetastoreUris = copiedProps.get(HIVE_METASTORE_URIS);
-        if (Strings.isNullOrEmpty(hiveMetastoreUris)) {
+        // hive.metastore.uris 
+        String hiveMetaStoreUris = copiedProps.get(HIVE_METASTORE_URIS);
+        if (Strings.isNullOrEmpty(hiveMetaStoreUris)) {
             throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_METASTORE_URIS, HIVE_METASTORE_URIS));
         }
         copiedProps.remove(HIVE_METASTORE_URIS);
-        hiveProperties.put(HIVE_METASTORE_URIS, hiveMetastoreUris);
+        hiveProperties.put(HIVE_METASTORE_URIS, hiveMetaStoreUris);
 
+        // check auth type
+        String authType = copiedProps.get(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION);
+        if (Strings.isNullOrEmpty(authType)) {
+            authType = AuthType.SIMPLE.getDesc();
+        }
+        if (!AuthType.isSupportedAuthType(authType)) {
+            throw new DdlException(String.format(PROPERTY_ERROR_MSG, BrokerUtil.HADOOP_SECURITY_AUTHENTICATION, authType));
+        }
+        copiedProps.remove(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION);
+        hiveProperties.put(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION, authType);
+
+        if (AuthType.KERBEROS.getDesc().equals(authType)) {
+            // check principal
+            String principal = copiedProps.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL);
+            if (Strings.isNullOrEmpty(principal)) {
+                throw new DdlException(String.format(PROPERTY_MISSING_MSG, BrokerUtil.HADOOP_KERBEROS_PRINCIPAL, BrokerUtil.HADOOP_KERBEROS_PRINCIPAL));
+            }
+            hiveProperties.put(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL, principal);
+            copiedProps.remove(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL);
+            // check keytab
+            String keytabPath = copiedProps.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB);
+            if (Strings.isNullOrEmpty(keytabPath)) {
+                throw new DdlException(String.format(PROPERTY_MISSING_MSG, BrokerUtil.HADOOP_KERBEROS_KEYTAB, BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+            }
+            if (!Strings.isNullOrEmpty(keytabPath)) {
+                hiveProperties.put(BrokerUtil.HADOOP_KERBEROS_KEYTAB, keytabPath);
+                copiedProps.remove(BrokerUtil.HADOOP_KERBEROS_KEYTAB);
+            }
+        }
+        String HDFSUserName = copiedProps.get(BrokerUtil.HADOOP_USER_NAME);
+        if (!Strings.isNullOrEmpty(HDFSUserName)) {
+            hiveProperties.put(BrokerUtil.HADOOP_USER_NAME, HDFSUserName);
+            copiedProps.remove(BrokerUtil.HADOOP_USER_NAME);
+        }
         if (!copiedProps.isEmpty()) {
             Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator();
             while(iter.hasNext()) {
@@ -148,7 +184,7 @@ public class HiveTable extends Table {
     public TTableDescriptor toThrift() {
         THiveTable tHiveTable = new THiveTable(getHiveDb(), getHiveTable(), getHiveProperties());
         TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE,
-                fullSchema.size(), 0, getName(), "");
+            fullSchema.size(), 0, getName(), "");
         tTableDescriptor.setHiveTable(tHiveTable);
         return tTableDescriptor;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 9e170e47b7..c10256e69b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.RemoteFile;
 import org.apache.doris.backup.S3Storage;
 import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.AuthType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
@@ -64,6 +65,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -82,27 +84,26 @@ import java.util.Map;
 public class BrokerUtil {
     private static final Logger LOG = LogManager.getLogger(BrokerUtil.class);
 
-    private static int READ_BUFFER_SIZE_B = 1024 * 1024;
-    private static String HDFS_FS_KEY = "fs.defaultFS";
-    private static String HDFS_USER_KEY = "hdfs_user";
-    private static String HDFS_KERB_PRINCIPAL = "kerb_principal";
-    private static String HDFS_KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
-    private static String HDFS_KERB_TOKEN = "kerb_token";
+    private static final int READ_BUFFER_SIZE_B = 1024 * 1024;
+    public static String HADOOP_FS_NAME = "fs.defaultFS";
+    // simple or kerberos
+    public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
+    public static String HADOOP_USER_NAME = "hadoop.username";
+    public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
+    public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
 
     public static void generateHdfsParam(Map<String, String> properties, TBrokerRangeDesc rangeDesc) {
         rangeDesc.setHdfsParams(new THdfsParams());
         rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>());
         for (Map.Entry<String, String> property : properties.entrySet()) {
-            if (property.getKey().equalsIgnoreCase(HDFS_FS_KEY)) {
+            if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
                 rangeDesc.hdfs_params.setFsName(property.getValue());
-            } else if (property.getKey().equalsIgnoreCase(HDFS_USER_KEY)) {
+            } else if (property.getKey().equalsIgnoreCase(HADOOP_USER_NAME)) {
                 rangeDesc.hdfs_params.setUser(property.getValue());
-            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_PRINCIPAL)) {
-                rangeDesc.hdfs_params.setKerbPrincipal(property.getValue());
-            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TICKET_CACHE_PATH)) {
-                rangeDesc.hdfs_params.setKerbTicketCachePath(property.getValue());
-            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TOKEN)) {
-                rangeDesc.hdfs_params.setToken(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_PRINCIPAL)) {
+                rangeDesc.hdfs_params.setHdfsKerberosPrincipal(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_KEYTAB)) {
+                rangeDesc.hdfs_params.setHdfsKerberosKeytab(property.getValue());
             } else {
                 THdfsConf hdfsConf = new THdfsConf();
                 hdfsConf.setKey(property.getKey());
@@ -171,27 +172,35 @@ public class BrokerUtil {
                 }
             }
         } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) {
-            if (!brokerDesc.getProperties().containsKey(HDFS_FS_KEY)
-                    || !brokerDesc.getProperties().containsKey(HDFS_USER_KEY)) {
+            if (!brokerDesc.getProperties().containsKey(HADOOP_FS_NAME)
+                || !brokerDesc.getProperties().containsKey(HADOOP_USER_NAME)) {
                 throw new UserException(String.format(
-                        "The properties of hdfs is invalid. %s and %s are needed", HDFS_FS_KEY, HDFS_USER_KEY));
+                    "The properties of hdfs is invalid. %s and %s are needed", HADOOP_FS_NAME, HADOOP_USER_NAME));
             }
-            String hdfsFsName = brokerDesc.getProperties().get(HDFS_FS_KEY);
-            String user = brokerDesc.getProperties().get(HDFS_USER_KEY);
+            String fsName = brokerDesc.getProperties().get(HADOOP_FS_NAME);
+            String userName = brokerDesc.getProperties().get(HADOOP_USER_NAME);
             Configuration conf = new Configuration();
+            boolean isSecurityEnabled = false;
             for (Map.Entry<String, String> propEntry : brokerDesc.getProperties().entrySet()) {
-                if (propEntry.getKey().equals(HDFS_FS_KEY) || propEntry.getKey().equals(HDFS_USER_KEY)) {
-                    continue;
-                }
                 conf.set(propEntry.getKey(), propEntry.getValue());
+                if (propEntry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
+                    && propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
+                    isSecurityEnabled = true;
+                }
             }
             try {
-                FileSystem fs = FileSystem.get(new URI(hdfsFsName), conf, user);
+                if (isSecurityEnabled) {
+                    UserGroupInformation.setConfiguration(conf);
+                    UserGroupInformation.loginUserFromKeytab(
+                        brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+                        brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+                }
+                FileSystem fs = FileSystem.get(new URI(fsName), conf, userName);
                 FileStatus[] statusList = fs.globStatus(new Path(path));
                 for (FileStatus status : statusList) {
                     if (status.isFile()) {
                         fileStatuses.add(new TBrokerFileStatus(status.getPath().toUri().getPath(),
-                                status.isDirectory(), status.getLen(), status.isFile()));
+                            status.isDirectory(), status.getLen(), status.isFile()));
                     }
                 }
             } catch (IOException | InterruptedException | URISyntaxException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 417b91e145..187646e1d6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -68,6 +68,9 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index a67f5394ca..711e63e695 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -179,7 +179,7 @@ public class HiveScanNode extends BrokerScanNode {
         }
         List<TBrokerFileStatus> fileStatuses = new ArrayList<>();
         this.hdfsUri = HiveMetaStoreClientHelper.getHiveDataFiles(hiveTable, hivePartitionPredicate,
-                fileStatuses, remoteHiveTable);
+            fileStatuses, remoteHiveTable);
         fileStatusesList.add(fileStatuses);
         filesAdded += fileStatuses.size();
         for (TBrokerFileStatus fstatus : fileStatuses) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java
index 00b42a4ec0..c7dad72037 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.DdlException;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,7 +54,8 @@ public class HiveTableTest {
     public void testNormal() throws DdlException {
         HiveTable table = new HiveTable(1000, "hive_table", columns, properties);
         Assert.assertEquals(String.format("%s.%s", hiveDb, hiveTable), table.getHiveDbTable());
-        Assert.assertEquals(1, table.getHiveProperties().size());
+        // HiveProperties={hadoop.security.authentication=simple, hive.metastore.uris=thrift://127.0.0.1:9083}
+        Assert.assertEquals(2, table.getHiveProperties().size());
     }
 
     @Test(expected = DdlException.class)
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index c46b0f3c24..9d9124b056 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -118,10 +118,9 @@ struct THdfsConf {
 struct THdfsParams {
     1: optional string fs_name
     2: optional string user
-    3: optional string kerb_principal
-    4: optional string kerb_ticket_cache_path
-    5: optional string token
-    6: optional list<THdfsConf> hdfs_conf
+    3: optional string hdfs_kerberos_principal
+    4: optional string hdfs_kerberos_keytab
+    5: optional list<THdfsConf> hdfs_conf
 }
 
 // One broker range information.

