You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2024/04/07 14:49:13 UTC
(seatunnel) branch dev updated: [Improve][Zeta][storage] update hdfs configuration, support more parameters (#6547)
This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 595cdd1ce9 [Improve][Zeta][storage] update hdfs configuration, support more parameters (#6547)
595cdd1ce9 is described below
commit 595cdd1ce9bb770b6117b3436646124a13b26752
Author: Jarvis <li...@163.com>
AuthorDate: Sun Apr 7 22:49:08 2024 +0800
[Improve][Zeta][storage] update hdfs configuration, support more parameters (#6547)
---
docs/en/seatunnel-engine/checkpoint-storage.md | 2 +
docs/en/seatunnel-engine/deployment.md | 2 +
.../storage/hdfs/common/HdfsConfiguration.java | 9 ++-
.../engine/imap/storage/file/IMapFileStorage.java | 8 ++-
.../storage/file/config/AbstractConfiguration.java | 8 +--
.../storage/file/config/HdfsConfiguration.java | 84 ++++++++++++++++++++--
.../imap/storage/file/config/OssConfiguration.java | 4 +-
.../imap/storage/file/config/S3Configuration.java | 6 +-
8 files changed, 107 insertions(+), 16 deletions(-)
diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md b/docs/en/seatunnel-engine/checkpoint-storage.md
index 332a3951bf..a04a84197d 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -144,6 +144,8 @@ seatunnel:
// if you use Kerberos, you can configure it like this:
kerberosPrincipal: your-kerberos-principal
kerberosKeytabFilePath: your-kerberos-keytab
+ // if you need to load settings from an hdfs-site.xml file, set its path like this:
+ hdfs_site_path: /path/to/your/hdfs_site_path
```
If HDFS is in HA mode, you can configure it like this:
diff --git a/docs/en/seatunnel-engine/deployment.md b/docs/en/seatunnel-engine/deployment.md
index 6a39f347d3..412add286a 100644
--- a/docs/en/seatunnel-engine/deployment.md
+++ b/docs/en/seatunnel-engine/deployment.md
@@ -197,6 +197,8 @@ map:
clusterName: seatunnel-cluster
storage.type: hdfs
fs.defaultFS: hdfs://localhost:9000
+ // if you need to load settings from an hdfs-site.xml file, set its path like this:
+ hdfs_site_path: /path/to/your/hdfs_site_path
```
If there is no HDFS and your cluster has only one node, you can configure it to use a local file like this:
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
index 953da3027b..2da4c6ad5f 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorag
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import java.io.IOException;
@@ -44,11 +45,13 @@ public class HdfsConfiguration extends AbstractConfiguration {
private static final String KERBEROS_KEY = "kerberos";
- /** ********* Hdfs constants ************* */
+ /** ******** Hdfs constants ************* */
private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
+ private static final String HDFS_SITE_PATH = "hdfs_site_path";
+
private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";
@Override
@@ -71,6 +74,9 @@ public class HdfsConfiguration extends AbstractConfiguration {
authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
}
}
+ if (config.containsKey(HDFS_SITE_PATH)) {
+ hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
+ }
// support other hdfs optional config keys
config.entrySet().stream()
.filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
@@ -80,6 +86,7 @@ public class HdfsConfiguration extends AbstractConfiguration {
String value = entry.getValue();
hadoopConf.set(key, value);
});
+
return hadoopConf;
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
index 915981e476..ab125ba5c8 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
@@ -49,6 +49,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_FILE_PATH_SPLIT;
import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_NAMESPACE;
@@ -125,8 +126,13 @@ public class IMapFileStorage implements IMapStorage {
this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase());
// build configuration
AbstractConfiguration fileConfiguration = this.fileConfiguration.getConfiguration();
+ Map<String, String> stringMap =
+ configuration.entrySet().stream()
+ .collect(
+ Collectors.toMap(
+ Map.Entry::getKey, entry -> entry.getValue().toString()));
- Configuration hadoopConf = fileConfiguration.buildConfiguration(configuration);
+ Configuration hadoopConf = fileConfiguration.buildConfiguration(stringMap);
this.conf = hadoopConf;
this.namespace = (String) configuration.getOrDefault(NAMESPACE_KEY, DEFAULT_IMAP_NAMESPACE);
this.businessName = (String) configuration.get(BUSINESS_KEY);
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java
index 8d59ac11b8..8198479ae8 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java
@@ -46,7 +46,7 @@ public abstract class AbstractConfiguration {
* @param config configuration
* @param keys keys
*/
- void checkConfiguration(Map<String, Object> config, String... keys) {
+ void checkConfiguration(Map<String, String> config, String... keys) {
for (String key : keys) {
if (!config.containsKey(key) || null == config.get(key)) {
throw new IllegalArgumentException(key + " is required");
@@ -54,7 +54,7 @@ public abstract class AbstractConfiguration {
}
}
- public abstract Configuration buildConfiguration(Map<String, Object> config)
+ public abstract Configuration buildConfiguration(Map<String, String> config)
throws IMapStorageException;
/**
@@ -65,11 +65,11 @@ public abstract class AbstractConfiguration {
* @param prefix
*/
void setExtraConfiguration(
- Configuration hadoopConf, Map<String, Object> config, String prefix) {
+ Configuration hadoopConf, Map<String, String> config, String prefix) {
config.forEach(
(k, v) -> {
if (config.containsKey(BLOCK_SIZE)) {
- setBlockSize(Long.parseLong(config.get(BLOCK_SIZE).toString()));
+ setBlockSize(Long.parseLong(config.get(BLOCK_SIZE)));
}
if (k.startsWith(prefix)) {
hadoopConf.set(k, String.valueOf(v));
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java
index 2f98dfa0b4..10592b3da8 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java
@@ -20,21 +20,95 @@
package org.apache.seatunnel.engine.imap.storage.file.config;
+import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import java.io.IOException;
import java.util.Map;
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
public class HdfsConfiguration extends AbstractConfiguration {
+ /** hdfs uri is required */
+ private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
+ /** hdfs kerberos principal (optional) */
+ private static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+
+ private static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
+ private static final String HADOOP_SECURITY_AUTHENTICATION_KEY =
+ "hadoop.security.authentication";
+
+ private static final String KERBEROS_KEY = "kerberos";
+
+ /** ******** Hdfs constants ************* */
+ private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
+
+ private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
+
+ private static final String HDFS_SITE_PATH = "hdfs_site_path";
+
+ private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";
+
@Override
- public Configuration buildConfiguration(Map<String, Object> config) {
+ public Configuration buildConfiguration(Map<String, String> config) {
Configuration hadoopConf = new Configuration();
- hadoopConf.set(
- FS_DEFAULT_NAME_KEY,
- String.valueOf(config.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT)));
+ if (config.containsKey(HDFS_DEF_FS_NAME)) {
+ hadoopConf.set(HDFS_DEF_FS_NAME, config.get(HDFS_DEF_FS_NAME));
+ }
+ hadoopConf.set(HDFS_IMPL_KEY, HDFS_IMPL);
+ hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(FS_DEFAULT_NAME_KEY));
+ if (config.containsKey(KERBEROS_PRINCIPAL)
+ && config.containsKey(KERBEROS_KEYTAB_FILE_PATH)) {
+ String kerberosPrincipal = config.get(KERBEROS_PRINCIPAL);
+ String kerberosKeytabFilePath = config.get(KERBEROS_KEYTAB_FILE_PATH);
+ if (StringUtils.isNotBlank(kerberosPrincipal)
+ && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
+ hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, KERBEROS_KEY);
+ authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
+ }
+ }
+ if (config.containsKey(HDFS_SITE_PATH)) {
+ hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
+ }
+ // support other hdfs optional config keys
+ config.entrySet().stream()
+ .filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
+ .forEach(
+ entry -> {
+ String key = entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, "");
+ String value = entry.getValue();
+ hadoopConf.set(key, value);
+ });
+
return hadoopConf;
}
+
+ /**
+ * Authenticate kerberos
+ *
+ * @param kerberosPrincipal kerberos principal
+ * @param kerberosKeytabFilePath kerberos keytab file path
+ * @param hdfsConf hdfs configuration
+ * @throws IMapStorageException authentication exception
+ */
+ private void authenticateKerberos(
+ String kerberosPrincipal, String kerberosKeytabFilePath, Configuration hdfsConf)
+ throws IMapStorageException {
+ UserGroupInformation.setConfiguration(hdfsConf);
+ try {
+ UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
+ } catch (IOException e) {
+ throw new IMapStorageException(
+ "Failed to login user from keytab : "
+ + kerberosKeytabFilePath
+ + " and kerberos principal : "
+ + kerberosPrincipal,
+ e);
+ }
+ }
}
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java
index d36aa34145..71f31063be 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java
@@ -36,11 +36,11 @@ public class OssConfiguration extends AbstractConfiguration {
private static final String OSS_KEY = "fs.oss.";
@Override
- public Configuration buildConfiguration(Map<String, Object> config)
+ public Configuration buildConfiguration(Map<String, String> config)
throws IMapStorageException {
checkConfiguration(config, OSS_BUCKET_KEY);
Configuration hadoopConf = new Configuration();
- hadoopConf.set(FS_DEFAULT_NAME_KEY, String.valueOf(config.get(OSS_BUCKET_KEY)));
+ hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(OSS_BUCKET_KEY));
hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL);
setExtraConfiguration(hadoopConf, config, OSS_KEY);
return hadoopConf;
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java
index 5d34f7814b..872120a5ba 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java
@@ -39,16 +39,16 @@ public class S3Configuration extends AbstractConfiguration {
private static final String FS_KEY = "fs.";
@Override
- public Configuration buildConfiguration(Map<String, Object> config)
+ public Configuration buildConfiguration(Map<String, String> config)
throws IMapStorageException {
checkConfiguration(config, S3_BUCKET_KEY);
String protocol = DEFAULT_PROTOCOL;
- if (config.get(S3_BUCKET_KEY).toString().startsWith(S3A_PROTOCOL)) {
+ if (config.get(S3_BUCKET_KEY).startsWith(S3A_PROTOCOL)) {
protocol = S3A_PROTOCOL;
}
String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL;
Configuration hadoopConf = new Configuration();
- hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY).toString());
+ hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY));
hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
setExtraConfiguration(hadoopConf, config, FS_KEY + protocol + SPLIT_CHAR);
return hadoopConf;