Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/16 12:49:45 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Support kerberos in hive and hdfs file connector (#3840)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 055ad9d83 [Feature][Connector-V2] Support kerberos in hive and hdfs file connector (#3840)
055ad9d83 is described below
commit 055ad9d8369947f935fc743def6f8df52550b090
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Mon Jan 16 20:49:38 2023 +0800
[Feature][Connector-V2] Support kerberos in hive and hdfs file connector (#3840)
* [Improve][Connector-V2][HdfsFile] Support kerberos authentication
* [Improve][Connector-V2][HdfsFile] Optimize code structure
---
docs/en/connector-v2/sink/HdfsFile.md | 18 ++++++++--
docs/en/connector-v2/sink/Hive.md | 29 +++++++++++----
docs/en/connector-v2/source/HdfsFile.md | 15 ++++++++
docs/en/connector-v2/source/Hive.md | 16 ++++++---
.../seatunnel/file/hdfs/sink/BaseHdfsFileSink.java | 6 ++++
.../file/hdfs/source/BaseHdfsFileSource.java | 6 ++++
.../seatunnel/file/config/BaseSinkConfig.java | 17 +++++++++
.../seatunnel/file/config/BaseSourceConfig.java | 18 ++++++++++
.../seatunnel/file/config/HadoopConf.java | 2 ++
.../file/sink/writer/AbstractWriteStrategy.java | 23 ++++++++++++
.../file/source/reader/AbstractReadStrategy.java | 26 ++++++++++++++
.../seatunnel/hive/utils/HiveMetaStoreProxy.java | 42 +++++++++++++++++++---
12 files changed, 198 insertions(+), 20 deletions(-)
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index eed6ff210..3790fe01b 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -50,8 +50,10 @@ By default, we use 2PC commit to ensure `exactly-once`
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
+| kerberos_principal | string | no | - | |
+| kerberos_keytab_path | string | no | - | |
+| compress_codec | string | no | none | |
| common-options | object | no | - | |
-| compressCodec | string | no | none | |
### fs.defaultFS [string]
@@ -151,6 +153,14 @@ Only support `true` now.
The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is determined jointly by `batch_size` and `checkpoint.interval`. If the value of `checkpoint.interval` is large enough, the sink writer will keep writing rows into one file until the number of rows in the file is larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.
+### kerberos_principal [string]
+
+The Kerberos principal used for authentication
+
+### kerberos_keytab_path [string]
+
+The path of the Kerberos keytab file
+
### compress_codec [string]
Support lzo compression for the text file format. The file name ends with ".lzo.txt".
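For reference, the two new Kerberos options slot into a sink block like this (a sketch only; the `fs.defaultFS`, `path`, principal, and keytab values are placeholders, and other required options are elided):

```hocon
HdfsFile {
  fs.defaultFS = "hdfs://hadoop-cluster"
  path = "/tmp/seatunnel/output"
  kerberos_principal = "seatunnel@EXAMPLE.COM"
  kerberos_keytab_path = "/etc/security/keytabs/seatunnel.keytab"
}
```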
@@ -226,11 +236,13 @@ HdfsFile {
- [BugFix] Fix filesystem get error ([3117](https://github.com/apache/incubator-seatunnel/pull/3117))
- [BugFix] Solved the bug of can not parse '\t' as delimiter from config file ([3083](https://github.com/apache/incubator-seatunnel/pull/3083))
-### Next version
+### 2.3.0 2022-12-30
- [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed
+### Next version
- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
-- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
\ No newline at end of file
+- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
+- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/Hive.md b/docs/en/connector-v2/sink/Hive.md
index 269cc1cef..d47621880 100644
--- a/docs/en/connector-v2/sink/Hive.md
+++ b/docs/en/connector-v2/sink/Hive.md
@@ -30,12 +30,15 @@ By default, we use 2PC commit to ensure `exactly-once`
## Options
-| name | type | required | default value |
-|----------------|--------|----------|---------------|
-| table_name | string | yes | - |
-| metastore_uri | string | yes | - |
-| compressCodec | string | no | none |
-| common-options | | no | - |
+| name | type | required | default value |
+|----------------------|--------|----------|---------------|
+| table_name | string | yes | - |
+| metastore_uri | string | yes | - |
+| compress_codec | string | no | none |
+| kerberos_principal | string | no | - |
+| kerberos_keytab_path | string | no | - |
+| common-options | | no | - |
+
### table_name [string]
Target Hive table name, e.g. db1.table1
@@ -44,6 +47,14 @@ Target Hive table name eg: db1.table1
Hive metastore uri
+### kerberos_principal [string]
+
+The Kerberos principal used for authentication
+
+### kerberos_keytab_path [string]
+
+The path of the Kerberos keytab file
+
### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
@@ -145,9 +156,13 @@ sink {
### 2.3.0-beta 2022-10-20
- [Improve] Hive Sink supports automatic partition repair ([3133](https://github.com/apache/incubator-seatunnel/pull/3133))
-### Next version
+### 2.3.0 2022-12-30
- [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed
+### Next version
+
+- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
+
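The new Hive sink options would be configured like this (a hedged sketch; the metastore uri, principal, and keytab values are placeholders):

```hocon
Hive {
  table_name = "db1.table1"
  metastore_uri = "thrift://127.0.0.1:9083"
  kerberos_principal = "hive@EXAMPLE.COM"
  kerberos_keytab_path = "/etc/security/keytabs/hive.keytab"
}
```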
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 869e9859e..240f89146 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -45,6 +45,8 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| date_format | string | no | yyyy-MM-dd |
| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
| time_format | string | no | HH:mm:ss |
+| kerberos_principal | string | no | - |
+| kerberos_keytab_path | string | no | - |
| skip_header_row_number | long | no | 0 |
| schema | config | no | - |
| common-options | | no | - |
@@ -204,6 +206,14 @@ Hdfs cluster address.
The path of `hdfs-site.xml`, used to load ha configuration of namenodes
+### kerberos_principal [string]
+
+The Kerberos principal used for authentication
+
+### kerberos_keytab_path [string]
+
+The path of the Kerberos keytab file
+
### schema [Config]
#### fields [Config]
@@ -253,3 +263,8 @@ HdfsFile {
- [BugFix] Fix the bug of incorrect path in windows environment ([2980](https://github.com/apache/incubator-seatunnel/pull/2980))
- [Improve] Support extract partition from SeaTunnelRow fields ([3085](https://github.com/apache/incubator-seatunnel/pull/3085))
- [Improve] Support parse field from file path ([2985](https://github.com/apache/incubator-seatunnel/pull/2985))
+
### Next version
+
+- [Improve] Support skip header for csv and txt files ([3900](https://github.com/apache/incubator-seatunnel/pull/3900))
+- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md
index b12b4802a..69df9eb9f 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -33,11 +33,13 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
## Options
-| name | type | required | default value |
-|----------------|--------|----------|---------------|
-| table_name | string | yes | - |
-| metastore_uri | string | yes | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|----------------------|--------|----------|---------------|
+| table_name | string | yes | - |
+| metastore_uri | string | yes | - |
+| kerberos_principal | string | no | - |
+| kerberos_keytab_path | string | no | - |
+| common-options | | no | - |
### table_name [string]
@@ -67,3 +69,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
### 2.2.0-beta 2022-09-26
- Add Hive Source Connector
+
+### Next version
+
+- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
index 090f942c0..79649fb44 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
@@ -46,5 +46,11 @@ public abstract class BaseHdfsFileSink extends BaseFileSink {
if (pluginConfig.hasPath(BaseSinkConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(pluginConfig.getString(BaseSinkConfig.HDFS_SITE_PATH.key()));
}
+ if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_PRINCIPAL.key())) {
+ hadoopConf.setKerberosPrincipal(pluginConfig.getString(BaseSinkConfig.KERBEROS_PRINCIPAL.key()));
+ }
+ if (pluginConfig.hasPath(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key())) {
+ hadoopConf.setKerberosKeytabPath(pluginConfig.getString(BaseSinkConfig.KERBEROS_KEYTAB_PATH.key()));
+ }
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 4e67cf9bb..be7fc0e8c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -55,6 +55,12 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
if (pluginConfig.hasPath(HdfsSourceConfig.HDFS_SITE_PATH.key())) {
hadoopConf.setHdfsSitePath(pluginConfig.getString(HdfsSourceConfig.HDFS_SITE_PATH.key()));
}
+ if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_PRINCIPAL.key())) {
+ hadoopConf.setKerberosPrincipal(pluginConfig.getString(HdfsSourceConfig.KERBEROS_PRINCIPAL.key()));
+ }
+ if (pluginConfig.hasPath(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key())) {
+ hadoopConf.setKerberosKeytabPath(pluginConfig.getString(HdfsSourceConfig.KERBEROS_KEYTAB_PATH.key()));
+ }
try {
filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
} catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 9137576d5..5abffc13f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -41,26 +41,32 @@ public class BaseSinkConfig {
.stringType()
.noDefaultValue()
.withDescription("Compression codec");
+
public static final Option<DateUtils.Formatter> DATE_FORMAT = Options.key("date_format")
.enumType(DateUtils.Formatter.class)
.defaultValue(DateUtils.Formatter.YYYY_MM_DD)
.withDescription("Date format");
+
public static final Option<DateTimeUtils.Formatter> DATETIME_FORMAT = Options.key("datetime_format")
.enumType(DateTimeUtils.Formatter.class)
.defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
.withDescription("Datetime format");
+
public static final Option<TimeUtils.Formatter> TIME_FORMAT = Options.key("time_format")
.enumType(TimeUtils.Formatter.class)
.defaultValue(TimeUtils.Formatter.HH_MM_SS)
.withDescription("Time format");
+
public static final Option<String> FILE_PATH = Options.key("path")
.stringType()
.noDefaultValue()
.withDescription("The file path of target files");
+
public static final Option<String> FIELD_DELIMITER = Options.key("field_delimiter")
.stringType()
.defaultValue(DEFAULT_FIELD_DELIMITER)
.withDescription("The separator between columns in a row of data. Only needed by `text` and `csv` file format");
+
public static final Option<String> ROW_DELIMITER = Options.key("row_delimiter")
.stringType()
.defaultValue(DEFAULT_ROW_DELIMITER)
@@ -84,6 +90,7 @@ public class BaseSinkConfig {
"and the final file will be placed in the partition directory. " +
"Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. " +
"`k0` is the first partition field and `v0` is the value of the first partition field.");
+
public static final Option<Boolean> IS_PARTITION_FIELD_WRITE_IN_FILE = Options.key("is_partition_field_write_in_file")
.booleanType()
.defaultValue(false)
@@ -136,4 +143,14 @@ public class BaseSinkConfig {
.stringType()
.noDefaultValue()
.withDescription("The path of hdfs-site.xml");
+
+ public static final Option<String> KERBEROS_PRINCIPAL = Options.key("kerberos_principal")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Kerberos principal");
+
+ public static final Option<String> KERBEROS_KEYTAB_PATH = Options.key("kerberos_keytab_path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Kerberos keytab file path");
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index d8e5bcce6..42ef21f2d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -28,34 +28,52 @@ public class BaseSourceConfig {
.objectType(FileFormat.class)
.noDefaultValue()
.withDescription("File type");
+
public static final Option<String> FILE_PATH = Options.key("path")
.stringType()
.noDefaultValue()
.withDescription("The file path of source files");
+
public static final Option<String> DELIMITER = Options.key("delimiter")
.stringType()
.defaultValue(String.valueOf('\001'))
.withDescription("The separator between columns in a row of data. Only needed by `text` file format");
+
public static final Option<DateUtils.Formatter> DATE_FORMAT = Options.key("date_format")
.enumType(DateUtils.Formatter.class)
.defaultValue(DateUtils.Formatter.YYYY_MM_DD)
.withDescription("Date format");
+
public static final Option<DateTimeUtils.Formatter> DATETIME_FORMAT = Options.key("datetime_format")
.enumType(DateTimeUtils.Formatter.class)
.defaultValue(DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)
.withDescription("Datetime format");
+
public static final Option<TimeUtils.Formatter> TIME_FORMAT = Options.key("time_format")
.enumType(TimeUtils.Formatter.class)
.defaultValue(TimeUtils.Formatter.HH_MM_SS)
.withDescription("Time format");
+
public static final Option<Boolean> PARSE_PARTITION_FROM_PATH = Options.key("parse_partition_from_path")
.booleanType()
.defaultValue(true)
.withDescription("Whether parse partition fields from file path");
+
public static final Option<String> HDFS_SITE_PATH = Options.key("hdfs_site_path")
.stringType()
.noDefaultValue()
.withDescription("The path of hdfs-site.xml");
+
+ public static final Option<String> KERBEROS_PRINCIPAL = Options.key("kerberos_principal")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Kerberos principal");
+
+ public static final Option<String> KERBEROS_KEYTAB_PATH = Options.key("kerberos_keytab_path")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("Kerberos keytab file path");
+
public static final Option<Long> SKIP_HEADER_ROW_NUMBER = Options.key("skip_header_row_number")
.longType()
.defaultValue(0L)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
index baeb7a3ad..92b2db282 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/HadoopConf.java
@@ -32,6 +32,8 @@ public class HadoopConf implements Serializable {
protected Map<String, String> extraOptions = new HashMap<>();
protected String hdfsNameKey;
protected String hdfsSitePath;
+ protected String kerberosPrincipal;
+ protected String kerberosKeytabPath;
public HadoopConf(String hdfsNameKey) {
this.hdfsNameKey = hdfsNameKey;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 50c1f365e..41bd8ab08 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -43,6 +43,7 @@ import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +84,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
protected int partId = 0;
protected int batchSize;
protected int currentBatchSize = 0;
+ protected boolean isKerberosAuthorization = false;
public AbstractWriteStrategy(FileSinkConfig fileSinkConfig) {
this.fileSinkConfig = fileSinkConfig;
@@ -129,6 +131,27 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
this.hadoopConf.setExtraOptionsForConfiguration(configuration);
+ String principal = hadoopConf.getKerberosPrincipal();
+ String keytabPath = hadoopConf.getKerberosKeytabPath();
+ if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) {
+ // perform kerberos authentication only once
+ if (StringUtils.isBlank(keytabPath)) {
+ throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
+ "Kerberos keytab path is blank, please check this parameter in your config file");
+ }
+ configuration.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(configuration);
+ try {
+ log.info("Start Kerberos authentication using principal {} and keytab {}", principal, keytabPath);
+ UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
+ log.info("Kerberos authentication successful");
+ } catch (IOException e) {
+ String errorMsg = String.format("Kerberos authentication failed using " +
+ "principal [%s] and keytab path [%s]", principal, keytabPath);
+ throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e);
+ }
+ isKerberosAuthorization = true;
+ }
return configuration;
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 4c7de8a54..5c5479478 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -25,17 +25,21 @@ import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
import java.math.BigDecimal;
@@ -68,6 +72,7 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
protected List<String> fileNames = new ArrayList<>();
protected boolean isMergePartition = true;
protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
+ protected boolean isKerberosAuthorization = false;
@Override
public void init(HadoopConf conf) {
@@ -90,6 +95,27 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
configuration.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, hadoopConf.getHdfsNameKey());
configuration.set(String.format("fs.%s.impl", hadoopConf.getSchema()), hadoopConf.getFsHdfsImpl());
hadoopConf.setExtraOptionsForConfiguration(configuration);
+ String principal = hadoopConf.getKerberosPrincipal();
+ String keytabPath = hadoopConf.getKerberosKeytabPath();
+ if (!isKerberosAuthorization && StringUtils.isNotBlank(principal)) {
+ // perform kerberos authentication only once
+ if (StringUtils.isBlank(keytabPath)) {
+ throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED,
+ "Kerberos keytab path is blank, please check this parameter in your config file");
+ }
+ configuration.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(configuration);
+ try {
+ log.info("Start Kerberos authentication using principal {} and keytab {}", principal, keytabPath);
+ UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
+ log.info("Kerberos authentication successful");
+ } catch (IOException e) {
+ String errorMsg = String.format("Kerberos authentication failed using " +
+ "principal [%s] and keytab path [%s]", principal, keytabPath);
+ throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e);
+ }
+ isKerberosAuthorization = true;
+ }
return configuration;
}
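Both strategies share the same guard: fail fast on a blank keytab, then log in at most once per instance. That guard can be sketched in isolation; here `doLogin` is a hypothetical stand-in for `UserGroupInformation.loginUserFromKeytab`, so the sketch runs without a KDC:

```java
// Sketch of the login-once Kerberos guard used by AbstractWriteStrategy/AbstractReadStrategy.
// doLogin() is a hypothetical stand-in for UserGroupInformation.loginUserFromKeytab.
class KerberosLoginGuard {
    private boolean isKerberosAuthorization = false;

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    /** Returns true only when this call actually performed a login. */
    boolean loginIfNeeded(String principal, String keytabPath) {
        if (isKerberosAuthorization || isBlank(principal)) {
            return false; // already logged in, or Kerberos not configured
        }
        if (isBlank(keytabPath)) {
            throw new IllegalArgumentException(
                    "Kerberos keytab path is blank, please check this parameter in your config file");
        }
        doLogin(principal, keytabPath);
        isKerberosAuthorization = true; // never log in twice from the same strategy instance
        return true;
    }

    void doLogin(String principal, String keytabPath) {
        // Real code: UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
    }
}
```

The flag matters because the strategies rebuild the Hadoop `Configuration` per read/write, while the keytab login only needs to happen once per process-level strategy instance.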
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
index 0d8194488..e7a522591 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java
@@ -17,6 +17,9 @@
package org.apache.seatunnel.connectors.seatunnel.hive.utils;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.hive.config.HiveConfig;
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorException;
@@ -24,26 +27,56 @@ import org.apache.seatunnel.connectors.seatunnel.hive.exception.HiveConnectorExc
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
+import java.io.IOException;
import java.util.List;
import java.util.Objects;
+@Slf4j
public class HiveMetaStoreProxy {
private final HiveMetaStoreClient hiveMetaStoreClient;
private static volatile HiveMetaStoreProxy INSTANCE = null;
- private HiveMetaStoreProxy(@NonNull String uris) {
+ private HiveMetaStoreProxy(Config config) {
+ String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key());
HiveConf hiveConf = new HiveConf();
- hiveConf.set("hive.metastore.uris", uris);
+ hiveConf.set("hive.metastore.uris", metastoreUri);
+ if (config.hasPath(BaseSourceConfig.KERBEROS_PRINCIPAL.key()) &&
+ config.hasPath(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key())) {
+ String principal = config.getString(BaseSourceConfig.KERBEROS_PRINCIPAL.key());
+ String keytabPath = config.getString(BaseSourceConfig.KERBEROS_KEYTAB_PATH.key());
+ if (StringUtils.isBlank(principal) || StringUtils.isBlank(keytabPath)) {
+ String errorMsg = String.format("Kerberos principal [%s] or keytab file path [%s] is blank, " +
+ "please check", principal, keytabPath);
+ throw new HiveConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg);
+ }
+ Configuration configuration = new Configuration();
+ configuration.set("hadoop.security.authentication", "kerberos");
+ UserGroupInformation.setConfiguration(configuration);
+ try {
+ log.info("Start Kerberos authentication using principal {} and keytab {}", principal, keytabPath);
+ UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
+ log.info("Kerberos authentication successful");
+ } catch (IOException e) {
+ String errorMsg = String.format("Kerberos authentication failed using " +
+ "principal [%s] and keytab path [%s]", principal, keytabPath);
+ throw new FileConnectorException(CommonErrorCode.KERBEROS_AUTHORIZED_FAILED, errorMsg, e);
+ }
+ }
try {
hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
} catch (MetaException e) {
- String errorMsg = String.format("Using this hive uris [%s] to initialize hive metastore client instance failed", uris);
+ String errorMsg = String.format("Failed to initialize hive metastore client " +
+ "instance using uris [%s]", metastoreUri);
throw new HiveConnectorException(HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
}
}
@@ -52,8 +85,7 @@ public class HiveMetaStoreProxy {
if (INSTANCE == null) {
synchronized (HiveMetaStoreProxy.class) {
if (INSTANCE == null) {
- String metastoreUri = config.getString(HiveConfig.METASTORE_URI.key());
- INSTANCE = new HiveMetaStoreProxy(metastoreUri);
+ INSTANCE = new HiveMetaStoreProxy(config);
}
}
}
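`getInstance` keeps the original double-checked locking and now hands the whole `Config` to the constructor. A minimal, hedged sketch of that pattern (the class name is a placeholder and the metastore client construction is elided):

```java
// Minimal sketch of the double-checked locking used by HiveMetaStoreProxy.getInstance.
// volatile guarantees other threads see a fully constructed instance, not a partial one.
class MetaStoreProxySketch {
    private static volatile MetaStoreProxySketch INSTANCE = null;

    private MetaStoreProxySketch() {
        // Real code: read metastore_uri / kerberos options from Config, log in, build client.
    }

    static MetaStoreProxySketch getInstance() {
        if (INSTANCE == null) {                     // fast path: no lock once initialized
            synchronized (MetaStoreProxySketch.class) {
                if (INSTANCE == null) {             // re-check under the lock
                    INSTANCE = new MetaStoreProxySketch();
                }
            }
        }
        return INSTANCE;
    }
}
```

One consequence worth noting: because the singleton is built once from the first `Config` passed in, a later call with a different config reuses the already-built client.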