Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/14 12:07:09 UTC

[incubator-doris] branch master updated: [feature] Support reading Hive external tables and outfile into HDFS authenticated by Kerberos (#9579)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f7b5f36da4 [feature] Support reading Hive external tables and outfile into HDFS authenticated by Kerberos (#9579)
f7b5f36da4 is described below

commit f7b5f36da499c325258ef5fcd8953c9295c92e79
Author: gtchaos <gs...@gmail.com>
AuthorDate: Tue Jun 14 20:07:03 2022 +0800

    [feature] Support reading Hive external tables and outfile into HDFS authenticated by Kerberos (#9579)
    
    At present, Doris can only access a Kerberos-authenticated Hadoop cluster through a broker; Doris BE itself
    does not support direct access to Kerberos-authenticated HDFS files.
    
    This PR addresses that problem: the BE can now authenticate to HDFS directly using a Kerberos principal and keytab.
    
    When creating a Hive external table, users only need to specify the following properties to access HDFS data with Kerberos authentication enabled:
    
    ```sql
    CREATE EXTERNAL TABLE t_hive (
    k1 int NOT NULL COMMENT "",
    k2 char(10) NOT NULL COMMENT "",
    k3 datetime NOT NULL COMMENT "",
    k5 varchar(20) NOT NULL COMMENT "",
    k6 double NOT NULL COMMENT ""
    ) ENGINE=HIVE
    COMMENT "HIVE"
    PROPERTIES (
    'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
    'database' = 'hive_db',
    'table' = 'hive_table',
    'dfs.nameservices'='hacluster',
    'dfs.ha.namenodes.hacluster'='n1,n2',
    'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
    'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
    'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
    'dfs.namenode.kerberos.principal'='hadoop/_HOST@REALM.COM',
    'hadoop.security.authentication'='kerberos',
    'hadoop.kerberos.principal'='doris_test@REALM.COM',
    'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
    );
    ```
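    
    Under the hood (see be/src/io/hdfs_builder.cpp in the diff below), when a Kerberos principal and keytab
    are configured the BE obtains a ticket itself: it runs `kinit` with the configured keytab into a
    per-connection ticket cache under /tmp/krb5cc_doris_ before connecting. A minimal sketch of that command
    construction follows; the helper name `build_kinit_command` is only for illustration, the real logic
    lives in HDFSCommonBuilder::run_kinit():
    
    ```cpp
    #include <fmt/format.h>
    
    #include <string>
    
    // Mirrors the command template used by HDFSCommonBuilder::run_kinit():
    // a per-connection ticket cache is filled from the configured keytab/principal.
    std::string build_kinit_command(const std::string& ticket_path, const std::string& keytab,
                                    const std::string& principal) {
        return fmt::format("kinit -c {} -R -t {} -k {}", ticket_path, keytab, principal);
    }
    ```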
    
    If you want to `select into outfile` to an HDFS cluster with Kerberos authentication enabled, you can refer to the following SQL statement:
    
    ```sql
    select * from test into outfile "hdfs://tmp/outfile1"
    format as csv
    properties
    (
    'fs.defaultFS'='hdfs://hacluster/',
    'dfs.nameservices'='hacluster',
    'dfs.ha.namenodes.hacluster'='n1,n2',
    'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
    'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
    'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
    'dfs.namenode.kerberos.principal'='hadoop/_HOST@REALM.COM',
    'hadoop.security.authentication'='kerberos',
    'hadoop.kerberos.principal'='doris_test@REALM.COM',
    'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
    );
    ```
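    
    For reference, the HDFS reader and writer in the BE now share the same connect path: build an hdfsBuilder
    from THdfsParams, run kinit only when a Kerberos principal/keytab is set, then connect. A simplified
    sketch based on the changes to be/src/io/hdfs_file_reader.cpp and be/src/io/hdfs_writer.cpp (the free
    function `connect_hdfs` is illustrative; the actual logic sits in the reader/writer classes):
    
    ```cpp
    #include <hdfs/hdfs.h>
    
    #include "common/status.h"
    #include "gen_cpp/PlanNodes_types.h"
    #include "io/hdfs_builder.h"
    
    namespace doris {
    
    // Simplified view of HdfsFileReader::connect() / HDFSWriter::_connect().
    Status connect_hdfs(const THdfsParams& params, hdfsFS* fs) {
        HDFSCommonBuilder builder = createHDFSBuilder(params);
        if (builder.is_need_kinit()) {
            // kinit with hadoop.kerberos.principal / hadoop.kerberos.keytab,
            // then point the builder at the resulting ticket cache.
            Status st = builder.run_kinit();
            if (!st.ok()) {
                return st;
            }
        }
        *fs = hdfsBuilderConnect(builder.get());
        if (*fs == nullptr) {
            return Status::InternalError("connect to hdfs failed");
        }
        return Status::OK();
    }
    
    } // namespace doris
    ```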
---
 be/src/io/CMakeLists.txt                           |  1 +
 be/src/io/hdfs_builder.cpp                         | 78 +++++++++++++++++++
 .../io/hdfs_builder.h}                             | 38 ++++++----
 be/src/io/hdfs_file_reader.cpp                     | 31 ++------
 be/src/io/hdfs_file_reader.h                       |  4 +-
 be/src/io/hdfs_writer.cpp                          | 87 +++++++++-------------
 be/src/io/hdfs_writer.h                            | 12 ++-
 be/test/CMakeLists.txt                             |  2 +-
 be/test/exec/hdfs_file_reader_test.cpp             | 14 +++-
 .../docs/ecosystem/external-table/hive-of-doris.md | 47 +++++++++++-
 .../Data-Manipulation-Statements/OUTFILE.md        | 46 ++++++++++--
 .../docs/ecosystem/external-table/hive-of-doris.md | 45 ++++++++++-
 .../Data-Manipulation-Statements/OUTFILE.md        | 43 +++++++++--
 .../org/apache/doris/analysis/OutFileClause.java   | 14 +++-
 .../java/org/apache/doris/catalog/AuthType.java    | 60 +++++++++++++++
 .../doris/catalog/HiveMetaStoreClientHelper.java   | 14 ++++
 .../java/org/apache/doris/catalog/HiveTable.java   | 48 ++++++++++--
 .../org/apache/doris/common/util/BrokerUtil.java   | 57 ++++++++------
 .../org/apache/doris/planner/BrokerScanNode.java   |  3 +
 .../org/apache/doris/planner/HiveScanNode.java     |  3 +-
 .../org/apache/doris/catalog/HiveTableTest.java    |  4 +-
 gensrc/thrift/PlanNodes.thrift                     |  7 +-
 22 files changed, 489 insertions(+), 169 deletions(-)

diff --git a/be/src/io/CMakeLists.txt b/be/src/io/CMakeLists.txt
index 286a9471db..e55a72188c 100644
--- a/be/src/io/CMakeLists.txt
+++ b/be/src/io/CMakeLists.txt
@@ -26,6 +26,7 @@ set(IO_FILES
      broker_writer.cpp
      buffered_reader.cpp
      file_factory.cpp
+     hdfs_builder.cpp
      hdfs_file_reader.cpp
      hdfs_reader_writer.cpp
      hdfs_writer.cpp
diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp
new file mode 100644
index 0000000000..951e787afb
--- /dev/null
+++ b/be/src/io/hdfs_builder.cpp
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/hdfs_builder.h"
+
+#include <fmt/format.h>
+
+#include <fstream>
+
+#include "agent/utils.h"
+#include "common/logging.h"
+#include "util/uid_util.h"
+#include "util/url_coding.h"
+namespace doris {
+
+const std::string TICKET_CACHE_PATH = "/tmp/krb5cc_doris_";
+
+Status HDFSCommonBuilder::run_kinit() {
+    if (hdfs_kerberos_principal.empty() || hdfs_kerberos_keytab.empty()) {
+        return Status::InvalidArgument("Invalid hdfs_kerberos_principal or hdfs_kerberos_keytab");
+    }
+    std::string ticket_path = TICKET_CACHE_PATH + generate_uuid_string();
+    fmt::memory_buffer kinit_command;
+    fmt::format_to(kinit_command, "kinit -c {} -R -t {} -k {}", ticket_path, hdfs_kerberos_keytab,
+                   hdfs_kerberos_principal);
+    VLOG_NOTICE << "kinit command: " << fmt::to_string(kinit_command);
+    std::string msg;
+    AgentUtils util;
+    bool rc = util.exec_cmd(fmt::to_string(kinit_command), &msg);
+    if (!rc) {
+        return Status::InternalError("Kinit failed, errMsg: " + msg);
+    }
+    hdfsBuilderSetKerbTicketCachePath(hdfs_builder, ticket_path.c_str());
+    return Status::OK();
+}
+
+HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams) {
+    HDFSCommonBuilder builder;
+    hdfsBuilderSetNameNode(builder.get(), hdfsParams.fs_name.c_str());
+    // set hdfs user
+    if (hdfsParams.__isset.user) {
+        hdfsBuilderSetUserName(builder.get(), hdfsParams.user.c_str());
+    }
+    // set kerberos conf
+    if (hdfsParams.__isset.hdfs_kerberos_principal) {
+        builder.need_kinit = true;
+        builder.hdfs_kerberos_principal = hdfsParams.hdfs_kerberos_principal;
+        hdfsBuilderSetPrincipal(builder.get(), hdfsParams.hdfs_kerberos_principal.c_str());
+    }
+    if (hdfsParams.__isset.hdfs_kerberos_keytab) {
+        builder.need_kinit = true;
+        builder.hdfs_kerberos_keytab = hdfsParams.hdfs_kerberos_keytab;
+    }
+    // set other conf
+    if (hdfsParams.__isset.hdfs_conf) {
+        for (const THdfsConf& conf : hdfsParams.hdfs_conf) {
+            hdfsBuilderConfSetStr(builder.get(), conf.key.c_str(), conf.value.c_str());
+        }
+    }
+
+    return builder;
+}
+
+} // namespace doris
diff --git a/be/test/exec/hdfs_file_reader_test.cpp b/be/src/io/hdfs_builder.h
similarity index 54%
copy from be/test/exec/hdfs_file_reader_test.cpp
copy to be/src/io/hdfs_builder.h
index a5e85f2bc8..39c4528836 100644
--- a/be/test/exec/hdfs_file_reader_test.cpp
+++ b/be/src/io/hdfs_builder.h
@@ -15,25 +15,33 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "io/hdfs_file_reader.h"
+#pragma once
 
-#include <gtest/gtest.h>
+#include <hdfs/hdfs.h>
 
-#include "io/hdfs_reader_writer.h"
+#include "gen_cpp/PlanNodes_types.h"
+#include "io/file_reader.h"
 
 namespace doris {
 
-class HdfsFileReaderTest : public testing::Test {};
+class HDFSCommonBuilder {
+    friend HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams);
 
-TEST_F(HdfsFileReaderTest, test_connect_fail) {
-    THdfsParams hdfsParams;
-    hdfsParams.fs_name = "hdfs://127.0.0.1:8888"; // An invalid address
-    HdfsFileReader hdfs_file_reader(hdfsParams, "/user/foo/test.data", 0);
-    Status status = hdfs_file_reader.open();
-    hdfs_file_reader.close();
-    std::string msg = status.get_error_msg();
-    EXPECT_TRUE(msg.find("Connection refused") >= 0);
-    hdfs_file_reader.close();
-}
+public:
+    HDFSCommonBuilder() : hdfs_builder(hdfsNewBuilder()) {};
+    ~HDFSCommonBuilder() { hdfsFreeBuilder(hdfs_builder); };
 
-} // end namespace doris
+    hdfsBuilder* get() { return hdfs_builder; };
+    bool is_need_kinit() { return need_kinit; };
+    Status run_kinit();
+
+private:
+    hdfsBuilder* hdfs_builder;
+    bool need_kinit {false};
+    std::string hdfs_kerberos_keytab;
+    std::string hdfs_kerberos_principal;
+};
+
+HDFSCommonBuilder createHDFSBuilder(const THdfsParams& hdfsParams);
+
+} // namespace doris
diff --git a/be/src/io/hdfs_file_reader.cpp b/be/src/io/hdfs_file_reader.cpp
index bce0b6b0fe..0c5c57afad 100644
--- a/be/src/io/hdfs_file_reader.cpp
+++ b/be/src/io/hdfs_file_reader.cpp
@@ -31,7 +31,8 @@ HdfsFileReader::HdfsFileReader(const THdfsParams& hdfs_params, const std::string
           _current_offset(start_offset),
           _file_size(-1),
           _hdfs_fs(nullptr),
-          _hdfs_file(nullptr) {
+          _hdfs_file(nullptr),
+          _builder(createHDFSBuilder(_hdfs_params)) {
     _namenode = _hdfs_params.fs_name;
 }
 
@@ -40,32 +41,10 @@ HdfsFileReader::~HdfsFileReader() {
 }
 
 Status HdfsFileReader::connect() {
-    hdfsBuilder* hdfs_builder = hdfsNewBuilder();
-    hdfsBuilderSetNameNode(hdfs_builder, _namenode.c_str());
-    // set hdfs user
-    if (_hdfs_params.__isset.user) {
-        hdfsBuilderSetUserName(hdfs_builder, _hdfs_params.user.c_str());
+    if (_builder.is_need_kinit()) {
+        RETURN_IF_ERROR(_builder.run_kinit());
     }
-    // set kerberos conf
-    if (_hdfs_params.__isset.kerb_principal) {
-        hdfsBuilderSetPrincipal(hdfs_builder, _hdfs_params.kerb_principal.c_str());
-    }
-    if (_hdfs_params.__isset.kerb_ticket_cache_path) {
-        hdfsBuilderSetKerbTicketCachePath(hdfs_builder,
-                                          _hdfs_params.kerb_ticket_cache_path.c_str());
-    }
-    // set token
-    if (_hdfs_params.__isset.token) {
-        hdfsBuilderSetToken(hdfs_builder, _hdfs_params.token.c_str());
-    }
-    // set other conf
-    if (_hdfs_params.__isset.hdfs_conf) {
-        for (const THdfsConf& conf : _hdfs_params.hdfs_conf) {
-            hdfsBuilderConfSetStr(hdfs_builder, conf.key.c_str(), conf.value.c_str());
-        }
-    }
-    _hdfs_fs = hdfsBuilderConnect(hdfs_builder);
-    hdfsFreeBuilder(hdfs_builder);
+    _hdfs_fs = hdfsBuilderConnect(_builder.get());
     if (_hdfs_fs == nullptr) {
         std::stringstream ss;
         ss << "connect to hdfs failed. namenode address:" << _namenode
diff --git a/be/src/io/hdfs_file_reader.h b/be/src/io/hdfs_file_reader.h
index 67fa60a7d3..5f8956a5b0 100644
--- a/be/src/io/hdfs_file_reader.h
+++ b/be/src/io/hdfs_file_reader.h
@@ -17,10 +17,9 @@
 
 #pragma once
 
-#include <hdfs/hdfs.h>
-
 #include "gen_cpp/PlanNodes_types.h"
 #include "io/file_reader.h"
+#include "io/hdfs_builder.h"
 
 namespace doris {
 
@@ -56,6 +55,7 @@ private:
     int64_t _file_size;
     hdfsFS _hdfs_fs;
     hdfsFile _hdfs_file;
+    HDFSCommonBuilder _builder;
 };
 
 } // namespace doris
diff --git a/be/src/io/hdfs_writer.cpp b/be/src/io/hdfs_writer.cpp
index 6727d9a6d9..1f6ada8c4b 100644
--- a/be/src/io/hdfs_writer.cpp
+++ b/be/src/io/hdfs_writer.cpp
@@ -24,21 +24,32 @@
 
 namespace doris {
 const static std::string FS_KEY = "fs.defaultFS";
-const static std::string USER = "hdfs_user";
-const static std::string KERBEROS_PRINCIPAL = "kerberos_principal";
-const static std::string KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
+const static std::string USER = "hadoop.username";
+const static std::string KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
+const static std::string KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
 const static std::string TOKEN = "token";
 
 HDFSWriter::HDFSWriter(std::map<std::string, std::string>& properties, const std::string& path)
-        : _properties(properties), _path(path), _hdfs_fs(nullptr) {
-    _parse_properties(_properties);
-}
+        : _properties(properties),
+          _path(path),
+          _hdfs_fs(nullptr),
+          _hdfs_params(_parse_properties(_properties)),
+          _builder(createHDFSBuilder(_hdfs_params)) {}
 
 HDFSWriter::~HDFSWriter() {
     close();
 }
 
 Status HDFSWriter::open() {
+    if (_namenode.empty()) {
+        LOG(WARNING) << "hdfs properties is incorrect.";
+        return Status::InternalError("hdfs properties is incorrect");
+    }
+    // if the format of _path is hdfs://ip:port/path, replace it to /path.
+    // path like hdfs://ip:port/path can't be used by libhdfs3.
+    if (_path.find(_namenode) != _path.npos) {
+        _path = _path.substr(_namenode.size());
+    }
     RETURN_IF_ERROR(_connect());
     if (_hdfs_fs == nullptr) {
         return Status::InternalError("HDFS writer open without client");
@@ -129,31 +140,10 @@ Status HDFSWriter::close() {
 }
 
 Status HDFSWriter::_connect() {
-    hdfsBuilder* hdfs_builder = hdfsNewBuilder();
-    hdfsBuilderSetNameNode(hdfs_builder, _namenode.c_str());
-    // set hdfs user
-    if (!_user.empty()) {
-        hdfsBuilderSetUserName(hdfs_builder, _user.c_str());
-    }
-    // set kerberos conf
-    if (!_kerb_principal.empty()) {
-        hdfsBuilderSetPrincipal(hdfs_builder, _kerb_principal.c_str());
-    }
-    if (!_kerb_ticket_cache_path.empty()) {
-        hdfsBuilderSetKerbTicketCachePath(hdfs_builder, _kerb_ticket_cache_path.c_str());
-    }
-    // set token
-    if (!_token.empty()) {
-        hdfsBuilderSetToken(hdfs_builder, _token.c_str());
-    }
-    // set other conf
-    if (!_properties.empty()) {
-        std::map<std::string, std::string>::iterator iter;
-        for (iter = _properties.begin(); iter != _properties.end(); ++iter) {
-            hdfsBuilderConfSetStr(hdfs_builder, iter->first.c_str(), iter->second.c_str());
-        }
+    if (_builder.is_need_kinit()) {
+        RETURN_IF_ERROR(_builder.run_kinit());
     }
-    _hdfs_fs = hdfsBuilderConnect(hdfs_builder);
+    _hdfs_fs = hdfsBuilderConnect(_builder.get());
     if (_hdfs_fs == nullptr) {
         std::stringstream ss;
         ss << "connect to hdfs failed. namenode address:" << _namenode << ", error"
@@ -163,41 +153,34 @@ Status HDFSWriter::_connect() {
     return Status::OK();
 }
 
-Status HDFSWriter::_parse_properties(std::map<std::string, std::string>& prop) {
+THdfsParams HDFSWriter::_parse_properties(std::map<std::string, std::string>& prop) {
     std::map<std::string, std::string>::iterator iter;
+    std::vector<THdfsConf> hdfs_configs;
+    THdfsParams hdfsParams;
     for (iter = prop.begin(); iter != prop.end();) {
         if (iter->first.compare(FS_KEY) == 0) {
             _namenode = iter->second;
+            hdfsParams.__set_fs_name(_namenode);
             iter = prop.erase(iter);
         } else if (iter->first.compare(USER) == 0) {
-            _user = iter->second;
+            hdfsParams.__set_user(iter->second);
             iter = prop.erase(iter);
         } else if (iter->first.compare(KERBEROS_PRINCIPAL) == 0) {
-            _kerb_principal = iter->second;
+            hdfsParams.__set_hdfs_kerberos_principal(iter->second);
             iter = prop.erase(iter);
-        } else if (iter->first.compare(KERB_TICKET_CACHE_PATH) == 0) {
-            _kerb_ticket_cache_path = iter->second;
-            iter = prop.erase(iter);
-        } else if (iter->first.compare(TOKEN) == 0) {
-            _token = iter->second;
+        } else if (iter->first.compare(KERBEROS_KEYTAB) == 0) {
+            hdfsParams.__set_hdfs_kerberos_keytab(iter->second);
             iter = prop.erase(iter);
         } else {
-            ++iter;
+            THdfsConf item;
+            item.key = iter->first;
+            item.value = iter->second;
+            hdfs_configs.push_back(item);
+            iter = prop.erase(iter);
         }
     }
-
-    if (_namenode.empty()) {
-        LOG(WARNING) << "hdfs properties is incorrect.";
-        return Status::InternalError("hdfs properties is incorrect");
-    }
-
-    // if the format of _path is hdfs://ip:port/path, replace it to /path.
-    // path like hdfs://ip:port/path can't be used by libhdfs3.
-    if (_path.find(_namenode) != _path.npos) {
-        _path = _path.substr(_namenode.size());
-    }
-
-    return Status::OK();
+    hdfsParams.__set_hdfs_conf(hdfs_configs);
+    return hdfsParams;
 }
 
 } // end namespace doris
diff --git a/be/src/io/hdfs_writer.h b/be/src/io/hdfs_writer.h
index e26a3eb7ad..8e0091e2a7 100644
--- a/be/src/io/hdfs_writer.h
+++ b/be/src/io/hdfs_writer.h
@@ -17,12 +17,12 @@
 
 #pragma once
 
-#include <hdfs/hdfs.h>
-
 #include <map>
 #include <string>
 
+#include "gen_cpp/PlanNodes_types.h"
 #include "io/file_writer.h"
+#include "io/hdfs_builder.h"
 
 namespace doris {
 class HDFSWriter : public FileWriter {
@@ -39,18 +39,16 @@ public:
 
 private:
     Status _connect();
-    Status _parse_properties(std::map<std::string, std::string>& prop);
+    THdfsParams _parse_properties(std::map<std::string, std::string>& prop);
 
     std::map<std::string, std::string> _properties;
-    std::string _user = "";
     std::string _namenode = "";
     std::string _path = "";
-    std::string _kerb_principal = "";
-    std::string _kerb_ticket_cache_path = "";
-    std::string _token = "";
     hdfsFS _hdfs_fs = nullptr;
     hdfsFile _hdfs_file = nullptr;
     bool _closed = false;
+    THdfsParams _hdfs_params;
+    HDFSCommonBuilder _builder;
 };
 
 } // namespace doris
diff --git a/be/test/CMakeLists.txt b/be/test/CMakeLists.txt
index b18a9a659b..2323602b54 100644
--- a/be/test/CMakeLists.txt
+++ b/be/test/CMakeLists.txt
@@ -58,7 +58,7 @@ set(EXEC_TEST_FILES
     exec/es_scan_reader_test.cpp
     exec/s3_reader_test.cpp
     exec/multi_bytes_separator_test.cpp
-    # exec/hdfs_file_reader_test.cpp
+    exec/hdfs_file_reader_test.cpp
     # exec/new_olap_scan_node_test.cpp
     # exec/pre_aggregation_node_test.cpp
     # exec/partitioned_hash_table_test.cpp
diff --git a/be/test/exec/hdfs_file_reader_test.cpp b/be/test/exec/hdfs_file_reader_test.cpp
index a5e85f2bc8..4c9bdafba4 100644
--- a/be/test/exec/hdfs_file_reader_test.cpp
+++ b/be/test/exec/hdfs_file_reader_test.cpp
@@ -27,12 +27,18 @@ class HdfsFileReaderTest : public testing::Test {};
 
 TEST_F(HdfsFileReaderTest, test_connect_fail) {
     THdfsParams hdfsParams;
-    hdfsParams.fs_name = "hdfs://127.0.0.1:8888"; // An invalid address
+    hdfsParams.__set_fs_name("hdfs://127.0.0.9:8888"); // An invalid address
+    hdfsParams.__set_hdfs_kerberos_principal("somebody@TEST.COM");
+    hdfsParams.__set_hdfs_kerberos_keytab("/etc/keytab/doris.keytab");
+    std::vector<THdfsConf> confs;
+    THdfsConf item;
+    item.key = "dfs.ha.namenodes.service1";
+    item.value = "n1,n2";
+    confs.push_back(item);
+    hdfsParams.__set_hdfs_conf(confs);
     HdfsFileReader hdfs_file_reader(hdfsParams, "/user/foo/test.data", 0);
     Status status = hdfs_file_reader.open();
-    hdfs_file_reader.close();
-    std::string msg = status.get_error_msg();
-    EXPECT_TRUE(msg.find("Connection refused") >= 0);
+    EXPECT_EQ(TStatusCode::INTERNAL_ERROR, status.code());
     hdfs_file_reader.close();
 }
 
diff --git a/docs/en/docs/ecosystem/external-table/hive-of-doris.md b/docs/en/docs/ecosystem/external-table/hive-of-doris.md
index 741e167c16..1159bb2a61 100644
--- a/docs/en/docs/ecosystem/external-table/hive-of-doris.md
+++ b/docs/en/docs/ecosystem/external-table/hive-of-doris.md
@@ -30,6 +30,7 @@ Hive External Table of Doris provides Doris with direct access to Hive external
 
  1. support for Hive data sources to access Doris
  2. Support joint queries between Doris and Hive data sources to perform more complex analysis operations
+ 3. Support access to kerberos-enabled Hive data sources
 
 This document introduces how to use this feature and the considerations.
 
@@ -84,11 +85,37 @@ PROPERTIES (
 'database' = 'hive_db',
 'table' = 'hive_table',
 'dfs.nameservices'='hacluster',
-'dfs.ha.namenodes.hacluster'='3,4',
-'dfs.namenode.rpc-address.hacluster.3'='192.168.0.93:8020',
-'dfs.namenode.rpc-address.hacluster.4'='172.21.16.11:8020',
+'dfs.ha.namenodes.hacluster'='n1,n2',
+'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
+'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
 'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
 );
+
+
+-- Example 3: Create a Hive external table under hive_db in a Hive cluster with HDFS HA and Kerberos authentication enabled.
+CREATE TABLE `t_hive` (
+  `k1` int NOT NULL COMMENT "",
+  `k2` char(10) NOT NULL COMMENT "",
+  `k3` datetime NOT NULL COMMENT "",
+  `k5` varchar(20) NOT NULL COMMENT "",
+  `k6` double NOT NULL COMMENT ""
+) ENGINE=HIVE
+COMMENT "HIVE"
+PROPERTIES (
+'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
+'database' = 'hive_db',
+'table' = 'hive_table',
+'dfs.nameservices'='hacluster',
+'dfs.ha.namenodes.hacluster'='n1,n2',
+'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
+'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
+'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
+'hadoop.security.authentication'='kerberos',
+'dfs.namenode.kerberos.principal'='hadoop/_HOST@REALM.COM',
+'hadoop.kerberos.principal'='doris_test@REALM.COM',
+'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
+);
+
 ```
 
 #### Parameter Description
@@ -103,10 +130,22 @@ PROPERTIES (
     - `hive.metastore.uris`: Hive Metastore service address
     - `database`: the name of the database to which Hive is mounted
     - `table`: the name of the table to which Hive is mounted
-    - `dfs.nameservices`:the logical name for this new nameservice. See hdfs-site.xml
+    - `dfs.nameservices`: the logical name for this new nameservice. See hdfs-site.xml
     - `dfs.ha.namenodes.[nameservice ID]`:unique identifiers for each NameNode in the nameservice. See hdfs-site.xml
     - `dfs.namenode.rpc-address.[nameservice ID].[name node ID]`:the fully-qualified RPC address for each NameNode to listen on. See hdfs-site.xml
     - `dfs.client.failover.proxy.provider.[nameservice ID]`:the Java class that HDFS clients use to contact the Active NameNode, usually it is org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
+- For a Kerberos-enabled Hive data source, additional properties need to be set:
+    - `dfs.namenode.kerberos.principal`: HDFS namenode service principal
+    - `hadoop.security.authentication`: HDFS authentication type; set it to kerberos (the default is simple)
+    - `hadoop.kerberos.principal`: the Kerberos principal that Doris will use when connecting to HDFS.
+    - `hadoop.kerberos.keytab`: HDFS client keytab location.
+
+**Note:**
+- To enable Doris to access a Hadoop cluster with Kerberos authentication enabled, you need to deploy the Kerberos client (kinit) on all Doris FE and BE nodes, configure krb5.conf, and fill in the KDC service information.
+- The value of the PROPERTIES property `hadoop.kerberos.keytab` needs to be the absolute path of a local keytab file, and the Doris process must be able to access it.
+
+
+
     
 ## Data Type Matching
 
diff --git a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
index 244c0173ab..676949581f 100644
--- a/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
+++ b/docs/en/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
@@ -69,17 +69,27 @@ illustrate:
        The following properties are supported:
        column_separator: column separator
        line_delimiter: line delimiter
-       max_file_size: The size limit of a single file, if the result exceeds this value, it will be cut into multiple files.
+       max_file_size: the size limit of a single file, if the result exceeds this value, it will be cut into multiple files.
     
        Broker related properties need to be prefixed with `broker.`:
        broker.name: broker name
        broker.hadoop.security.authentication: specify the authentication method as kerberos
        broker.kerberos_principal: specifies the principal of kerberos
-       broker.kerberos_keytab: Specifies the path to the keytab file of kerberos. The file must be the absolute path to the file on the server where the broker process is located. and can be accessed by the Broker process
+       broker.kerberos_keytab: specifies the path to the kerberos keytab file. It must be the absolute path of a file on the server where the broker process is located, and it must be accessible to the Broker process
     
-       HDFS related properties need to be prefixed with `hdfs.`:
-       hdfs.fs.defaultFS: namenode address and port
-       hdfs.hdfs_user: hdfs username
+       HDFS related properties:
+       fs.defaultFS: namenode address and port
+       hadoop.username: hdfs username
+       dfs.nameservices: if Hadoop HA is enabled, set the nameservice name. See hdfs-site.xml
+       dfs.ha.namenodes.[nameservice ID]: unique identifiers for each NameNode in the nameservice. See hdfs-site.xml
+       dfs.namenode.rpc-address.[nameservice ID].[name node ID]: the fully-qualified RPC address for each NameNode to listen on. See hdfs-site.xml
+       dfs.client.failover.proxy.provider.[nameservice ID]: the Java class that HDFS clients use to contact the Active NameNode, usually it is org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
+
+       For a kerberos-authentication enabled Hadoop cluster, additional properties need to be set:
+       dfs.namenode.kerberos.principal: HDFS namenode service principal
+       hadoop.security.authentication: kerberos
+       hadoop.kerberos.principal: the Kerberos principal that Doris will use when connecting to HDFS.
+       hadoop.kerberos.keytab: HDFS client keytab location.
     
        For the S3 protocol, you can directly execute the S3 protocol configuration:
        AWS_ENDPOINT
@@ -241,10 +251,30 @@ illustrate:
    FORMAT AS CSV
    PROPERTIES
    (
-       "hdfs.fs.defaultFS" = "hdfs://ip:port",
-       "hdfs.hdfs_user" = "work"
+       "fs.defaultFS" = "hdfs://ip:port",
+       "hadoop.username" = "work"
+   );
+   ```
+
+   If the Hadoop cluster is highly available and Kerberos authentication is enabled, you can refer to the following SQL statement:
+
+   ```sql
+   SELECT * FROM tbl
+   INTO OUTFILE "hdfs://path/to/result_"
+   FORMAT AS CSV
+   PROPERTIES
+   (
+   'fs.defaultFS'='hdfs://hacluster/',
+   'dfs.nameservices'='hacluster',
+   'dfs.ha.namenodes.hacluster'='n1,n2',
+   'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
+   'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
+   'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
+   'dfs.namenode.kerberos.principal'='hadoop/_HOST@REALM.COM',
+   'hadoop.security.authentication'='kerberos',
+   'hadoop.kerberos.principal'='doris_test@REALM.COM',
+   'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
    );
-   ````
 
    If the final generated file is not larger than 100MB, it will be: `result_0.csv`.
    If larger than 100MB, it may be `result_0.csv, result_1.csv, ...`.
diff --git a/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md b/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md
index f5c7a017e4..70a92333fe 100644
--- a/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md
+++ b/docs/zh-CN/docs/ecosystem/external-table/hive-of-doris.md
@@ -30,7 +30,8 @@ Hive External Table of Doris 提供了 Doris 直接访问 Hive 外部表的能
 
 1. 支持 Hive 数据源接入Doris
 2. 支持 Doris 与 Hive 数据源中的表联合查询,进行更加复杂的分析操作
-
+3. 支持访问开启 kerberos 的 Hive 数据源
+ 
 本文档主要介绍该功能的使用方式和注意事项等。
 
 ## 名词解释
@@ -84,11 +85,37 @@ PROPERTIES (
 'database' = 'hive_db',
 'table' = 'hive_table',
 'dfs.nameservices'='hacluster',
-'dfs.ha.namenodes.hacluster'='3,4',
-'dfs.namenode.rpc-address.hacluster.3'='192.168.0.93:8020',
-'dfs.namenode.rpc-address.hacluster.4'='172.21.16.11:8020',
+'dfs.ha.namenodes.hacluster'='n1,n2',
+'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
+'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
 'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
 );
+
+
+-- 例子3:创建 Hive 集群中 hive_db 下的 hive_table 表, HDFS使用HA配置并开启kerberos认证方式
+CREATE TABLE `t_hive` (
+  `k1` int NOT NULL COMMENT "",
+  `k2` char(10) NOT NULL COMMENT "",
+  `k3` datetime NOT NULL COMMENT "",
+  `k5` varchar(20) NOT NULL COMMENT "",
+  `k6` double NOT NULL COMMENT ""
+) ENGINE=HIVE
+COMMENT "HIVE"
+PROPERTIES (
+'hive.metastore.uris' = 'thrift://192.168.0.1:9083',
+'database' = 'hive_db',
+'table' = 'hive_table',
+'dfs.nameservices'='hacluster',
+'dfs.ha.namenodes.hacluster'='n1,n2',
+'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
+'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
+'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
+'dfs.namenode.kerberos.principal'='hadoop/_HOST@REALM.COM',
+'hadoop.security.authentication'='kerberos',
+'hadoop.kerberos.principal'='doris_test@REALM.COM',
+'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
+);
+
 ```
 
 #### 参数说明:
@@ -108,6 +135,16 @@ PROPERTIES (
     - `dfs.namenode.rpc-address.[nameservice ID].[name node ID]`:Name node的rpc地址,数量与namenode数量相同,与hdfs-site.xml保持一致
     - `dfs.client.failover.proxy.provider.[nameservice ID] `:HDFS客户端连接活跃namenode的java类,通常是"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
 
+- 访问开启kerberos的Hive数据源,需要为Hive外表额外配置如下 PROPERTIES 属性:
+    - `hadoop.security.authentication`:认证方式请设置为 kerberos,默认为simple
+    - `dfs.namenode.kerberos.principal`:HDFS namenode 服务的Kerberos 主体
+    - `hadoop.kerberos.principal`:设置 Doris 连接 HDFS 时使用的 Kerberos 主体
+    - `hadoop.kerberos.keytab`:设置 keytab 本地文件路径
+
+**注意:**
+- 若要使 Doris 访问开启kerberos认证方式的hadoop集群,需要在 Doris 集群所有运行节点上部署 Kerberos 客户端 kinit,并配置 krb5.conf,填写KDC 服务信息等。
+- PROPERTIES 属性 `hadoop.kerberos.keytab` 的值需要指定 keytab 本地文件的绝对路径,并允许 Doris 进程访问该本地文件。
+
 ## 类型匹配
 
 支持的 Hive 列类型与 Doris 对应关系如下表:
diff --git a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
index fee56b66c0..026d83321c 100644
--- a/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
+++ b/docs/zh-CN/docs/sql-manual/sql-reference/Data-Manipulation-Statements/OUTFILE.md
@@ -75,10 +75,20 @@ INTO OUTFILE "file_path"
         broker.kerberos_principal: 指定 kerberos 的 principal
         broker.kerberos_keytab: 指定 kerberos 的 keytab 文件路径。该文件必须为 Broker 进程所在服务器上的文件的绝对路径。并且可以被 Broker 进程访问
         
-        HDFS 相关属性需加前缀 `hdfs.`:
-        hdfs.fs.defaultFS: namenode 地址和端口
-        hdfs.hdfs_user: hdfs 用户名
-        
+        HDFS 相关属性:
+        fs.defaultFS: namenode 地址和端口
+        hadoop.username: hdfs 用户名
+        dfs.nameservices: name service名称,与hdfs-site.xml保持一致
+        dfs.ha.namenodes.[nameservice ID]: namenode的id列表,与hdfs-site.xml保持一致
+        dfs.namenode.rpc-address.[nameservice ID].[name node ID]: Name node的rpc地址,数量与namenode数量相同,与hdfs-site.xml保持一致
+        dfs.client.failover.proxy.provider.[nameservice ID]: HDFS客户端连接活跃namenode的java类,通常是"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
+
+        对于开启kerberos认证的Hadoop 集群,还需要额外设置如下 PROPERTIES 属性:
+        dfs.namenode.kerberos.principal: HDFS namenode 服务的 principal 名称
+        hadoop.security.authentication: 认证方式设置为 kerberos
+        hadoop.kerberos.principal: 设置 Doris 连接 HDFS 时使用的 Kerberos 主体
+        hadoop.kerberos.keytab: 设置 keytab 本地文件路径
         S3 协议则直接执行 S3 协议配置即可:
         AWS_ENDPOINT
         AWS_ACCESS_KEY
@@ -239,8 +249,29 @@ INTO OUTFILE "file_path"
     FORMAT AS CSV
     PROPERTIES
     (
-        "hdfs.fs.defaultFS" = "hdfs://ip:port",
-        "hdfs.hdfs_user" = "work"
+        "fs.defaultFS" = "hdfs://ip:port",
+        "hadoop.username" = "work"
+    );
+    ```
+
+    如果Hadoop 集群开启高可用并且启用 Kerberos 认证,可以参考如下SQL语句:
+
+    ```sql
+    SELECT * FROM tbl
+    INTO OUTFILE "hdfs://path/to/result_"
+    FORMAT AS CSV
+    PROPERTIES
+    (
+    'fs.defaultFS'='hdfs://hacluster/',
+    'dfs.nameservices'='hacluster',
+    'dfs.ha.namenodes.hacluster'='n1,n2',
+    'dfs.namenode.rpc-address.hacluster.n1'='192.168.0.1:8020',
+    'dfs.namenode.rpc-address.hacluster.n2'='192.168.0.2:8020',
+    'dfs.client.failover.proxy.provider.hacluster'='org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider',
+    'dfs.namenode.kerberos.principal'='hadoop/_HOST@REALM.COM',
+    'hadoop.security.authentication'='kerberos',
+    'hadoop.kerberos.principal'='doris_test@REALM.COM',
+    'hadoop.kerberos.keytab'='/path/to/doris_test.keytab'
     );
     ```
     
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
index 20d552cd4f..8443949d35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/OutFileClause.java
@@ -26,6 +26,7 @@ import org.apache.doris.common.Config;
 import org.apache.doris.common.FeConstants;
 import org.apache.doris.common.FeNameFormat;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.ParseUtil;
 import org.apache.doris.common.util.PrintableMap;
 import org.apache.doris.qe.ConnectContext;
@@ -87,7 +88,8 @@ public class OutFileClause {
     public static final String LOCAL_FILE_PREFIX = "file:///";
     private static final String S3_FILE_PREFIX = "S3://";
     private static final String HDFS_FILE_PREFIX = "hdfs://";
-    private static final String HDFS_PROP_PREFIX = "hdfs.";
+    private static final String HADOOP_FS_PROP_PREFIX = "dfs.";
+    private static final String HADOOP_PROP_PREFIX = "hadoop.";
     private static final String BROKER_PROP_PREFIX = "broker.";
     private static final String PROP_BROKER_NAME = "broker.name";
     private static final String PROP_COLUMN_SEPARATOR = "column_separator";
@@ -430,9 +432,13 @@ public class OutFileClause {
             } else if (entry.getKey().toUpperCase().startsWith(S3Storage.S3_PROPERTIES_PREFIX)) {
                 brokerProps.put(entry.getKey(), entry.getValue());
                 processedPropKeys.add(entry.getKey());
-            } else if (entry.getKey().startsWith(HDFS_PROP_PREFIX)
-                    && storageType == StorageBackend.StorageType.HDFS) {
-                brokerProps.put(entry.getKey().substring(HDFS_PROP_PREFIX.length()), entry.getValue());
+            } else if (entry.getKey().contains(BrokerUtil.HADOOP_FS_NAME)
+                && storageType == StorageBackend.StorageType.HDFS) {
+                brokerProps.put(entry.getKey(), entry.getValue());
+                processedPropKeys.add(entry.getKey());
+            } else if ((entry.getKey().startsWith(HADOOP_FS_PROP_PREFIX) || entry.getKey().startsWith(HADOOP_PROP_PREFIX))
+                && storageType == StorageBackend.StorageType.HDFS) {
+                brokerProps.put(entry.getKey(), entry.getValue());
                 processedPropKeys.add(entry.getKey());
             }
         }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java
new file mode 100644
index 0000000000..c0c97530a0
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/AuthType.java
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.catalog;
+
+/**
+ * Define different auth types for external tables such as hive/iceberg,
+ * so that BE can access the underlying file storage system when Kerberos is enabled.
+ */
+public enum AuthType {
+    SIMPLE(0, "simple"),
+    KERBEROS(1, "kerberos");
+
+    private int code;
+    private String desc;
+
+    AuthType(int code, String desc) {
+        this.code = code;
+        this.desc = desc;
+    }
+
+    public static boolean isSupportedAuthType(String authType) {
+        for (AuthType auth : values()) {
+            if (auth.getDesc().equals(authType)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+
+    public String getDesc() {
+        return desc;
+    }
+
+    public void setDesc(String desc) {
+        this.desc = desc;
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
index 1f2a651d1a..88a7fd1826 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveMetaStoreClientHelper.java
@@ -31,10 +31,12 @@ import org.apache.doris.analysis.NullLiteral;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.common.DdlException;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.thrift.TBrokerFileStatus;
 import org.apache.doris.thrift.TExprOpcode;
 
 import com.google.common.base.Strings;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -261,14 +264,25 @@ public class HiveMetaStoreClientHelper {
     private static List<RemoteIterator<LocatedFileStatus>> getRemoteIterator(Table table, Map<String, String> properties) throws DdlException {
         List<RemoteIterator<LocatedFileStatus>> iterators = new ArrayList<>();
         Configuration configuration = new Configuration(false);
+        boolean isSecurityEnabled = false;
         for (Map.Entry<String, String> entry : properties.entrySet()) {
             if (!entry.getKey().equals(HiveTable.HIVE_METASTORE_URIS)) {
                 configuration.set(entry.getKey(), entry.getValue());
             }
+            if (entry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
+                && entry.getValue().equals(AuthType.KERBEROS.getDesc())) {
+                isSecurityEnabled = true;
+            }
         }
         String location = table.getSd().getLocation();
         org.apache.hadoop.fs.Path path = new org.apache.hadoop.fs.Path(location);
         try {
+            if (isSecurityEnabled) {
+                UserGroupInformation.setConfiguration(configuration);
+                // login user from keytab
+                UserGroupInformation.loginUserFromKeytab(properties.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+                    properties.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+            }
             FileSystem fileSystem = path.getFileSystem(configuration);
             iterators.add(fileSystem.listLocatedStatus(path));
         } catch (IOException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
index 68fb389335..259e743f0b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/HiveTable.java
@@ -19,6 +19,7 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.io.Text;
+import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.thrift.THiveTable;
 import org.apache.doris.thrift.TTableDescriptor;
 import org.apache.doris.thrift.TTableType;
@@ -39,6 +40,7 @@ import java.util.Map;
  */
 public class HiveTable extends Table {
     private static final String PROPERTY_MISSING_MSG = "Hive %s is null. Please add properties('%s'='xxx') when create table";
+    private static final String PROPERTY_ERROR_MSG = "Hive table properties('%s'='%s') is illegal or not supported. Please check it";
 
     private static final String HIVE_DB = "database";
     private static final String HIVE_TABLE = "table";
@@ -77,7 +79,7 @@ public class HiveTable extends Table {
     private void validate(Map<String, String> properties) throws DdlException {
         if (properties == null) {
             throw new DdlException("Please set properties of hive table, "
-                    + "they are: database, table and 'hive.metastore.uris'");
+                + "they are: database, table and 'hive.metastore.uris'");
         }
 
         Map<String, String> copiedProps = Maps.newHashMap(properties);
@@ -94,14 +96,48 @@ public class HiveTable extends Table {
         copiedProps.remove(HIVE_TABLE);
 
         // check hive properties
-        // hive.metastore.uris
-        String hiveMetastoreUris = copiedProps.get(HIVE_METASTORE_URIS);
-        if (Strings.isNullOrEmpty(hiveMetastoreUris)) {
+        // hive.metastore.uris 
+        String hiveMetaStoreUris = copiedProps.get(HIVE_METASTORE_URIS);
+        if (Strings.isNullOrEmpty(hiveMetaStoreUris)) {
             throw new DdlException(String.format(PROPERTY_MISSING_MSG, HIVE_METASTORE_URIS, HIVE_METASTORE_URIS));
         }
         copiedProps.remove(HIVE_METASTORE_URIS);
-        hiveProperties.put(HIVE_METASTORE_URIS, hiveMetastoreUris);
+        hiveProperties.put(HIVE_METASTORE_URIS, hiveMetaStoreUris);
 
+        // check auth type
+        String authType = copiedProps.get(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION);
+        if (Strings.isNullOrEmpty(authType)) {
+            authType = AuthType.SIMPLE.getDesc();
+        }
+        if (!AuthType.isSupportedAuthType(authType)) {
+            throw new DdlException(String.format(PROPERTY_ERROR_MSG, BrokerUtil.HADOOP_SECURITY_AUTHENTICATION, authType));
+        }
+        copiedProps.remove(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION);
+        hiveProperties.put(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION, authType);
+
+        if (AuthType.KERBEROS.getDesc().equals(authType)) {
+            // check principal
+            String principal = copiedProps.get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL);
+            if (Strings.isNullOrEmpty(principal)) {
+                throw new DdlException(String.format(PROPERTY_MISSING_MSG, BrokerUtil.HADOOP_KERBEROS_PRINCIPAL, BrokerUtil.HADOOP_KERBEROS_PRINCIPAL));
+            }
+            hiveProperties.put(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL, principal);
+            copiedProps.remove(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL);
+            // check keytab
+            String keytabPath = copiedProps.get(BrokerUtil.HADOOP_KERBEROS_KEYTAB);
+            if (Strings.isNullOrEmpty(keytabPath)) {
+                throw new DdlException(String.format(PROPERTY_MISSING_MSG, BrokerUtil.HADOOP_KERBEROS_KEYTAB, BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+            }
+            if (!Strings.isNullOrEmpty(keytabPath)) {
+                hiveProperties.put(BrokerUtil.HADOOP_KERBEROS_KEYTAB, keytabPath);
+                copiedProps.remove(BrokerUtil.HADOOP_KERBEROS_KEYTAB);
+            }
+        }
+        String HDFSUserName = copiedProps.get(BrokerUtil.HADOOP_USER_NAME);
+        if (!Strings.isNullOrEmpty(HDFSUserName)) {
+            hiveProperties.put(BrokerUtil.HADOOP_USER_NAME, HDFSUserName);
+            copiedProps.remove(BrokerUtil.HADOOP_USER_NAME);
+        }
         if (!copiedProps.isEmpty()) {
             Iterator<Map.Entry<String, String>> iter = copiedProps.entrySet().iterator();
             while (iter.hasNext()) {
@@ -148,7 +184,7 @@ public class HiveTable extends Table {
     public TTableDescriptor toThrift() {
         THiveTable tHiveTable = new THiveTable(getHiveDb(), getHiveTable(), getHiveProperties());
         TTableDescriptor tTableDescriptor = new TTableDescriptor(getId(), TTableType.HIVE_TABLE,
-                fullSchema.size(), 0, getName(), "");
+            fullSchema.size(), 0, getName(), "");
         tTableDescriptor.setHiveTable(tHiveTable);
         return tTableDescriptor;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
index 843863b034..f5ecef57cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/BrokerUtil.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.StorageBackend;
 import org.apache.doris.backup.RemoteFile;
 import org.apache.doris.backup.S3Storage;
 import org.apache.doris.backup.Status;
+import org.apache.doris.catalog.AuthType;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.FsBroker;
 import org.apache.doris.common.AnalysisException;
@@ -63,6 +64,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.thrift.TException;
@@ -81,27 +83,26 @@ import java.util.Map;
 public class BrokerUtil {
     private static final Logger LOG = LogManager.getLogger(BrokerUtil.class);
 
-    private static int READ_BUFFER_SIZE_B = 1024 * 1024;
-    private static String HDFS_FS_KEY = "fs.defaultFS";
-    private static String HDFS_USER_KEY = "hdfs_user";
-    private static String HDFS_KERB_PRINCIPAL = "kerb_principal";
-    private static String HDFS_KERB_TICKET_CACHE_PATH = "kerb_ticket_cache_path";
-    private static String HDFS_KERB_TOKEN = "kerb_token";
+    private static final int READ_BUFFER_SIZE_B = 1024 * 1024;
+    public static String HADOOP_FS_NAME = "fs.defaultFS";
+    // simple or kerberos
+    public static String HADOOP_SECURITY_AUTHENTICATION = "hadoop.security.authentication";
+    public static String HADOOP_USER_NAME = "hadoop.username";
+    public static String HADOOP_KERBEROS_PRINCIPAL = "hadoop.kerberos.principal";
+    public static String HADOOP_KERBEROS_KEYTAB = "hadoop.kerberos.keytab";
 
     public static void generateHdfsParam(Map<String, String> properties, TBrokerRangeDesc rangeDesc) {
         rangeDesc.setHdfsParams(new THdfsParams());
         rangeDesc.hdfs_params.setHdfsConf(new ArrayList<>());
         for (Map.Entry<String, String> property : properties.entrySet()) {
-            if (property.getKey().equalsIgnoreCase(HDFS_FS_KEY)) {
+            if (property.getKey().equalsIgnoreCase(HADOOP_FS_NAME)) {
                 rangeDesc.hdfs_params.setFsName(property.getValue());
-            } else if (property.getKey().equalsIgnoreCase(HDFS_USER_KEY)) {
+            } else if (property.getKey().equalsIgnoreCase(HADOOP_USER_NAME)) {
                 rangeDesc.hdfs_params.setUser(property.getValue());
-            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_PRINCIPAL)) {
-                rangeDesc.hdfs_params.setKerbPrincipal(property.getValue());
-            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TICKET_CACHE_PATH)) {
-                rangeDesc.hdfs_params.setKerbTicketCachePath(property.getValue());
-            } else if (property.getKey().equalsIgnoreCase(HDFS_KERB_TOKEN)) {
-                rangeDesc.hdfs_params.setToken(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_PRINCIPAL)) {
+                rangeDesc.hdfs_params.setHdfsKerberosPrincipal(property.getValue());
+            } else if (property.getKey().equalsIgnoreCase(HADOOP_KERBEROS_KEYTAB)) {
+                rangeDesc.hdfs_params.setHdfsKerberosKeytab(property.getValue());
             } else {
                 THdfsConf hdfsConf = new THdfsConf();
                 hdfsConf.setKey(property.getKey());
@@ -170,27 +171,35 @@ public class BrokerUtil {
                 }
             }
         } else if (brokerDesc.getStorageType() == StorageBackend.StorageType.HDFS) {
-            if (!brokerDesc.getProperties().containsKey(HDFS_FS_KEY)
-                    || !brokerDesc.getProperties().containsKey(HDFS_USER_KEY)) {
+            if (!brokerDesc.getProperties().containsKey(HADOOP_FS_NAME)
+                || !brokerDesc.getProperties().containsKey(HADOOP_USER_NAME)) {
                 throw new UserException(String.format(
-                        "The properties of hdfs is invalid. %s and %s are needed", HDFS_FS_KEY, HDFS_USER_KEY));
+                    "The properties of hdfs is invalid. %s and %s are needed", HADOOP_FS_NAME, HADOOP_USER_NAME));
             }
-            String hdfsFsName = brokerDesc.getProperties().get(HDFS_FS_KEY);
-            String user = brokerDesc.getProperties().get(HDFS_USER_KEY);
+            String fsName = brokerDesc.getProperties().get(HADOOP_FS_NAME);
+            String userName = brokerDesc.getProperties().get(HADOOP_USER_NAME);
             Configuration conf = new Configuration();
+            boolean isSecurityEnabled = false;
             for (Map.Entry<String, String> propEntry : brokerDesc.getProperties().entrySet()) {
-                if (propEntry.getKey().equals(HDFS_FS_KEY) || propEntry.getKey().equals(HDFS_USER_KEY)) {
-                    continue;
-                }
                 conf.set(propEntry.getKey(), propEntry.getValue());
+                if (propEntry.getKey().equals(BrokerUtil.HADOOP_SECURITY_AUTHENTICATION)
+                    && propEntry.getValue().equals(AuthType.KERBEROS.getDesc())) {
+                    isSecurityEnabled = true;
+                }
             }
             try {
-                FileSystem fs = FileSystem.get(new URI(hdfsFsName), conf, user);
+                if (isSecurityEnabled) {
+                    UserGroupInformation.setConfiguration(conf);
+                    UserGroupInformation.loginUserFromKeytab(
+                        brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_PRINCIPAL),
+                        brokerDesc.getProperties().get(BrokerUtil.HADOOP_KERBEROS_KEYTAB));
+                }
+                FileSystem fs = FileSystem.get(new URI(fsName), conf, userName);
                 FileStatus[] statusList = fs.globStatus(new Path(path));
                 for (FileStatus status : statusList) {
                     if (status.isFile()) {
                         fileStatuses.add(new TBrokerFileStatus(status.getPath().toUri().getPath(),
-                                status.isDirectory(), status.getLen(), status.isFile()));
+                            status.isDirectory(), status.getLen(), status.isFile()));
                     }
                 }
             } catch (IOException | InterruptedException | URISyntaxException e) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
index 53fa6ea60d..07ec26cae3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/BrokerScanNode.java
@@ -69,6 +69,9 @@ import com.google.common.collect.Sets;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.Collections;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
index ee9b1603bb..4fda0515e7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HiveScanNode.java
@@ -33,6 +33,7 @@ import org.apache.doris.thrift.TExplainLevel;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -178,7 +179,7 @@ public class HiveScanNode extends BrokerScanNode {
         }
         List<TBrokerFileStatus> fileStatuses = new ArrayList<>();
         this.hdfsUri = HiveMetaStoreClientHelper.getHiveDataFiles(hiveTable, hivePartitionPredicate,
-                fileStatuses, remoteHiveTable);
+            fileStatuses, remoteHiveTable);
         fileStatusesList.add(fileStatuses);
         filesAdded += fileStatuses.size();
         for (TBrokerFileStatus fstatus : fileStatuses) {
diff --git a/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java b/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java
index 00b42a4ec0..c7dad72037 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/catalog/HiveTableTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.common.DdlException;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,7 +54,8 @@ public class HiveTableTest {
     public void testNormal() throws DdlException {
         HiveTable table = new HiveTable(1000, "hive_table", columns, properties);
         Assert.assertEquals(String.format("%s.%s", hiveDb, hiveTable), table.getHiveDbTable());
-        Assert.assertEquals(1, table.getHiveProperties().size());
+        // HiveProperties={hadoop.security.authentication=simple, hive.metastore.uris=thrift://127.0.0.1:9083}
+        Assert.assertEquals(2, table.getHiveProperties().size());
     }
 
     @Test(expected = DdlException.class)
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 98b1d6e182..fb3a87fc4a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -118,10 +118,9 @@ struct THdfsConf {
 struct THdfsParams {
     1: optional string fs_name
     2: optional string user
-    3: optional string kerb_principal
-    4: optional string kerb_ticket_cache_path
-    5: optional string token
-    6: optional list<THdfsConf> hdfs_conf
+    3: optional string hdfs_kerberos_principal
+    4: optional string hdfs_kerberos_keytab
+    5: optional list<THdfsConf> hdfs_conf
 }
 
 // One broker range information.

