Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/16 12:49:45 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Support kerberos in hive and hdfs file connector (#3840)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 055ad9d83 [Feature][Connector-V2] Support kerberos in hive and hdfs file connector (#3840)
055ad9d83 is described below

commit 055ad9d8369947f935fc743def6f8df52550b090
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Mon Jan 16 20:49:38 2023 +0800

    [Feature][Connector-V2] Support kerberos in hive and hdfs file connector (#3840)
    
    * [Improve][Connector-V2][HdfsFile] Support kerberos authentication
    
    * [Improve][Connector-V2][HdfsFile] Optimize code structure
---
 docs/en/connector-v2/sink/HdfsFile.md              | 18 ++++++++--
 docs/en/connector-v2/sink/Hive.md                  | 29 +++++++++++----
 docs/en/connector-v2/source/HdfsFile.md            | 15 ++++++++
 docs/en/connector-v2/source/Hive.md                | 16 ++++++---
 .../seatunnel/file/hdfs/sink/BaseHdfsFileSink.java |  6 ++++
 .../file/hdfs/source/BaseHdfsFileSource.java       |  6 ++++
 .../seatunnel/file/config/BaseSinkConfig.java      | 17 +++++++++
 .../seatunnel/file/config/BaseSourceConfig.java    | 18 ++++++++++
 .../seatunnel/file/config/HadoopConf.java          |  2 ++
 .../file/sink/writer/AbstractWriteStrategy.java    | 23 ++++++++++++
 .../file/source/reader/AbstractReadStrategy.java   | 26 ++++++++++++++
 .../seatunnel/hive/utils/HiveMetaStoreProxy.java   | 42 +++++++++++++++++++---
 12 files changed, 198 insertions(+), 20 deletions(-)

diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index eed6ff210..3790fe01b 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -50,8 +50,10 @@ By default, we use 2PC commit to ensure `exactly-once`
 | sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                                       |                                                           |
 | batch_size                       | int     | no       | 1000000                                    |                                                           |
+| kerberos_principal               | string  | no       | -                                          |                                                           |
+| kerberos_keytab_path             | string  | no       | -                                          |                                                           |
+| compress_codec                   | string  | no       | none                                       |                                                           |
 | common-options                   | object  | no       | -                                          |                                                           |
-| compressCodec                    | string  | no       | none                                       |                                                           |
 
 ### fs.defaultFS [string]
 
@@ -151,6 +153,14 @@ Only support `true` now.
 
 The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows into one file until the number of rows exceeds `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint is triggered.
 
+### kerberos_principal [string]
+
+The Kerberos principal used for authentication
+
+### kerberos_keytab_path [string]
+
+The path of the Kerberos keytab file
+
 ### compress_codec [string]
 
 Support lzo compression for text in file format. The file name ends with ".lzo.txt".
 
@@ -226,11 +236,13 @@ HdfsFile {
 - [BugFix] Fix filesystem get error ([3117](https://github.com/apache/incubator-seatunnel/pull/3117))
 - [BugFix] Solved the bug of can not parse '\t' as delimiter from config file ([3083](https://github.com/apache/incubator-seatunnel/pull/3083))
 
-### Next version
+### 2.3.0 2022-12-30
 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
   - When field from upstream is null it will throw NullPointerException
   - Sink columns mapping failed
   - When restore writer from states getting transaction directly failed
 
+### Next version
 - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
-- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
\ No newline at end of file
+- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
+- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
\ No newline at end of file
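
With the two new options documented above, a minimal HdfsFile sink configuration using Kerberos might look like the following sketch (the fs.defaultFS address, principal, and keytab path are placeholder values, not taken from the commit):

```hocon
HdfsFile {
    fs.defaultFS = "hdfs://namenode:8020"
    path = "/tmp/seatunnel/output"
    kerberos_principal = "seatunnel@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/keytabs/seatunnel.keytab"
}
```

Both options are optional, but per the connector code in this commit, setting `kerberos_principal` without a readable `kerberos_keytab_path` will fail at runtime.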
diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md
index 269cc1cef..d47621880 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -30,12 +30,15 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ## Options
 
-| name           | type   | required | default value |
-|----------------|--------|----------|---------------|
-| table_name     | string | yes      | -             |
-| metastore_uri  | string | yes      | -             |
-| compressCodec  | string | no       | none          |
-| common-options |        | no       | -             |
+| name                 | type   | required | default value |
+|----------------------|--------|----------|---------------|
+| table_name           | string | yes      | -             |
+| metastore_uri        | string | yes      | -             |
+| compress_codec       | string | no       | none          |
+| kerberos_principal   | string | no       | -             |
+| kerberos_keytab_path | string | no       | -             |
+| common-options       |        | no       | -             |
+
 ### table_name [string]
 
 Target Hive table name eg: db1.table1
@@ -44,6 +47,14 @@ Target Hive table name eg: db1.table1
 
 Hive metastore uri
 
+### kerberos_principal [string]
+
+The Kerberos principal used for authentication
+
+### kerberos_keytab_path [string]
+
+The path of the Kerberos keytab file
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
@@ -145,9 +156,13 @@ sink {
 ### 2.3.0-beta 2022-10-20
 - [Improve] Hive Sink supports automatic partition repair ([3133](https://github.com/apache/incubator-seatunnel/pull/3133))
 
-### Next version
+### 2.3.0 2022-12-30
 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
   - When field from upstream is null it will throw NullPointerException
   - Sink columns mapping failed
   - When restore writer from states getting transaction directly failed
 
+### Next version
+
+- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
+
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 869e9859e..240f89146 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -45,6 +45,8 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | date_format               | string  | no       | yyyy-MM-dd          |
 | datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
 | time_format               | string  | no       | HH:mm:ss            |
+| kerberos_principal        | string  | no       | -                   |
+| kerberos_keytab_path      | string  | no       | -                   |
 | skip_header_row_number    | long    | no       | 0                   |
 | schema                    | config  | no       | -                   |
 | common-options            |         | no       | -                   |
@@ -204,6 +206,14 @@ Hdfs cluster address.
 
 The path of `hdfs-site.xml`, used to load ha configuration of namenodes
 
+### kerberos_principal [string]
+
+The Kerberos principal used for authentication
+
+### kerberos_keytab_path [string]
+
+The path of the Kerberos keytab file
+
 ### schema [Config]
 
 #### fields [Config]
@@ -253,3 +263,8 @@ HdfsFile {
 - [BugFix] Fix the bug of incorrect path in windows environment ([2980](https://github.com/apache/incubator-seatunnel/pull/2980))
 - [Improve] Support extract partition from SeaTunnelRow fields ([3085](https://github.com/apache/incubator-seatunnel/pull/3085))
 - [Improve] Support parse field from file path ([2985](https://github.com/apache/incubator-seatunnel/pull/2985))
+
+### Next version
+
+- [Improve] Support skip header for csv and txt files ([3900](https://github.com/apache/incubator-seatunnel/pull/3900))
+- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md
index b12b4802a..69df9eb9f 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -33,11 +33,13 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 
 ## Options
 
-| name           | type   | required | default value |
-|----------------|--------|----------|---------------|
-| table_name     | string | yes      | -             |
-| metastore_uri  | string | yes      | -             |
-| common-options |        | no       | -             |
+| name                 | type   | required | default value |
+|----------------------|--------|----------|---------------|
+| table_name           | string | yes      | -             |
+| metastore_uri        | string | yes      | -             |
+| kerberos_principal   | string | no       | -             |
+| kerberos_keytab_path | string | no       | -             | 
+| common-options       |        | no       | -             |
 
 ### table_name [string]
 
@@ -67,3 +69,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 ### 2.2.0-beta 2022-09-26
 
 - Add Hive Source Connector
+
+### Next version
+
+- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
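
For reference, a minimal Hive source configuration with the new Kerberos options enabled might look like this sketch (the table name, metastore URI, principal, and keytab path are placeholders):

```hocon
Hive {
    table_name = "default.test_table"
    metastore_uri = "thrift://metastore-host:9083"
    kerberos_principal = "hive@EXAMPLE.COM"
    kerberos_keytab_path = "/etc/security/keytabs/hive.keytab"
}
```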
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
index 090f942c0..79649fb44 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
@@ -46,5 +46,11 @@ public abstract class BaseHdfsFileSink extends BaseFileSink {
         if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
             hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
         }
+        if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
+            hadoopConf.setKerberosPrincipal(pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
+        }
+        if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key())) {
+            hadoopConf.setKerberosKeytabPath(pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key()));
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 4e67cf9bb..be7fc0e8c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -55,6 +55,12 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
         if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) {
             hadoopConf.setHdfsSitePath(pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key()));
         }
+        if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) {
+            hadoopConf.setKerberosPrincipal(pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key()));
+        }
+        if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key())) {
+            hadoopConf.setKerberosKeytabPath(pluginConfig.getString(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key()));
+        }
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 9137576d5..5abffc13f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -41,26 +41,32 @@ public class BaseSinkConfig {
             .stringType()
             .noDefaultValue()
             .withDescription("Compression codec");
+
     public static final Option<DateUtils.Formatter> DATE_FORMAT = Options.key("date_format")
             .enumType(DateUtils.Formatter.class)
             .defaultValue(DateUtils.Formatter.YYYY_MM_DD)
             .withDescription("Date format");
+
     public static final Option<DateTimeUtils.Formatter> DATETIME_FORMAT = Options.key("datetime_format")
             .enumType(DateTimeUtils.Formatter.class)
             .defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
             .withDescription("Datetime format");
+
     public static final Option<TimeUtils.Formatter> TIME_FORMAT = Options.key("time_format")
             .enumType(TimeUtils.Formatter.class)
             .defaultValue(TimeUtils.Formatter.HH_MM_SS)
             .withDescription("Time format");
+
     public static final Option<String> FILE_PATH = Options.key("path")
             .stringType()
             .noDefaultValue()
             .withDescription("The file path of target files");
+
     public static final Option<String> FIELD_DELIMITER = Options.key("field_delimiter")
             .stringType()
             .defaultValue(DEFAULT_FIELD_DELIMITER)
             .withDescription("The separator between columns in a row of data. Only needed by `text` and `csv` file format");
+
     public static final Option<String> ROW_DELIMITER = Options.key("row_delimiter")
             .stringType()
             .defaultValue(DEFAULT_ROW_DELIMITER)
@@ -84,6 +90,7 @@ public class BaseSinkConfig {
                     "and the final file will be placed in the partition directory. " +
                     "Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. " +
                     "`k0` is the first partition field and `v0` is the value of the first partition field.");
+
     public static final Option<Boolean> IS_PARTITION_FIELD_WRITE_IN_FILE = Options.key("is_partition_field_write_in_file")
             .booleanType()
             .defaultValue(false)
@@ -136,4 +143,14 @@ public class BaseSinkConfig {
             .stringType()
             .noDefaultValue()
             .withDescription("The path of hdfs-site.xml");
+
+    public static final Option<String> KERBEROS_PRINCIPAL = Options.key("kerberos_principal")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Kerberos principal");
+
+    public static final Option<String> KERBEROS_KEYTAB_PATH = Options.key("kerberos_keytab_path")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Kerberos keytab file path");
 }
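
The new `KERBEROS_PRINCIPAL` and `KERBEROS_KEYTAB_PATH` options follow SeaTunnel's fluent Option builder chain (key, then type, then default, then description). A simplified, self-contained sketch of that pattern, using hypothetical stand-in classes rather than the real `org.apache.seatunnel.api.configuration` types, is:

```java
// Simplified sketch of a fluent option builder; Option and Options here are
// hypothetical stand-ins, not the real seatunnel-api classes.
public class OptionDemo {
    public static final class Option<T> {
        private final String key;
        private T defaultValue;
        private String description;

        private Option(String key) { this.key = key; }

        public Option<T> noDefaultValue() { this.defaultValue = null; return this; }
        public Option<T> defaultValue(T value) { this.defaultValue = value; return this; }
        public Option<T> withDescription(String d) { this.description = d; return this; }

        public String key() { return key; }
        public T defaultValue() { return defaultValue; }
        public String description() { return description; }
    }

    public static final class Options {
        private final String key;
        private Options(String key) { this.key = key; }
        public static Options key(String key) { return new Options(key); }
        public Option<String> stringType() { return new Option<>(key); }
        public Option<Long> longType() { return new Option<>(key); }
    }

    // Mirrors the shape of KERBEROS_PRINCIPAL from the diff above
    public static final Option<String> KERBEROS_PRINCIPAL = Options.key("kerberos_principal")
            .stringType()
            .noDefaultValue()
            .withDescription("Kerberos principal");

    public static void main(String[] args) {
        System.out.println(KERBEROS_PRINCIPAL.key());
        System.out.println(KERBEROS_PRINCIPAL.description());
    }
}
```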
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index d8e5bcce6..42ef21f2d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -28,34 +28,52 @@ public class BaseSourceConfig {
             .objectType(FileFormat.class)
             .noDefaultValue()
             .withDescription("File type");
+
     public static final Option<String> FILE_PATH = Options.key("path")
             .stringType()
             .noDefaultValue()
             .withDescription("The file path of source files");
+
     public static final Option<String> DELIMITER = Options.key("delimiter")
             .stringType()
             .defaultValue(String.valueOf('\001'))
             .withDescription("The separator between columns in a row of data. Only needed by `text` file format");
+
     public static final Option<DateUtils.Formatter> DATE_FORMAT = Options.key("date_format")
             .enumType(DateUtils.Formatter.class)
             .defaultValue(DateUtils.Formatter.YYYY_MM_DD)
             .withDescription("Date format");
+
     public static final Option<DateTimeUtils.Formatter> DATETIME_FORMAT = Options.key("datetime_format")
             .enumType(DateTimeUtils.Formatter.class)
             .defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
             .withDescription("Datetime format");
+
     public static final Option<TimeUtils.Formatter> TIME_FORMAT = Options.key("time_format")
             .enumType(TimeUtils.Formatter.class)
             .defaultValue(TimeUtils.Formatter.HH_MM_SS)
             .withDescription("Time format");
+
     public static final Option<Boolean> PARSE_PARTITION_FROM_PATH = Options.key("parse_partition_from_path")
             .booleanType()
             .defaultValue(true)
             .withDescription("Whether parse partition fields from file path");
+
     public static final Option<String> HDFS_SITE_PATH = Options.key("hdfs_site_path")
             .stringType()
             .noDefaultValue()
             .withDescription("The path of hdfs-site.xml");
+
+    public static final Option<String> KERBEROS_PRINCIPAL = Options.key("kerberos_principal")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Kerberos principal");
+
+    public static final Option<String> KERBEROS_KEYTAB_PATH = Options.key("kerberos_keytab_path")
+            .stringType()
+            .noDefaultValue()
+            .withDescription("Kerberos keytab file path");
+
     public static final Option<Long> SKIP_HEADER_ROW_NUMBER = Options.key("skip_header_row_number")
             .longType()
             .defaultValue(0L)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index baeb7a3ad..92b2db282 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -32,6 +32,8 @@ public class HadoopConf implements Serializable {
     protected Map<String, String> extraOptions = new HashMap<>();
     protected String hdfsNameKey;
     protected String hdfsSitePath;
+    protected String kerberosPrincipal;
+    protected String kerberosKeytabPath;
 
     public HadoopConf(String hdfsNameKey) {
         this.hdfsNameKey = hdfsNameKey;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 50c1f365e..41bd8ab08 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -43,6 +43,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -83,6 +84,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
     protected int partId = 0;
     protected int batchSize;
     protected int currentBatchSize = 0;
+    protected boolean isKerberosAuthorization = false;
 
     public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) {
         this.fileSinkConfig = fileSinkConfig;
@@ -129,6 +131,27 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
         configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
         configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
         this.hadoopConf.setExtraOptionsForConfiguration(configuration);
+        String principal = hadoopConf.getKerberosPrincipal();
+        String keytabPath = hadoopConf.getKerberosKeytabPath();
+        if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) {
+            // kerberos authentication and only once
+            if (StringUtils.isBlank(keytabPath)) {
+                throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
+                        "Kerberos keytab path is blank, please check this parameter in your config file");
+            }
+            configuration.set("hadoop.security.authentication", "kerberos");
+            UserGroupInformation.setConfiguration(configuration);
+            try {
+                log.info("Start Kerberos authentication using principal {} and keytab {}", principal, keytabPath);
+                UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
+                log.info("Kerberos authentication successful");
+            } catch (IOException e) {
+                String errorMsg = String.format("Kerberos authentication failed using this " +
+                                "principal [%s] and keytab path [%s]", principal, keytabPath);
+                throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e);
+            }
+            isKerberosAuthorization = true;
+        }
         return configuration;
     }
 
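The `getConfiguration()` changes in both AbstractWriteStrategy and AbstractReadStrategy share the same "login at most once" guard. A standalone sketch of that pattern, with a hypothetical `doLogin` in place of the Hadoop `UserGroupInformation.loginUserFromKeytab` call so the example runs without Hadoop on the classpath, is:

```java
// Sketch of the once-only Kerberos login guard used in getConfiguration() above.
public class KerberosLoginGuard {
    private boolean isKerberosAuthorization = false; // mirrors the flag added in the diff
    private int loginCount = 0;

    // Hypothetical stand-in for UserGroupInformation.loginUserFromKeytab
    private void doLogin(String principal, String keytabPath) {
        loginCount++;
    }

    /** Called on every getConfiguration(); performs the Kerberos login at most once. */
    public void configure(String principal, String keytabPath) {
        if (!isKerberosAuthorization && principal != null && !principal.trim().isEmpty()) {
            if (keytabPath == null || keytabPath.trim().isEmpty()) {
                throw new IllegalArgumentException("Kerberos keytab path is blank");
            }
            doLogin(principal, keytabPath);
            isKerberosAuthorization = true; // subsequent calls skip the login
        }
    }

    public int loginCount() { return loginCount; }

    public static void main(String[] args) {
        KerberosLoginGuard guard = new KerberosLoginGuard();
        guard.configure("seatunnel@EXAMPLE.COM", "/etc/security/keytabs/st.keytab");
        guard.configure("seatunnel@EXAMPLE.COM", "/etc/security/keytabs/st.keytab");
        System.out.println(guard.loginCount()); // the login runs only once
    }
}
```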
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 4c7de8a54..5c5479478 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -25,17 +25,21 @@ import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -68,6 +72,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
     protected List<String> fileNames = new ArrayList<>();
     protected boolean isMergePartition = true;
     protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
+    protected boolean isKerberosAuthorization = false;
 
     @Override
     public void init(HadoopConf conf) {
@@ -90,6 +95,27 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
         configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
         configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
         hadoopConf.setExtraOptionsForConfiguration(configuration);
+        String principal = hadoopConf.getKerberosPrincipal();
+        String keytabPath = hadoopConf.getKerberosKeytabPath();
+        if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) {
+            // kerberos authentication and only once
+            if (StringUtils.isBlank(keytabPath)) {
+                throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
+                        "Kerberos keytab path is blank, please check this parameter in your config file");
+            }
+            configuration.set("hadoop.security.authentication", "kerberos");
+            UserGroupInformation.setConfiguration(configuration);
+            try {
+                log.info("Start Kerberos authentication using principal {} and keytab {}", principal, keytabPath);
+                UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
+                log.info("Kerberos authentication successful");
+            } catch (IOException e) {
+                String errorMsg = String.format("Kerberos authentication failed using this " +
+                        "principal [%s] and keytab path [%s]", principal, keytabPath);
+                throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e);
+            }
+            isKerberosAuthorization = true;
+        }
         return configuration;
     }
 
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 0d8194488..e7a522591 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.hive.utils;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
@@ -24,26 +27,56 @@ import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorExc
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.thrift.TException;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Objects;
 
+@Slf4j
 public class HiveMetaStoreProxy {
     private final HiveMetaStoreClient hiveMetaStoreClient;
     private static volatile HiveMetaStoreProxy INSTANCE = null;
 
-    private HiveMetaStoreProxy(@NonNull String uris) {
+    private HiveMetaStoreProxy(Config config) {
+        String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key());
         HiveConf hiveConf = new HiveConf();
-        hiveConf.set("hive.metastore.uris", uris);
+        hiveConf.set("hive.metastore.uris", metastoreUri);
+        if (config.hasPath(BaseSourceConfig.KERBEROS_PRINCIPAL.key()) &&
+                config.hasPath(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key())) {
+            String principal = config.getString(BaseSourceConfig.KERBEROS_PRINCIPAL.key());
+            String keytabPath = config.getString(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key());
+            if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) {
+                String errorMsg = String.format("Kerberos principal [%s] or keytab file path [%s] is blank, " +
+                        "please check your config file", principal, keytabPath);
+                throw new HiveConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg);
+            }
+            Configuration configuration = new Configuration();
+            configuration.set("hadoop.security.authentication", "kerberos");
+            UserGroupInformation.setConfiguration(configuration);
+            try {
+                log.info("Start Kerberos authentication using principal {} and keytab {}", principal, keytabPath);
+                UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
+                log.info("Kerberos authentication successful");
+            } catch (IOException e) {
+                String errorMsg = String.format("Kerberos authentication failed using this " +
+                                "principal [%s] and keytab path [%s]", principal, keytabPath);
+                throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e);
+            }
+        }
         try {
             hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
         } catch (MetaException e) {
-            String errorMsg = String.format("Using this hive uris [%s] to initialize hive metastore client instance failed", uris);
+            String errorMsg = String.format("Using this hive uris [%s] to initialize " +
+                    "hive metastore client instance failed", metastoreUri);
             throw new HiveConnectorException(HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
         }
     }
@@ -52,8 +85,7 @@ public class HiveMetaStoreProxy {
         if (INSTANCE == null) {
             synchronized (HiveMetaStoreProxy.class) {
                 if (INSTANCE == null) {
-                    String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key());
-                    INSTANCE = new HiveMetaStoreProxy(metastoreUri);
+                    INSTANCE = new HiveMetaStoreProxy(config);
                 }
             }
         }
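
The `getInstance()` method above relies on double-checked locking with a `volatile` field. A self-contained sketch of that idiom, with a hypothetical `ProxySingleton` standing in for `HiveMetaStoreProxy`, is:

```java
// Sketch of the double-checked locking idiom used by HiveMetaStoreProxy.getInstance().
public class ProxySingleton {
    // volatile is what makes the unsynchronized first check safe to publish
    private static volatile ProxySingleton INSTANCE = null;

    private ProxySingleton() { }

    public static ProxySingleton getInstance() {
        if (INSTANCE == null) {                     // fast path: no lock once initialized
            synchronized (ProxySingleton.class) {
                if (INSTANCE == null) {             // re-check under the lock
                    INSTANCE = new ProxySingleton();
                }
            }
        }
        return INSTANCE;
    }

    public static void main(String[] args) {
        // Every caller observes the same instance
        System.out.println(ProxySingleton.getInstance() == ProxySingleton.getInstance());
    }
}
```

Without `volatile`, a second thread could observe a partially constructed instance through the unsynchronized first check; the Java Memory Model only makes this pattern safe when the field is declared `volatile`.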