You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by wa...@apache.org on 2024/04/07 14:49:13 UTC

(seatunnel) branch dev updated: [Improve][Zeta][storage] update hdfs configuration, support more parameters (#6547)

This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 595cdd1ce9 [Improve][Zeta][storage] update hdfs configuration, support more parameters (#6547)
595cdd1ce9 is described below

commit 595cdd1ce9bb770b6117b3436646124a13b26752
Author: Jarvis <li...@163.com>
AuthorDate: Sun Apr 7 22:49:08 2024 +0800

    [Improve][Zeta][storage] update hdfs configuration, support more parameters (#6547)
---
 docs/en/seatunnel-engine/checkpoint-storage.md     |  2 +
 docs/en/seatunnel-engine/deployment.md             |  2 +
 .../storage/hdfs/common/HdfsConfiguration.java     |  9 ++-
 .../engine/imap/storage/file/IMapFileStorage.java  |  8 ++-
 .../storage/file/config/AbstractConfiguration.java |  8 +--
 .../storage/file/config/HdfsConfiguration.java     | 84 ++++++++++++++++++++--
 .../imap/storage/file/config/OssConfiguration.java |  4 +-
 .../imap/storage/file/config/S3Configuration.java  |  6 +-
 8 files changed, 107 insertions(+), 16 deletions(-)

diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md b/docs/en/seatunnel-engine/checkpoint-storage.md
index 332a3951bf..a04a84197d 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -144,6 +144,8 @@ seatunnel:
           // if you used kerberos, you can config like this:
           kerberosPrincipal: your-kerberos-principal
           kerberosKeytabFilePath: your-kerberos-keytab
+          // if you need to load settings from an hdfs-site.xml file, configure its path like this:
+          hdfs_site_path: /path/to/your/hdfs_site_path
 ```
 
 if HDFS is in HA mode , you can config like this:
diff --git a/docs/en/seatunnel-engine/deployment.md b/docs/en/seatunnel-engine/deployment.md
index 6a39f347d3..412add286a 100644
--- a/docs/en/seatunnel-engine/deployment.md
+++ b/docs/en/seatunnel-engine/deployment.md
@@ -197,6 +197,8 @@ map:
            clusterName: seatunnel-cluster
            storage.type: hdfs
            fs.defaultFS: hdfs://localhost:9000
+           // if you need to load settings from an hdfs-site.xml file, configure its path like this:
+           hdfs_site_path: /path/to/your/hdfs_site_path
 ```
 
 If there is no HDFS and your cluster only have one node, you can config to use local file like this:
diff --git a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
index 953da3027b..2da4c6ad5f 100644
--- a/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/checkpoint-storage-plugins/checkpoint-storage-hdfs/src/main/java/org/apache/seatunnel/engine/checkpoint/storage/hdfs/common/HdfsConfiguration.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorag
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
 
 import java.io.IOException;
@@ -44,11 +45,13 @@ public class HdfsConfiguration extends AbstractConfiguration {
 
     private static final String KERBEROS_KEY = "kerberos";
 
-    /** ********* Hdfs constants ************* */
+    /** ******** Hdfs constants ************* */
     private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
 
     private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
 
+    private static final String HDFS_SITE_PATH = "hdfs_site_path";
+
     private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";
 
     @Override
@@ -71,6 +74,9 @@ public class HdfsConfiguration extends AbstractConfiguration {
                 authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
             }
         }
+        if (config.containsKey(HDFS_SITE_PATH)) {
+            hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
+        }
         //  support other hdfs optional config keys
         config.entrySet().stream()
                 .filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
@@ -80,6 +86,7 @@ public class HdfsConfiguration extends AbstractConfiguration {
                             String value = entry.getValue();
                             hadoopConf.set(key, value);
                         });
+
         return hadoopConf;
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
index 915981e476..ab125ba5c8 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
@@ -49,6 +49,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_FILE_PATH_SPLIT;
 import static org.apache.seatunnel.engine.imap.storage.file.common.FileConstants.DEFAULT_IMAP_NAMESPACE;
@@ -125,8 +126,13 @@ public class IMapFileStorage implements IMapStorage {
         this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase());
         // build configuration
         AbstractConfiguration fileConfiguration = this.fileConfiguration.getConfiguration();
+        Map<String, String> stringMap =
+                configuration.entrySet().stream()
+                        .collect(
+                                Collectors.toMap(
+                                        Map.Entry::getKey, entry -> entry.getValue().toString()));
 
-        Configuration hadoopConf = fileConfiguration.buildConfiguration(configuration);
+        Configuration hadoopConf = fileConfiguration.buildConfiguration(stringMap);
         this.conf = hadoopConf;
         this.namespace = (String) configuration.getOrDefault(NAMESPACE_KEY, DEFAULT_IMAP_NAMESPACE);
         this.businessName = (String) configuration.get(BUSINESS_KEY);
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java
index 8d59ac11b8..8198479ae8 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/AbstractConfiguration.java
@@ -46,7 +46,7 @@ public abstract class AbstractConfiguration {
      * @param config configuration
      * @param keys keys
      */
-    void checkConfiguration(Map<String, Object> config, String... keys) {
+    void checkConfiguration(Map<String, String> config, String... keys) {
         for (String key : keys) {
             if (!config.containsKey(key) || null == config.get(key)) {
                 throw new IllegalArgumentException(key + " is required");
@@ -54,7 +54,7 @@ public abstract class AbstractConfiguration {
         }
     }
 
-    public abstract Configuration buildConfiguration(Map<String, Object> config)
+    public abstract Configuration buildConfiguration(Map<String, String> config)
             throws IMapStorageException;
 
     /**
@@ -65,11 +65,11 @@ public abstract class AbstractConfiguration {
      * @param prefix
      */
     void setExtraConfiguration(
-            Configuration hadoopConf, Map<String, Object> config, String prefix) {
+            Configuration hadoopConf, Map<String, String> config, String prefix) {
         config.forEach(
                 (k, v) -> {
                     if (config.containsKey(BLOCK_SIZE)) {
-                        setBlockSize(Long.parseLong(config.get(BLOCK_SIZE).toString()));
+                        setBlockSize(Long.parseLong(config.get(BLOCK_SIZE)));
                     }
                     if (k.startsWith(prefix)) {
                         hadoopConf.set(k, String.valueOf(v));
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java
index 2f98dfa0b4..10592b3da8 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/HdfsConfiguration.java
@@ -20,21 +20,95 @@
 
 package org.apache.seatunnel.engine.imap.storage.file.config;
 
+import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 
+import java.io.IOException;
 import java.util.Map;
 
-import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
 import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
 
 public class HdfsConfiguration extends AbstractConfiguration {
 
+    /** hdfs uri is required */
+    private static final String HDFS_DEF_FS_NAME = "fs.defaultFS";
+    /** hdfs kerberos principal (optional) */
+    private static final String KERBEROS_PRINCIPAL = "kerberosPrincipal";
+
+    private static final String KERBEROS_KEYTAB_FILE_PATH = "kerberosKeytabFilePath";
+    private static final String HADOOP_SECURITY_AUTHENTICATION_KEY =
+            "hadoop.security.authentication";
+
+    private static final String KERBEROS_KEY = "kerberos";
+
+    /** ******** Hdfs constants ************* */
+    private static final String HDFS_IMPL = "org.apache.hadoop.hdfs.DistributedFileSystem";
+
+    private static final String HDFS_IMPL_KEY = "fs.hdfs.impl";
+
+    private static final String HDFS_SITE_PATH = "hdfs_site_path";
+
+    private static final String SEATUNNEL_HADOOP_PREFIX = "seatunnel.hadoop.";
+
     @Override
-    public Configuration buildConfiguration(Map<String, Object> config) {
+    public Configuration buildConfiguration(Map<String, String> config) {
         Configuration hadoopConf = new Configuration();
-        hadoopConf.set(
-                FS_DEFAULT_NAME_KEY,
-                String.valueOf(config.getOrDefault(FS_DEFAULT_NAME_KEY, FS_DEFAULT_NAME_DEFAULT)));
+        if (config.containsKey(HDFS_DEF_FS_NAME)) {
+            hadoopConf.set(HDFS_DEF_FS_NAME, config.get(HDFS_DEF_FS_NAME));
+        }
+        hadoopConf.set(HDFS_IMPL_KEY, HDFS_IMPL);
+        hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(FS_DEFAULT_NAME_KEY));
+        if (config.containsKey(KERBEROS_PRINCIPAL)
+                && config.containsKey(KERBEROS_KEYTAB_FILE_PATH)) {
+            String kerberosPrincipal = config.get(KERBEROS_PRINCIPAL);
+            String kerberosKeytabFilePath = config.get(KERBEROS_KEYTAB_FILE_PATH);
+            if (StringUtils.isNotBlank(kerberosPrincipal)
+                    && StringUtils.isNotBlank(kerberosKeytabFilePath)) {
+                hadoopConf.set(HADOOP_SECURITY_AUTHENTICATION_KEY, KERBEROS_KEY);
+                authenticateKerberos(kerberosPrincipal, kerberosKeytabFilePath, hadoopConf);
+            }
+        }
+        if (config.containsKey(HDFS_SITE_PATH)) {
+            hadoopConf.addResource(new Path(config.get(HDFS_SITE_PATH)));
+        }
+        // Apply any other optional settings: keys starting with "seatunnel.hadoop." are set with that prefix removed
+        config.entrySet().stream()
+                .filter(entry -> entry.getKey().startsWith(SEATUNNEL_HADOOP_PREFIX))
+                .forEach(
+                        entry -> {
+                            String key = entry.getKey().replace(SEATUNNEL_HADOOP_PREFIX, "");
+                            String value = entry.getValue();
+                            hadoopConf.set(key, value);
+                        });
+
         return hadoopConf;
     }
+
+    /**
+     * Authenticate kerberos
+     *
+     * @param kerberosPrincipal kerberos principal
+     * @param kerberosKeytabFilePath kerberos keytab file path
+     * @param hdfsConf hdfs configuration
+     * @throws IMapStorageException authentication exception
+     */
+    private void authenticateKerberos(
+            String kerberosPrincipal, String kerberosKeytabFilePath, Configuration hdfsConf)
+            throws IMapStorageException {
+        UserGroupInformation.setConfiguration(hdfsConf);
+        try {
+            UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, kerberosKeytabFilePath);
+        } catch (IOException e) {
+            throw new IMapStorageException(
+                    "Failed to login user from keytab : "
+                            + kerberosKeytabFilePath
+                            + " and kerberos principal : "
+                            + kerberosPrincipal,
+                    e);
+        }
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java
index d36aa34145..71f31063be 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/OssConfiguration.java
@@ -36,11 +36,11 @@ public class OssConfiguration extends AbstractConfiguration {
     private static final String OSS_KEY = "fs.oss.";
 
     @Override
-    public Configuration buildConfiguration(Map<String, Object> config)
+    public Configuration buildConfiguration(Map<String, String> config)
             throws IMapStorageException {
         checkConfiguration(config, OSS_BUCKET_KEY);
         Configuration hadoopConf = new Configuration();
-        hadoopConf.set(FS_DEFAULT_NAME_KEY, String.valueOf(config.get(OSS_BUCKET_KEY)));
+        hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(OSS_BUCKET_KEY));
         hadoopConf.set(OSS_IMPL_KEY, HDFS_OSS_IMPL);
         setExtraConfiguration(hadoopConf, config, OSS_KEY);
         return hadoopConf;
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java
index 5d34f7814b..872120a5ba 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/config/S3Configuration.java
@@ -39,16 +39,16 @@ public class S3Configuration extends AbstractConfiguration {
     private static final String FS_KEY = "fs.";
 
     @Override
-    public Configuration buildConfiguration(Map<String, Object> config)
+    public Configuration buildConfiguration(Map<String, String> config)
             throws IMapStorageException {
         checkConfiguration(config, S3_BUCKET_KEY);
         String protocol = DEFAULT_PROTOCOL;
-        if (config.get(S3_BUCKET_KEY).toString().startsWith(S3A_PROTOCOL)) {
+        if (config.get(S3_BUCKET_KEY).startsWith(S3A_PROTOCOL)) {
             protocol = S3A_PROTOCOL;
         }
         String fsImpl = protocol.equals(S3A_PROTOCOL) ? HDFS_S3A_IMPL : HDFS_S3N_IMPL;
         Configuration hadoopConf = new Configuration();
-        hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY).toString());
+        hadoopConf.set(FS_DEFAULT_NAME_KEY, config.get(S3_BUCKET_KEY));
         hadoopConf.set(formatKey(protocol, HDFS_IMPL_KEY), fsImpl);
         setExtraConfiguration(hadoopConf, config, FS_KEY + protocol + SPLIT_CHAR);
         return hadoopConf;