You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/03/09 10:34:48 UTC

[incubator-seatunnel] branch dev updated: [Feature][Core] Support config encryption (#4134)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 859a4bc20 [Feature][Core] Support config encryption (#4134)
859a4bc20 is described below

commit 859a4bc20ec01736e3154c5d898870a8bb0579e4
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Thu Mar 9 18:34:42 2023 +0800

    [Feature][Core] Support config encryption (#4134)
    
    * [Feature][Core] Support config shade options
    
    * [Improve][Core] Support config shade
    
    * [Feature][Core] Fix unit test in windows
    
    * [Improve][Core] Fix unit text
    
    * [Feature][Core] Support user-defined encrypt options
    
    * [Feature][Core] Add logic in ConfigBuilder
    
    * [Feature][Core] Add e2e test case
    
    * [Feature][Core] Fix code style
    
    * [Feature][Core] Fix ci
    
    * [Feature][Core] Fix code style
    
    * [Feature][Core] Fix integration test
    
    * [Feature][Core] Fix integration test
    
    * [Feature][Core] First commit docs
    
    * [Feature][Core] Update code
    
    * [Feature][Core] Add command for support config encryption and decryption
    
    * [Feature][Core] Fix docs
    
    * [Feature][Core] Optimize docs
    
    * [Feature][Core] Fix dead link
    
    * [Feature][Core] Fix dependencies
---
 docs/en/concept/JobEnvConfig.md                    |   6 +
 .../connector-v2/Config-Encryption-Decryption.md   | 180 ++++++++++++++++++
 docs/sidebars.js                                   |   1 +
 pom.xml                                            |   3 +-
 .../seatunnel/api/configuration/ConfigShade.java   |  43 +++--
 .../org/apache/seatunnel/common/Constants.java     |   2 +
 seatunnel-config/pom.xml                           |   4 +-
 seatunnel-config/seatunnel-config-base/pom.xml     |  16 +-
 seatunnel-config/seatunnel-config-shade/pom.xml    |   4 +-
 .../com/typesafe/config/ConfigParseOptions.java    | 115 ++++++------
 .../com/typesafe/config/impl/ConfigNodePath.java   |  17 +-
 .../com/typesafe/config/impl/ConfigParser.java     | 166 +++++++++++------
 .../shade/com/typesafe/config/impl/Path.java       |  22 +--
 .../shade/com/typesafe/config/impl/PathParser.java |  76 +++++---
 .../com/typesafe/config/impl/PropertiesParser.java | 207 +++++++++++++++++++++
 .../typesafe/config/impl/SimpleConfigObject.java   | 157 ++++++++++++----
 .../org/apache/seatunnel/config/CompleteTest.java  |  17 +-
 .../apache/seatunnel/config/ConfigFactoryTest.java |  32 ++--
 .../apache/seatunnel/config/JsonFormatTest.java    |  24 +--
 .../apache/seatunnel/config/utils/FileUtils.java   |   4 +-
 .../core/starter/command/AbstractCommandArgs.java  |  12 ++
 .../core/starter/command/ConfDecryptCommand.java   |  63 +++++++
 .../core/starter/command/ConfEncryptCommand.java   |  67 +++++++
 .../core/starter/utils/ConfigAdapterUtils.java     |   8 +-
 .../core/starter/utils/ConfigBuilder.java          |  19 +-
 .../core/starter/utils/ConfigShadeUtils.java       | 185 ++++++++++++++++++
 ....apache.seatunnel.api.configuration.ConfigShade |  36 +---
 .../starter/command/ConfDecryptCommandTest.java    |  60 ++++++
 .../starter/command/ConfEncryptCommandTest.java    |  60 ++++++
 .../core/starter/utils/ConfigShadeTest.java        | 105 +++++++++++
 ....apache.seatunnel.api.configuration.ConfigShade |  36 +---
 .../src/test/resources/config.shade.conf           |  61 ++++++
 .../src/test/resources/log4j2.properties           |  42 +++++
 .../src/test/resources/origin.conf                 |  61 ++++++
 .../src/test/resources/shade.conf                  |  61 ++++++
 .../seatunnel/core/starter/flink/FlinkStarter.java |   8 +
 .../core/starter/flink/args/FlinkCommandArgs.java  |  11 +-
 .../seatunnel/core/starter/flink/FlinkStarter.java |   8 +
 .../core/starter/flink/args/FlinkCommandArgs.java  |  11 +-
 .../seatunnel/core/starter/spark/SparkStarter.java |   6 +
 .../core/starter/spark/args/SparkCommandArgs.java  |  11 +-
 .../seatunnel/core/starter/spark/SparkStarter.java |   6 +
 .../core/starter/spark/args/SparkCommandArgs.java  |  11 +-
 .../starter/seatunnel/args/ClientCommandArgs.java  |  11 +-
 .../src/test/resources/redis-to-redis.conf         |   5 +-
 tools/dependencies/known-dependencies.txt          |   2 -
 46 files changed, 1695 insertions(+), 367 deletions(-)

diff --git a/docs/en/concept/JobEnvConfig.md b/docs/en/concept/JobEnvConfig.md
index 6d56813ac..7272c90fc 100644
--- a/docs/en/concept/JobEnvConfig.md
+++ b/docs/en/concept/JobEnvConfig.md
@@ -21,3 +21,9 @@ Gets the interval in which checkpoints are periodically scheduled.
 ## parallelism
 
 This parameter configures the parallelism of source and sink.
+
+## shade.identifier
+
+Specify the method of encryption, if you didn't have the requirement for encrypting or decrypting config files, this option can be ignored.
+
+For more details, you can refer to the documentation [config-encryption-decryption](../connector-v2/Config-Encryption-Decryption.md)
diff --git a/docs/en/connector-v2/Config-Encryption-Decryption.md b/docs/en/connector-v2/Config-Encryption-Decryption.md
new file mode 100644
index 000000000..570cb9f06
--- /dev/null
+++ b/docs/en/connector-v2/Config-Encryption-Decryption.md
@@ -0,0 +1,180 @@
+# Config  File Encryption And Decryption
+
+## Introduction
+
+In most production environments, sensitive configuration items such as passwords are required to be encrypted and cannot be stored in plain text, SeaTunnel provides a convenient one-stop solution for this.
+
+## How to use
+
+SeaTunnel comes with the function of base64 encryption and decryption, but it is not recommended for production use, it is recommended that users implement custom encryption and decryption logic. You can refer to this chapter [How to implement user-defined encryption and decryption](#How to implement user-defined encryption and decryption) get more details about it.
+
+Base64 encryption support encrypt the following parameters:
+- username
+- password
+- auth
+
+Next, I'll show how to quickly use SeaTunnel's own `base64` encryption:
+
+1. And a new option `shade.identifier` in env block of config file, this option indicate what the encryption method that you want to use, in this example, we should add `shade.identifier = base64` in config as the following shown:
+
+   ```hocon
+   #
+   # Licensed to the Apache Software Foundation (ASF) under one or more
+   # contributor license agreements.  See the NOTICE file distributed with
+   # this work for additional information regarding copyright ownership.
+   # The ASF licenses this file to You under the Apache License, Version 2.0
+   # (the "License"); you may not use this file except in compliance with
+   # the License.  You may obtain a copy of the License at
+   #
+   #     http://www.apache.org/licenses/LICENSE-2.0
+   #
+   # Unless required by applicable law or agreed to in writing, software
+   # distributed under the License is distributed on an "AS IS" BASIS,
+   # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   # See the License for the specific language governing permissions and
+   # limitations under the License.
+   #
+
+   env {
+     execution.parallelism = 1
+     shade.identifier = "base64"
+   }
+
+   source {
+     MySQL-CDC {
+       result_table_name = "fake"
+       parallelism = 1
+       server-id = 5656
+       port = 56725
+       hostname = "127.0.0.1"
+       username = "seatunnel"
+       password = "seatunnel_password"
+       database-name = "inventory_vwyw0n"
+       table-name = "products"
+       base-url = "jdbc:mysql://localhost:56725"
+     }
+   }
+
+   transform {
+   }
+
+   sink {
+     # choose stdout output plugin to output data to console
+     Clickhouse {
+       host = "localhost:8123"
+       database = "default"
+       table = "fake_all"
+       username = "seatunnel"
+       password = "seatunnel_password"
+
+       # cdc options
+       primary_key = "id"
+       support_upsert = true
+     }
+   }
+   ```
+2. Using the shell based on different calculate engine to encrypt config file, in this example we use zeta:
+
+   ```shell
+   ${SEATUNNEL_HOME}/bin/seatunnel.sh --config config/v2.batch.template --encrypt
+   ```
+
+   Then you can see the encrypted configuration file in the terminal:
+
+   ```log
+   2023-02-20 17:50:58,319 INFO  org.apache.seatunnel.core.starter.command.ConfEncryptCommand - Encrypt config: 
+   {
+       "env" : {
+           "execution.parallelism" : 1,
+           "shade.identifier" : "base64"
+       },
+       "source" : [
+           {
+               "base-url" : "jdbc:mysql://localhost:56725",
+               "hostname" : "127.0.0.1",
+               "password" : "c2VhdHVubmVsX3Bhc3N3b3Jk",
+               "port" : 56725,
+               "database-name" : "inventory_vwyw0n",
+               "parallelism" : 1,
+               "result_table_name" : "fake",
+               "table-name" : "products",
+               "plugin_name" : "MySQL-CDC",
+               "server-id" : 5656,
+               "username" : "c2VhdHVubmVs"
+           }
+       ],
+       "transform" : [],
+       "sink" : [
+           {
+               "database" : "default",
+               "password" : "c2VhdHVubmVsX3Bhc3N3b3Jk",
+               "support_upsert" : true,
+               "host" : "localhost:8123",
+               "plugin_name" : "Clickhouse",
+               "primary_key" : "id",
+               "table" : "fake_all",
+               "username" : "c2VhdHVubmVs"
+           }
+       ]
+   }
+   ```
+3. Of course, not only encrypted configuration files are supported, but if the user wants to see the decrypted configuration file, you can execute this command:
+
+   ```shell
+   ${SEATUNNEL_HOME}/bin/seatunnel.sh --config config/v2.batch.template --decrypt
+   ```
+
+## How to implement user-defined encryption and decryption
+
+If you want to customize the encryption method and the configuration of the encryption, this section will help you to solve the problem.
+
+1. Create a java maven project
+
+2. Add `seatunnel-api` module in dependencies like the following shown:
+
+   ```xml
+   <dependency>
+       <groupId>org.apache.seatunnel</groupId>
+       <artifactId>seatunnel-api</artifactId>
+       <version>${seatunnel.version}</version>
+   </dependency>
+   ```
+3. Create a new class and implement interface `ConfigShade`, this interface has the following methods:
+
+   ```java
+   /**
+    * The interface that provides the ability to encrypt and decrypt {@link
+    * org.apache.seatunnel.shade.com.typesafe.config.Config}
+    */
+   public interface ConfigShade {
+
+       /**
+        * The unique identifier of the current interface, used it to select the correct {@link
+        * ConfigShade}
+        */
+       String getIdentifier();
+
+       /**
+        * Encrypt the content
+        *
+        * @param content The content to encrypt
+        */
+       String encrypt(String content);
+
+       /**
+        * Decrypt the content
+        *
+        * @param content The content to decrypt
+        */
+       String decrypt(String content);
+
+       /** To expand the options that user want to encrypt */
+       default String[] sensitiveOptions() {
+           return new String[0];
+       }
+   }
+   ```
+4. Add `org.apache.seatunnel.api.configuration.ConfigShade` in `resources/META-INF/services`
+5. Package it to jar and add jar to `${SEATUNNEL_HOME}/lib`
+6. Change the option `shade.identifier` to the value that you defined in `ConfigShade#getIdentifier`of you config file, please enjoy it \^_\^
+
diff --git a/docs/sidebars.js b/docs/sidebars.js
index 789358a3d..f9e67a4a4 100644
--- a/docs/sidebars.js
+++ b/docs/sidebars.js
@@ -134,6 +134,7 @@ const sidebars = {
                     ]
                 },
                 "connector-v2/Error-Quick-Reference-Manual",
+                "connector-v2/Config-Encryption-Decryption"
             ]
         },
         {
diff --git a/pom.xml b/pom.xml
index 5df9af140..7ba5379c7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -38,6 +38,7 @@
             you need to open this annotation and change the dependency of config-shade to project.
             <module>seatunnel-config</module>
         -->
+        <module>seatunnel-config</module>
         <module>seatunnel-common</module>
         <module>seatunnel-core</module>
         <module>seatunnel-transforms-v2</module>
@@ -284,7 +285,7 @@
             <dependency>
                 <groupId>org.apache.seatunnel</groupId>
                 <artifactId>seatunnel-config-shade</artifactId>
-                <version>${seatunnel.config.shade.version}</version>
+                <version>${project.version}</version>
             </dependency>
             <dependency>
                 <groupId>commons-codec</groupId>
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/utils/FileUtils.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ConfigShade.java
similarity index 51%
copy from seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/utils/FileUtils.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ConfigShade.java
index 3b0e01b79..5532f48e0 100644
--- a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/utils/FileUtils.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/configuration/ConfigShade.java
@@ -15,25 +15,36 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.config.utils;
+package org.apache.seatunnel.api.configuration;
 
-import java.io.File;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
+/**
+ * The interface that provides the ability to encrypt and decrypt {@link
+ * org.apache.seatunnel.shade.com.typesafe.config.Config}
+ */
+public interface ConfigShade {
 
-public final class FileUtils {
+    /**
+     * The unique identifier of the current interface, used it to select the correct {@link
+     * ConfigShade}
+     */
+    String getIdentifier();
 
-    private FileUtils() {
-    }
+    /**
+     * Encrypt the content
+     *
+     * @param content The content to encrypt
+     */
+    String encrypt(String content);
 
-    // get file from classpath, resources folder
-    public static File getFileFromResources(String fileName) throws URISyntaxException {
-        URL resource = FileUtils.class.getResource(fileName);
-        if (resource == null) {
-            throw new IllegalArgumentException("file is not found!");
-        }
-        return Paths.get(resource.toURI()).toFile();
-    }
+    /**
+     * Decrypt the content
+     *
+     * @param content The content to decrypt
+     */
+    String decrypt(String content);
 
+    /** To expand the options that user want to encrypt */
+    default String[] sensitiveOptions() {
+        return new String[0];
+    }
 }
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
index 030cc4dbd..c5e04d5fa 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/Constants.java
@@ -23,6 +23,8 @@ public final class Constants {
 
     public static final String LOGO = "SeaTunnel";
 
+    public static final String ENV = "env";
+
     public static final String SOURCE = "source";
 
     public static final String TRANSFORM = "transform";
diff --git a/seatunnel-config/pom.xml b/seatunnel-config/pom.xml
index 76d492459..ac78020d2 100644
--- a/seatunnel-config/pom.xml
+++ b/seatunnel-config/pom.xml
@@ -19,16 +19,16 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
         <artifactId>seatunnel</artifactId>
         <version>${revision}</version>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
     <artifactId>seatunnel-config</artifactId>
     <packaging>pom</packaging>
 
-    <name>SeaTunnel : Config : </name>
+    <name>SeaTunnel : Config :</name>
 
     <modules>
         <module>seatunnel-config-shade</module>
diff --git a/seatunnel-config/seatunnel-config-base/pom.xml b/seatunnel-config/seatunnel-config-base/pom.xml
index 473bb528b..99c958369 100644
--- a/seatunnel-config/seatunnel-config-base/pom.xml
+++ b/seatunnel-config/seatunnel-config-base/pom.xml
@@ -17,8 +17,7 @@
     limitations under the License.
 
 -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
@@ -73,6 +72,7 @@
                                 <exclude>com/typesafe/config/impl/PathParser.class</exclude>
                                 <exclude>com/typesafe/config/impl/Path.class</exclude>
                                 <exclude>com/typesafe/config/impl/SimpleConfigObject.class</exclude>
+                                <exclude>com/typesafe/config/impl/PropertiesParser.class</exclude>
                             </excludes>
                         </filter>
                     </filters>
@@ -83,18 +83,16 @@
                         </relocation>
                     </relocations>
                     <transformers>
-                        <transformer
-                                implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
-                        <transformer
-                                implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
+                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer" />
+                        <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer" />
                     </transformers>
                 </configuration>
                 <executions>
                     <execution>
-                        <phase>package</phase>
                         <goals>
                             <goal>shade</goal>
                         </goals>
+                        <phase>package</phase>
                     </execution>
                 </executions>
             </plugin>
@@ -105,10 +103,10 @@
                 <executions>
                     <execution>
                         <id>compile</id>
-                        <phase>package</phase>
                         <goals>
                             <goal>attach-artifact</goal>
                         </goals>
+                        <phase>package</phase>
                         <configuration>
                             <artifacts>
                                 <artifact>
@@ -124,4 +122,4 @@
 
         </plugins>
     </build>
-</project>
\ No newline at end of file
+</project>
diff --git a/seatunnel-config/seatunnel-config-shade/pom.xml b/seatunnel-config/seatunnel-config-shade/pom.xml
index f0cb380d8..f49dafb36 100644
--- a/seatunnel-config/seatunnel-config-shade/pom.xml
+++ b/seatunnel-config/seatunnel-config-shade/pom.xml
@@ -19,13 +19,13 @@
 -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.seatunnel</groupId>
         <artifactId>seatunnel-config</artifactId>
         <version>${revision}</version>
         <relativePath>../pom.xml</relativePath>
     </parent>
-    <modelVersion>4.0.0</modelVersion>
     <artifactId>seatunnel-config-shade</artifactId>
     <name>SeaTunnel : Config : Shade</name>
 
@@ -59,10 +59,10 @@
                 <executions>
                     <execution>
                         <id>compile</id>
-                        <phase>package</phase>
                         <goals>
                             <goal>attach-artifact</goal>
                         </goals>
+                        <phase>package</phase>
                         <configuration>
                             <artifacts>
                                 <artifact>
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java
index b79496702..f66d4d1c6 100644
--- a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/ConfigParseOptions.java
@@ -7,11 +7,9 @@ package org.apache.seatunnel.shade.com.typesafe.config;
 /**
  * A set of options related to parsing.
  *
- * <p>
- * This object is immutable, so the "setters" return a new object.
+ * <p>This object is immutable, so the "setters" return a new object.
  *
- * <p>
- * Here is an example of creating a custom {@code ConfigParseOptions}:
+ * <p>Here is an example of creating a custom {@code ConfigParseOptions}:
  *
  * <pre>
  *     ConfigParseOptions options = ConfigParseOptions.defaults()
@@ -21,10 +19,7 @@ package org.apache.seatunnel.shade.com.typesafe.config;
  */
 public final class ConfigParseOptions {
 
-    /**
-     * a.b.c
-     * a-&gt;b-&gt;c
-     */
+    /** a.b.c a-&gt;b-&gt;c */
     public static final String PATH_TOKEN_SEPARATOR = "->";
 
     final ConfigSyntax syntax;
@@ -33,8 +28,12 @@ public final class ConfigParseOptions {
     final ConfigIncluder includer;
     final ClassLoader classLoader;
 
-    private ConfigParseOptions(ConfigSyntax syntax, String originDescription, boolean allowMissing,
-                               ConfigIncluder includer, ClassLoader classLoader) {
+    private ConfigParseOptions(
+            ConfigSyntax syntax,
+            String originDescription,
+            boolean allowMissing,
+            ConfigIncluder includer,
+            ClassLoader classLoader) {
         this.syntax = syntax;
         this.originDescription = originDescription;
         this.allowMissing = allowMissing;
@@ -43,9 +42,8 @@ public final class ConfigParseOptions {
     }
 
     /**
-     * Gets an instance of {@code ConfigParseOptions} with all fields
-     * set to the default values. Start with this instance and make any
-     * changes you need.
+     * Gets an instance of {@code ConfigParseOptions} with all fields set to the default values.
+     * Start with this instance and make any changes you need.
      *
      * @return the default parse options
      */
@@ -54,8 +52,8 @@ public final class ConfigParseOptions {
     }
 
     /**
-     * Set the file format. If set to null, try to guess from any available
-     * filename extension; if guessing fails, assume {@link ConfigSyntax#CONF}.
+     * Set the file format. If set to null, try to guess from any available filename extension; if
+     * guessing fails, assume {@link ConfigSyntax#CONF}.
      *
      * @param syntax a syntax or {@code null} for best guess
      * @return options with the syntax set
@@ -64,8 +62,12 @@ public final class ConfigParseOptions {
         if (this.syntax == syntax) {
             return this;
         } else {
-            return new ConfigParseOptions(syntax, this.originDescription, this.allowMissing,
-                this.includer, this.classLoader);
+            return new ConfigParseOptions(
+                    syntax,
+                    this.originDescription,
+                    this.allowMissing,
+                    this.includer,
+                    this.classLoader);
         }
     }
 
@@ -79,11 +81,10 @@ public final class ConfigParseOptions {
     }
 
     /**
-     * Set a description for the thing being parsed. In most cases this will be
-     * set up for you to something like the filename, but if you provide just an
-     * input stream you might want to improve on it. Set to null to allow the
-     * library to come up with something automatically. This description is the
-     * basis for the {@link ConfigOrigin} of the parsed values.
+     * Set a description for the thing being parsed. In most cases this will be set up for you to
+     * something like the filename, but if you provide just an input stream you might want to
+     * improve on it. Set to null to allow the library to come up with something automatically. This
+     * description is the basis for the {@link ConfigOrigin} of the parsed values.
      *
      * @param originDescription description to put in the {@link ConfigOrigin}
      * @return options with the origin description set
@@ -92,12 +93,17 @@ public final class ConfigParseOptions {
         // findbugs complains about == here but is wrong, do not "fix"
         if (this.originDescription == originDescription) {
             return this;
-        } else if (this.originDescription != null && originDescription != null
-            && this.originDescription.equals(originDescription)) {
+        } else if (this.originDescription != null
+                && originDescription != null
+                && this.originDescription.equals(originDescription)) {
             return this;
         } else {
-            return new ConfigParseOptions(this.syntax, originDescription, this.allowMissing,
-                this.includer, this.classLoader);
+            return new ConfigParseOptions(
+                    this.syntax,
+                    originDescription,
+                    this.allowMissing,
+                    this.includer,
+                    this.classLoader);
         }
     }
 
@@ -110,9 +116,7 @@ public final class ConfigParseOptions {
         return originDescription;
     }
 
-    /**
-     * this is package-private, not public API
-     */
+    /** this is package-private, not public API */
     ConfigParseOptions withFallbackOriginDescription(String originDescription) {
         if (this.originDescription == null) {
             return setOriginDescription(originDescription);
@@ -122,10 +126,9 @@ public final class ConfigParseOptions {
     }
 
     /**
-     * Set to false to throw an exception if the item being parsed (for example
-     * a file) is missing. Set to true to just return an empty document in that
-     * case. Note that this setting applies on only to fetching the root document,
-     * it has no effect on any nested includes.
+     * Set to false to throw an exception if the item being parsed (for example a file) is missing.
+     * Set to true to just return an empty document in that case. Note that this setting applies on
+     * only to fetching the root document, it has no effect on any nested includes.
      *
      * @param allowMissing true to silently ignore missing item
      * @return options with the "allow missing" flag set
@@ -134,8 +137,12 @@ public final class ConfigParseOptions {
         if (this.allowMissing == allowMissing) {
             return this;
         } else {
-            return new ConfigParseOptions(this.syntax, this.originDescription, allowMissing,
-                this.includer, this.classLoader);
+            return new ConfigParseOptions(
+                    this.syntax,
+                    this.originDescription,
+                    allowMissing,
+                    this.includer,
+                    this.classLoader);
         }
     }
 
@@ -149,8 +156,8 @@ public final class ConfigParseOptions {
     }
 
     /**
-     * Set a {@link ConfigIncluder} which customizes how includes are handled.
-     * null means to use the default includer.
+     * Set a {@link ConfigIncluder} which customizes how includes are handled. null means to use the
+     * default includer.
      *
      * @param includer the includer to use or null for default
      * @return new version of the parse options with different includer
@@ -159,16 +166,19 @@ public final class ConfigParseOptions {
         if (this.includer == includer) {
             return this;
         } else {
-            return new ConfigParseOptions(this.syntax, this.originDescription, this.allowMissing,
-                includer, this.classLoader);
+            return new ConfigParseOptions(
+                    this.syntax,
+                    this.originDescription,
+                    this.allowMissing,
+                    includer,
+                    this.classLoader);
         }
     }
 
     /**
-     * Prepends a {@link ConfigIncluder} which customizes how
-     * includes are handled.  To prepend your includer, the
-     * library calls {@link ConfigIncluder#withFallback} on your
-     * includer to append the existing includer to it.
+     * Prepends a {@link ConfigIncluder} which customizes how includes are handled. To prepend your
+     * includer, the library calls {@link ConfigIncluder#withFallback} on your includer to append
+     * the existing includer to it.
      *
      * @param includer the includer to prepend (may not be null)
      * @return new version of the parse options with different includer
@@ -187,9 +197,8 @@ public final class ConfigParseOptions {
     }
 
     /**
-     * Appends a {@link ConfigIncluder} which customizes how
-     * includes are handled.  To append, the library calls {@link
-     * ConfigIncluder#withFallback} on the existing includer.
+     * Appends a {@link ConfigIncluder} which customizes how includes are handled. To append, the
+     * library calls {@link ConfigIncluder#withFallback} on the existing includer.
      *
      * @param includer the includer to append (may not be null)
      * @return new version of the parse options with different includer
@@ -217,25 +226,23 @@ public final class ConfigParseOptions {
     }
 
     /**
-     * Set the class loader. If set to null,
-     * {@code Thread.currentThread().getContextClassLoader()} will be used.
+     * Set the class loader. If set to null, {@code Thread.currentThread().getContextClassLoader()}
+     * will be used.
      *
-     * @param loader a class loader or {@code null} to use thread context class
-     *               loader
+     * @param loader a class loader or {@code null} to use thread context class loader
      * @return options with the class loader set
      */
     public ConfigParseOptions setClassLoader(ClassLoader loader) {
         if (this.classLoader == loader) {
             return this;
         } else {
-            return new ConfigParseOptions(this.syntax, this.originDescription, this.allowMissing,
-                this.includer, loader);
+            return new ConfigParseOptions(
+                    this.syntax, this.originDescription, this.allowMissing, this.includer, loader);
         }
     }
 
     /**
-     * Get the class loader; never returns {@code null}, if the class loader was
-     * unset, returns
+     * Get the class loader; never returns {@code null}, if the class loader was unset, returns
      * {@code Thread.currentThread().getContextClassLoader()}.
      *
      * @return class loader to use
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java
index d917d84e8..95e1022f4 100644
--- a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigNodePath.java
@@ -32,13 +32,17 @@ final class ConfigNodePath extends AbstractConfigNode {
         int periodCount = 0;
         ArrayList<Token> tokensCopy = new ArrayList<>(tokens);
         for (int i = 0; i < tokensCopy.size(); i++) {
-            if (Tokens.isUnquotedText(tokensCopy.get(i)) &&
-                tokensCopy.get(i).tokenText().equals(ConfigParseOptions.PATH_TOKEN_SEPARATOR)) {
+            if (Tokens.isUnquotedText(tokensCopy.get(i))
+                    && tokensCopy
+                            .get(i)
+                            .tokenText()
+                            .equals(ConfigParseOptions.PATH_TOKEN_SEPARATOR)) {
                 periodCount++;
             }
 
             if (periodCount == toRemove) {
-                return new ConfigNodePath(path.subPath(toRemove), tokensCopy.subList(i + 1, tokensCopy.size()));
+                return new ConfigNodePath(
+                        path.subPath(toRemove), tokensCopy.subList(i + 1, tokensCopy.size()));
             }
         }
         throw new ConfigException.BugOrBroken("Tried to remove too many elements from a Path node");
@@ -47,8 +51,11 @@ final class ConfigNodePath extends AbstractConfigNode {
     protected ConfigNodePath first() {
         ArrayList<Token> tokensCopy = new ArrayList<>(tokens);
         for (int i = 0; i < tokensCopy.size(); i++) {
-            if (Tokens.isUnquotedText(tokensCopy.get(i)) &&
-                tokensCopy.get(i).tokenText().equals(ConfigParseOptions.PATH_TOKEN_SEPARATOR)) {
+            if (Tokens.isUnquotedText(tokensCopy.get(i))
+                    && tokensCopy
+                            .get(i)
+                            .tokenText()
+                            .equals(ConfigParseOptions.PATH_TOKEN_SEPARATOR)) {
                 return new ConfigNodePath(path.subPath(0, 1), tokensCopy.subList(0, i));
             }
         }
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java
index 39b9cb5d4..783546a0d 100644
--- a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/ConfigParser.java
@@ -23,11 +23,18 @@ import java.util.ListIterator;
 import java.util.Map;
 
 final class ConfigParser {
-    static AbstractConfigValue parse(ConfigNodeRoot document,
-                                     ConfigOrigin origin, ConfigParseOptions options,
-                                     ConfigIncludeContext includeContext) {
-        ParseContext context = new ParseContext(options.getSyntax(), origin, document,
-            SimpleIncluder.makeFull(options.getIncluder()), includeContext);
+    static AbstractConfigValue parse(
+            ConfigNodeRoot document,
+            ConfigOrigin origin,
+            ConfigParseOptions options,
+            ConfigIncludeContext includeContext) {
+        ParseContext context =
+                new ParseContext(
+                        options.getSyntax(),
+                        origin,
+                        document,
+                        SimpleIncluder.makeFull(options.getIncluder()),
+                        includeContext);
         return context.parse();
     }
 
@@ -45,8 +52,12 @@ final class ConfigParser {
         // problem we should be able to get rid of this variable.
         int arrayCount;
 
-        ParseContext(ConfigSyntax flavor, ConfigOrigin origin, ConfigNodeRoot document,
-                     FullIncluder includer, ConfigIncludeContext includeContext) {
+        ParseContext(
+                ConfigSyntax flavor,
+                ConfigOrigin origin,
+                ConfigNodeRoot document,
+                FullIncluder includer,
+                ConfigIncludeContext includeContext) {
             lineNumber = 1;
             this.document = document;
             this.flavor = flavor;
@@ -94,7 +105,8 @@ final class ConfigParser {
         private Path fullCurrentPath() {
             // pathStack has top of stack at front
             if (pathStack.isEmpty()) {
-                throw new ConfigException.BugOrBroken("Bug in parser; tried to get current path when at root");
+                throw new ConfigException.BugOrBroken(
+                        "Bug in parser; tried to get current path when at root");
             } else {
                 return new Path(pathStack.descendingIterator());
             }
@@ -111,10 +123,11 @@ final class ConfigParser {
 
                 Path path = pathStack.peekFirst();
 
-                if (path != null && !ConfigSyntax.JSON.equals(flavor)
-                    && ("source".equals(path.first())
-                    || "transform".equals(path.first())
-                    || "sink".equals(path.first()))) {
+                if (path != null
+                        && !ConfigSyntax.JSON.equals(flavor)
+                        && ("source".equals(path.first())
+                                || "transform".equals(path.first())
+                                || "sink".equals(path.first()))) {
                     v = parseObjectForSeatunnel((ConfigNodeObject) n);
                 } else {
                     v = parseObject((ConfigNodeObject) n);
@@ -134,14 +147,15 @@ final class ConfigParser {
             }
 
             if (arrayCount != startingArrayCount) {
-                throw new ConfigException.BugOrBroken("Bug in config parser: unbalanced array count");
+                throw new ConfigException.BugOrBroken(
+                        "Bug in config parser: unbalanced array count");
             }
 
             return v;
         }
 
-        private static AbstractConfigObject createValueUnderPath(Path path,
-                                                                 AbstractConfigValue value) {
+        private static AbstractConfigObject createValueUnderPath(
+                Path path, AbstractConfigValue value) {
             // for path foo.bar, we are creating
             // { "foo" : { "bar" : value } }
             List<String> keys = new ArrayList<>();
@@ -164,8 +178,10 @@ final class ConfigParser {
             // "foo.bar" not also to "foo"
             ListIterator<String> i = keys.listIterator(keys.size());
             String deepest = i.previous();
-            AbstractConfigObject o = new SimpleConfigObject(value.origin().withComments(null),
-                Collections.singletonMap(deepest, value));
+            AbstractConfigObject o =
+                    new SimpleConfigObject(
+                            value.origin().withComments(null),
+                            Collections.singletonMap(deepest, value));
             while (i.hasPrevious()) {
                 Map<String, AbstractConfigValue> m = Collections.singletonMap(i.previous(), o);
                 o = new SimpleConfigObject(value.origin().withComments(null), m);
@@ -176,7 +192,9 @@ final class ConfigParser {
 
         private void parseInclude(Map<String, AbstractConfigValue> values, ConfigNodeInclude n) {
             boolean isRequired = n.isRequired();
-            ConfigIncludeContext cic = includeContext.setParseOptions(includeContext.parseOptions().setAllowMissing(!isRequired));
+            ConfigIncludeContext cic =
+                    includeContext.setParseOptions(
+                            includeContext.parseOptions().setAllowMissing(!isRequired));
 
             AbstractConfigObject obj;
             switch (n.kind()) {
@@ -191,8 +209,7 @@ final class ConfigParser {
                     break;
 
                 case FILE:
-                    obj = (AbstractConfigObject) includer.includeFile(cic,
-                        new File(n.name()));
+                    obj = (AbstractConfigObject) includer.includeFile(cic, new File(n.name()));
                     break;
 
                 case CLASSPATH:
@@ -200,8 +217,7 @@ final class ConfigParser {
                     break;
 
                 case HEURISTIC:
-                    obj = (AbstractConfigObject) includer
-                        .include(cic, n.name());
+                    obj = (AbstractConfigObject) includer.include(cic, n.name());
                     break;
 
                 default:
@@ -212,9 +228,10 @@ final class ConfigParser {
             // exception is better than producing an incorrect result.
             // See https://github.com/lightbend/config/issues/160
             if (arrayCount > 0 && obj.resolveStatus() != ResolveStatus.RESOLVED) {
-                throw parseError("Due to current limitations of the config parser, when an include statement is nested inside a list value, "
-                    + "${} substitutions inside the included file cannot be resolved correctly. Either move the include outside of the list value or "
-                    + "remove the ${} statements from the included file.");
+                throw parseError(
+                        "Due to current limitations of the config parser, when an include statement is nested inside a list value, "
+                                + "${} substitutions inside the included file cannot be resolved correctly. Either move the include outside of the list value or "
+                                + "remove the ${} statements from the included file.");
             }
 
             if (!pathStack.isEmpty()) {
@@ -247,7 +264,8 @@ final class ConfigParser {
                 if (node instanceof ConfigNodeComment) {
                     lastWasNewline = false;
                     comments.add(((ConfigNodeComment) node).commentText());
-                } else if (node instanceof ConfigNodeSingleToken && Tokens.isNewline(((ConfigNodeSingleToken) node).token())) {
+                } else if (node instanceof ConfigNodeSingleToken
+                        && Tokens.isNewline(((ConfigNodeSingleToken) node).token())) {
                     lineNumber++;
                     if (lastWasNewline) {
                         // Drop all comments if there was a blank line and start a new comment block
@@ -270,9 +288,10 @@ final class ConfigParser {
                         // result. See
                         // https://github.com/lightbend/config/issues/160
                         if (arrayCount > 0) {
-                            throw parseError("Due to current limitations of the config parser, += does not work nested inside a list. "
-                                + "+= expands to a ${} substitution and the path in ${} cannot currently refer to list elements. "
-                                + "You might be able to move the += outside of the list and then refer to it from inside the list with ${}.");
+                            throw parseError(
+                                    "Due to current limitations of the config parser, += does not work nested inside a list. "
+                                            + "+= expands to a ${} substitution and the path in ${} cannot currently refer to list elements. "
+                                            + "You might be able to move the += outside of the list and then refer to it from inside the list with ${}.");
                         }
 
                         // because we will put it in an array after the fact so
@@ -293,10 +312,14 @@ final class ConfigParser {
                         arrayCount -= 1;
 
                         List<AbstractConfigValue> concat = new ArrayList<>(2);
-                        AbstractConfigValue previousRef = new ConfigReference(newValue.origin(),
-                            new SubstitutionExpression(fullCurrentPath(), true /* optional */));
-                        AbstractConfigValue list = new SimpleConfigList(newValue.origin(),
-                            Collections.singletonList(newValue));
+                        AbstractConfigValue previousRef =
+                                new ConfigReference(
+                                        newValue.origin(),
+                                        new SubstitutionExpression(
+                                                fullCurrentPath(), true /* optional */));
+                        AbstractConfigValue list =
+                                new SimpleConfigList(
+                                        newValue.origin(), Collections.singletonList(newValue));
                         concat.add(previousRef);
                         concat.add(list);
                         newValue = ConfigConcatenation.concatenate(concat);
@@ -308,12 +331,17 @@ final class ConfigParser {
                         while (i < nodes.size()) {
                             if (nodes.get(i) instanceof ConfigNodeComment) {
                                 ConfigNodeComment comment = (ConfigNodeComment) nodes.get(i);
-                                newValue = newValue.withOrigin(newValue.origin().appendComments(
-                                    Collections.singletonList(comment.commentText())));
+                                newValue =
+                                        newValue.withOrigin(
+                                                newValue.origin()
+                                                        .appendComments(
+                                                                Collections.singletonList(
+                                                                        comment.commentText())));
                                 break;
                             } else if (nodes.get(i) instanceof ConfigNodeSingleToken) {
                                 ConfigNodeSingleToken curr = (ConfigNodeSingleToken) nodes.get(i);
-                                if (curr.token() == Tokens.COMMA || Tokens.isIgnoredWhitespace(curr.token())) {
+                                if (curr.token() == Tokens.COMMA
+                                        || Tokens.isIgnoredWhitespace(curr.token())) {
                                     // keep searching, as there could still be a comment
                                 } else {
                                     i--;
@@ -342,11 +370,10 @@ final class ConfigParser {
                     } else {
                         if (flavor == ConfigSyntax.JSON) {
                             throw new ConfigException.BugOrBroken(
-                                "somehow got multi-element path in JSON mode");
+                                    "somehow got multi-element path in JSON mode");
                         }
 
-                        AbstractConfigObject obj = createValueUnderPath(
-                            remaining, newValue);
+                        AbstractConfigObject obj = createValueUnderPath(remaining, newValue);
 
                         Map<String, String> m = Collections.singletonMap("plugin_name", key);
                         obj = obj.withFallback(ConfigValueFactory.fromMap(m));
@@ -372,7 +399,8 @@ final class ConfigParser {
                 if (node instanceof ConfigNodeComment) {
                     lastWasNewline = false;
                     comments.add(((ConfigNodeComment) node).commentText());
-                } else if (node instanceof ConfigNodeSingleToken && Tokens.isNewline(((ConfigNodeSingleToken) node).token())) {
+                } else if (node instanceof ConfigNodeSingleToken
+                        && Tokens.isNewline(((ConfigNodeSingleToken) node).token())) {
                     lineNumber++;
                     if (lastWasNewline) {
                         // Drop all comments if there was a blank line and start a new comment block
@@ -395,9 +423,10 @@ final class ConfigParser {
                         // result. See
                         // https://github.com/lightbend/config/issues/160
                         if (arrayCount > 0) {
-                            throw parseError("Due to current limitations of the config parser, += does not work nested inside a list. "
-                                + "+= expands to a ${} substitution and the path in ${} cannot currently refer to list elements. "
-                                + "You might be able to move the += outside of the list and then refer to it from inside the list with ${}.");
+                            throw parseError(
+                                    "Due to current limitations of the config parser, += does not work nested inside a list. "
+                                            + "+= expands to a ${} substitution and the path in ${} cannot currently refer to list elements. "
+                                            + "You might be able to move the += outside of the list and then refer to it from inside the list with ${}.");
                         }
 
                         // because we will put it in an array after the fact so
@@ -418,10 +447,14 @@ final class ConfigParser {
                         arrayCount -= 1;
 
                         List<AbstractConfigValue> concat = new ArrayList<>(2);
-                        AbstractConfigValue previousRef = new ConfigReference(newValue.origin(),
-                            new SubstitutionExpression(fullCurrentPath(), true /* optional */));
-                        AbstractConfigValue list = new SimpleConfigList(newValue.origin(),
-                            Collections.singletonList(newValue));
+                        AbstractConfigValue previousRef =
+                                new ConfigReference(
+                                        newValue.origin(),
+                                        new SubstitutionExpression(
+                                                fullCurrentPath(), true /* optional */));
+                        AbstractConfigValue list =
+                                new SimpleConfigList(
+                                        newValue.origin(), Collections.singletonList(newValue));
                         concat.add(previousRef);
                         concat.add(list);
                         newValue = ConfigConcatenation.concatenate(concat);
@@ -433,12 +466,17 @@ final class ConfigParser {
                         while (i < nodes.size()) {
                             if (nodes.get(i) instanceof ConfigNodeComment) {
                                 ConfigNodeComment comment = (ConfigNodeComment) nodes.get(i);
-                                newValue = newValue.withOrigin(newValue.origin().appendComments(
-                                    Collections.singletonList(comment.commentText())));
+                                newValue =
+                                        newValue.withOrigin(
+                                                newValue.origin()
+                                                        .appendComments(
+                                                                Collections.singletonList(
+                                                                        comment.commentText())));
                                 break;
                             } else if (nodes.get(i) instanceof ConfigNodeSingleToken) {
                                 ConfigNodeSingleToken curr = (ConfigNodeSingleToken) nodes.get(i);
-                                if (curr.token() == Tokens.COMMA || Tokens.isIgnoredWhitespace(curr.token())) {
+                                if (curr.token() == Tokens.COMMA
+                                        || Tokens.isIgnoredWhitespace(curr.token())) {
                                     // keep searching, as there could still be a comment
                                 } else {
                                     i--;
@@ -466,10 +504,11 @@ final class ConfigParser {
                             // could become an object).
 
                             if (flavor == ConfigSyntax.JSON) {
-                                throw parseError("JSON does not allow duplicate fields: '"
-                                    + key
-                                    + "' was already seen at "
-                                    + existing.origin().description());
+                                throw parseError(
+                                        "JSON does not allow duplicate fields: '"
+                                                + key
+                                                + "' was already seen at "
+                                                + existing.origin().description());
                             } else {
                                 newValue = newValue.withFallback(existing);
                             }
@@ -478,11 +517,10 @@ final class ConfigParser {
                     } else {
                         if (flavor == ConfigSyntax.JSON) {
                             throw new ConfigException.BugOrBroken(
-                                "somehow got multi-element path in JSON mode");
+                                    "somehow got multi-element path in JSON mode");
                         }
 
-                        AbstractConfigObject obj = createValueUnderPath(
-                            remaining, newValue);
+                        AbstractConfigObject obj = createValueUnderPath(remaining, newValue);
                         AbstractConfigValue existing = values.get(key);
                         if (existing != null) {
                             obj = obj.withFallback(existing);
@@ -510,12 +548,14 @@ final class ConfigParser {
                 if (node instanceof ConfigNodeComment) {
                     comments.add(((ConfigNodeComment) node).commentText());
                     lastWasNewLine = false;
-                } else if (node instanceof ConfigNodeSingleToken && Tokens.isNewline(((ConfigNodeSingleToken) node).token())) {
+                } else if (node instanceof ConfigNodeSingleToken
+                        && Tokens.isNewline(((ConfigNodeSingleToken) node).token())) {
                     lineNumber++;
                     if (lastWasNewLine && v == null) {
                         comments.clear();
                     } else if (v != null) {
-                        values.add(v.withOrigin(v.origin().appendComments(new ArrayList<>(comments))));
+                        values.add(
+                                v.withOrigin(v.origin().appendComments(new ArrayList<>(comments))));
                         comments.clear();
                         v = null;
                     }
@@ -523,7 +563,8 @@ final class ConfigParser {
                 } else if (node instanceof AbstractConfigNodeValue) {
                     lastWasNewLine = false;
                     if (v != null) {
-                        values.add(v.withOrigin(v.origin().appendComments(new ArrayList<>(comments))));
+                        values.add(
+                                v.withOrigin(v.origin().appendComments(new ArrayList<>(comments))));
                         comments.clear();
                     }
                     v = parseValue((AbstractConfigNodeValue) node, comments);
@@ -552,7 +593,10 @@ final class ConfigParser {
                         if (lastWasNewLine && result == null) {
                             comments.clear();
                         } else if (result != null) {
-                            result = result.withOrigin(result.origin().appendComments(new ArrayList<>(comments)));
+                            result =
+                                    result.withOrigin(
+                                            result.origin()
+                                                    .appendComments(new ArrayList<>(comments)));
                             comments.clear();
                             break;
                         }
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java
index 8ec318f2a..33e856855 100644
--- a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/Path.java
@@ -65,16 +65,12 @@ final class Path {
         return first;
     }
 
-    /**
-     * @return path minus the first element or null if no more elements
-     */
+    /** @return path minus the first element or null if no more elements */
     Path remainder() {
         return remainder;
     }
 
-    /**
-     * @return path minus the last element or null if we have just one element
-     */
+    /** @return path minus the last element or null if we have just one element */
     Path parent() {
         if (remainder == null) {
             return null;
@@ -89,9 +85,7 @@ final class Path {
         return pb.result();
     }
 
-    /**
-     * @return last element in the path
-     */
+    /** @return last element in the path */
     String last() {
         Path p = this;
         while (p.remainder != null) {
@@ -140,7 +134,8 @@ final class Path {
             pb.appendKey(from.first());
             from = from.remainder();
             if (from == null) {
-                throw new ConfigException.BugOrBroken("subPath lastIndex out of range " + lastIndex);
+                throw new ConfigException.BugOrBroken(
+                        "subPath lastIndex out of range " + lastIndex);
             }
         }
         return pb.result();
@@ -167,8 +162,7 @@ final class Path {
         if (other instanceof Path) {
             Path that = (Path) other;
             return this.first.equals(that.first)
-                    && ConfigImplUtil.equalsHandlingNull(this.remainder,
-                    that.remainder);
+                    && ConfigImplUtil.equalsHandlingNull(this.remainder, that.remainder);
         } else {
             return false;
         }
@@ -223,8 +217,8 @@ final class Path {
     }
 
     /**
-     * toString() is a debugging-oriented version while this is an
-     * error-message-oriented human-readable one.
+     * toString() is a debugging-oriented version while this is an error-message-oriented
+     * human-readable one.
      */
     String render() {
         StringBuilder sb = new StringBuilder();
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java
index b512cd1cc..a0cb50b66 100644
--- a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PathParser.java
@@ -45,40 +45,44 @@ final class PathParser {
         }
     }
 
-    protected static Path parsePathExpression(Iterator<Token> expression,
-                                              ConfigOrigin origin) {
+    protected static Path parsePathExpression(Iterator<Token> expression, ConfigOrigin origin) {
         return parsePathExpression(expression, origin, null, null, ConfigSyntax.CONF);
     }
 
-    protected static Path parsePathExpression(Iterator<Token> expression,
-                                              ConfigOrigin origin, String originalText) {
+    protected static Path parsePathExpression(
+            Iterator<Token> expression, ConfigOrigin origin, String originalText) {
         return parsePathExpression(expression, origin, originalText, null, ConfigSyntax.CONF);
     }
 
-    protected static ConfigNodePath parsePathNodeExpression(Iterator<Token> expression,
-                                                            ConfigOrigin origin) {
+    protected static ConfigNodePath parsePathNodeExpression(
+            Iterator<Token> expression, ConfigOrigin origin) {
         return parsePathNodeExpression(expression, origin, null, ConfigSyntax.CONF);
     }
 
-    protected static ConfigNodePath parsePathNodeExpression(Iterator<Token> expression,
-                                                            ConfigOrigin origin, String originalText, ConfigSyntax flavor) {
+    protected static ConfigNodePath parsePathNodeExpression(
+            Iterator<Token> expression,
+            ConfigOrigin origin,
+            String originalText,
+            ConfigSyntax flavor) {
         ArrayList<Token> pathTokens = new ArrayList<>();
         Path path = parsePathExpression(expression, origin, originalText, pathTokens, flavor);
         return new ConfigNodePath(path, pathTokens);
     }
 
     // originalText may be null if not available
-    protected static Path parsePathExpression(Iterator<Token> expression,
-                                              ConfigOrigin origin, String originalText,
-                                              ArrayList<Token> pathTokens,
-                                              ConfigSyntax flavor) {
+    protected static Path parsePathExpression(
+            Iterator<Token> expression,
+            ConfigOrigin origin,
+            String originalText,
+            ArrayList<Token> pathTokens,
+            ConfigSyntax flavor) {
         // each builder in "buf" is an element in the path.
         List<Element> buf = new ArrayList<>();
         buf.add(new Element("", false));
 
         if (!expression.hasNext()) {
-            throw new ConfigException.BadPath(origin, originalText,
-                "Expecting a field name or path here, but got nothing");
+            throw new ConfigException.BadPath(
+                    origin, originalText, "Expecting a field name or path here, but got nothing");
         }
 
         while (expression.hasNext()) {
@@ -119,7 +123,8 @@ final class PathParser {
                     // implementation detail.
                     AbstractConfigValue v = Tokens.getValue(t);
 
-                    // We need to split the tokens on a . so that we can get sub-paths but still preserve
+                    // We need to split the tokens on a . so that we can get sub-paths but still
+                    // preserve
                     // the original path text when doing an insertion
                     if (pathTokens != null) {
                         pathTokens.remove(pathTokens.size() - 1);
@@ -127,7 +132,8 @@ final class PathParser {
                     }
                     text = v.transformToString();
                 } else if (Tokens.isUnquotedText(t)) {
-                    // We need to split the tokens on a . so that we can get sub-paths but still preserve
+                    // We need to split the tokens on a . so that we can get sub-paths but still
+                    // preserve
                     // the original path text when doing an insertion on ConfigNodeObjects
                     if (pathTokens != null) {
                         pathTokens.remove(pathTokens.size() - 1);
@@ -136,11 +142,11 @@ final class PathParser {
                     text = Tokens.getUnquotedText(t);
                 } else {
                     throw new ConfigException.BadPath(
-                        origin,
-                        originalText,
-                        "Token not allowed in path expression: "
-                            + t
-                            + " (you can double-quote this token if you really want it here)");
+                            origin,
+                            originalText,
+                            "Token not allowed in path expression: "
+                                    + t
+                                    + " (you can double-quote this token if you really want it here)");
                 }
 
                 addPathText(buf, false, text);
@@ -151,9 +157,9 @@ final class PathParser {
         for (Element e : buf) {
             if (e.sb.length() == 0 && !e.canBeEmpty) {
                 throw new ConfigException.BadPath(
-                    origin,
-                    originalText,
-                    "path has a leading, trailing, or two adjacent period '.' (use quoted \"\" empty string if you want an empty element)");
+                        origin,
+                        originalText,
+                        "path has a leading, trailing, or two adjacent period '.' (use quoted \"\" empty string if you want an empty element)");
             } else {
                 pb.appendKey(e.sb.toString());
             }
@@ -176,18 +182,20 @@ final class PathParser {
             } else {
                 splitTokens.add(Tokens.newString(t.origin(), s, "\"" + s + "\""));
             }
-            splitTokens.add(Tokens.newUnquotedText(t.origin(), ConfigParseOptions.PATH_TOKEN_SEPARATOR));
+            splitTokens.add(
+                    Tokens.newUnquotedText(t.origin(), ConfigParseOptions.PATH_TOKEN_SEPARATOR));
         }
 
-        if (!tokenText.startsWith(ConfigParseOptions.PATH_TOKEN_SEPARATOR, tokenText.length() - ConfigParseOptions.PATH_TOKEN_SEPARATOR.length())) {
+        if (!tokenText.startsWith(
+                ConfigParseOptions.PATH_TOKEN_SEPARATOR,
+                tokenText.length() - ConfigParseOptions.PATH_TOKEN_SEPARATOR.length())) {
             splitTokens.remove(splitTokens.size() - 1);
         }
 
         return splitTokens;
     }
 
-    private static void addPathText(List<Element> buf, boolean wasQuoted,
-                                    String newText) {
+    private static void addPathText(List<Element> buf, boolean wasQuoted, String newText) {
 
         int i = wasQuoted ? -1 : newText.indexOf(ConfigParseOptions.PATH_TOKEN_SEPARATOR);
         Element current = buf.get(buf.size() - 1);
@@ -205,7 +213,10 @@ final class PathParser {
             // then start a new element
             buf.add(new Element("", false));
             // recurse to consume remainder of newText
-            addPathText(buf, false, newText.substring(i + ConfigParseOptions.PATH_TOKEN_SEPARATOR.length()));
+            addPathText(
+                    buf,
+                    false,
+                    newText.substring(i + ConfigParseOptions.PATH_TOKEN_SEPARATOR.length()));
         }
     }
 
@@ -262,7 +273,12 @@ final class PathParser {
             Path withOneMoreElement = new Path(s.substring(0, end), tail);
             return withOneMoreElement;
         } else {
-            Path withOneMoreElement = new Path(s.substring(splitAt + ConfigParseOptions.PATH_TOKEN_SEPARATOR.length(), end), tail);
+            Path withOneMoreElement =
+                    new Path(
+                            s.substring(
+                                    splitAt + ConfigParseOptions.PATH_TOKEN_SEPARATOR.length(),
+                                    end),
+                            tail);
             return fastPathBuild(withOneMoreElement, s, splitAt);
         }
     }
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PropertiesParser.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PropertiesParser.java
new file mode 100644
index 000000000..eceacf997
--- /dev/null
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/PropertiesParser.java
@@ -0,0 +1,207 @@
+/** Copyright (C) 2011-2012 Typesafe Inc. <http://typesafe.com> */
+package org.apache.seatunnel.shade.com.typesafe.config.impl;
+
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigException;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigOrigin;
+
+import java.io.IOException;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+final class PropertiesParser {
+    static AbstractConfigObject parse(Reader reader, ConfigOrigin origin) throws IOException {
+        Properties props = new Properties();
+        props.load(reader);
+        return fromProperties(origin, props);
+    }
+
+    static String lastElement(String path) {
+        int i = path.lastIndexOf('.');
+        if (i < 0) return path;
+        else return path.substring(i + 1);
+    }
+
+    static String exceptLastElement(String path) {
+        int i = path.lastIndexOf('.');
+        if (i < 0) return null;
+        else return path.substring(0, i);
+    }
+
+    static Path pathFromPropertyKey(String key) {
+        String last = lastElement(key);
+        String exceptLast = exceptLastElement(key);
+        Path path = new Path(last, null);
+        while (exceptLast != null) {
+            last = lastElement(exceptLast);
+            exceptLast = exceptLastElement(exceptLast);
+            path = new Path(last, path);
+        }
+        return path;
+    }
+
+    static AbstractConfigObject fromProperties(ConfigOrigin origin, Properties props) {
+        return fromEntrySet(origin, props.entrySet());
+    }
+
+    private static <K, V> AbstractConfigObject fromEntrySet(
+            ConfigOrigin origin, Set<Map.Entry<K, V>> entries) {
+        final Map<Path, Object> pathMap = getPathMap(entries);
+        return fromPathMap(origin, pathMap, true /* from properties */);
+    }
+
+    private static <K, V> Map<Path, Object> getPathMap(Set<Map.Entry<K, V>> entries) {
+        Map<Path, Object> pathMap = new LinkedHashMap<Path, Object>();
+        for (Map.Entry<K, V> entry : entries) {
+            Object key = entry.getKey();
+            if (key instanceof String) {
+                Path path = pathFromPropertyKey((String) key);
+                pathMap.put(path, entry.getValue());
+            }
+        }
+        return pathMap;
+    }
+
+    static AbstractConfigObject fromStringMap(ConfigOrigin origin, Map<String, String> stringMap) {
+        return fromEntrySet(origin, stringMap.entrySet());
+    }
+
+    static AbstractConfigObject fromPathMap(ConfigOrigin origin, Map<?, ?> pathExpressionMap) {
+        Map<Path, Object> pathMap = new LinkedHashMap<Path, Object>();
+        for (Map.Entry<?, ?> entry : pathExpressionMap.entrySet()) {
+            Object keyObj = entry.getKey();
+            if (!(keyObj instanceof String)) {
+                throw new ConfigException.BugOrBroken(
+                        "Map has a non-string as a key, expecting a path expression as a String");
+            }
+            Path path = Path.newPath((String) keyObj);
+            pathMap.put(path, entry.getValue());
+        }
+        return fromPathMap(origin, pathMap, false /* from properties */);
+    }
+
+    private static AbstractConfigObject fromPathMap(
+            ConfigOrigin origin, Map<Path, Object> pathMap, boolean convertedFromProperties) {
+        /*
+         * First, build a list of paths that will have values, either string or
+         * object values.
+         */
+        Set<Path> scopePaths = new LinkedHashSet<Path>();
+        Set<Path> valuePaths = new LinkedHashSet<Path>();
+        for (Path path : pathMap.keySet()) {
+            // add value's path
+            valuePaths.add(path);
+
+            // all parent paths are objects
+            Path next = path.parent();
+            while (next != null) {
+                scopePaths.add(next);
+                next = next.parent();
+            }
+        }
+
+        if (convertedFromProperties) {
+            /*
+             * If any string values are also objects containing other values,
+             * drop those string values - objects "win".
+             */
+            valuePaths.removeAll(scopePaths);
+        } else {
+            /* If we didn't start out as properties, then this is an error. */
+            for (Path path : valuePaths) {
+                if (scopePaths.contains(path)) {
+                    throw new ConfigException.BugOrBroken(
+                            "In the map, path '"
+                                    + path.render()
+                                    + "' occurs as both the parent object of a value and as a value. "
+                                    + "Because Map has no defined ordering, this is a broken situation.");
+                }
+            }
+        }
+
+        /*
+         * Create maps for the object-valued values.
+         */
+        Map<String, AbstractConfigValue> root = new LinkedHashMap<String, AbstractConfigValue>();
+        Map<Path, Map<String, AbstractConfigValue>> scopes =
+                new LinkedHashMap<Path, Map<String, AbstractConfigValue>>();
+
+        for (Path path : scopePaths) {
+            Map<String, AbstractConfigValue> scope =
+                    new LinkedHashMap<String, AbstractConfigValue>();
+            scopes.put(path, scope);
+        }
+
+        /* Store string values in the associated scope maps */
+        for (Path path : valuePaths) {
+            Path parentPath = path.parent();
+            Map<String, AbstractConfigValue> parent =
+                    parentPath != null ? scopes.get(parentPath) : root;
+
+            String last = path.last();
+            Object rawValue = pathMap.get(path);
+            AbstractConfigValue value;
+            if (convertedFromProperties) {
+                if (rawValue instanceof String) {
+                    value = new ConfigString.Quoted(origin, (String) rawValue);
+                } else {
+                    // silently ignore non-string values in Properties
+                    value = null;
+                }
+            } else {
+                value =
+                        ConfigImpl.fromAnyRef(
+                                pathMap.get(path), origin, FromMapMode.KEYS_ARE_PATHS);
+            }
+            if (value != null) parent.put(last, value);
+        }
+
+        /*
+         * Make a list of scope paths from longest to shortest, so children go
+         * before parents.
+         */
+        List<Path> sortedScopePaths = new ArrayList<Path>();
+        sortedScopePaths.addAll(scopePaths);
+        // sort descending by length
+        Collections.sort(
+                sortedScopePaths,
+                new Comparator<Path>() {
+                    @Override
+                    public int compare(Path a, Path b) {
+                        // Path.length() is O(n) so in theory this sucks
+                        // but in practice we can make Path precompute length
+                        // if it ever matters.
+                        return b.length() - a.length();
+                    }
+                });
+
+        /*
+         * Create ConfigObject for each scope map, working from children to
+         * parents to avoid modifying any already-created ConfigObject. This is
+         * where we need the sorted list.
+         */
+        for (Path scopePath : sortedScopePaths) {
+            Map<String, AbstractConfigValue> scope = scopes.get(scopePath);
+
+            Path parentPath = scopePath.parent();
+            Map<String, AbstractConfigValue> parent =
+                    parentPath != null ? scopes.get(parentPath) : root;
+
+            AbstractConfigObject o =
+                    new SimpleConfigObject(
+                            origin, scope, ResolveStatus.RESOLVED, false /* ignoresFallbacks */);
+            parent.put(scopePath.last(), o);
+        }
+
+        // return root config object
+        return new SimpleConfigObject(
+                origin, root, ResolveStatus.RESOLVED, false /* ignoresFallbacks */);
+    }
+}
diff --git a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
index ed52f4b41..8ff4b846c 100644
--- a/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
+++ b/seatunnel-config/seatunnel-config-shade/src/main/java/org/apache/seatunnel/shade/com/typesafe/config/impl/SimpleConfigObject.java
@@ -28,10 +28,15 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
     private final Map<String, AbstractConfigValue> value;
     private final boolean resolved;
     private final boolean ignoresFallbacks;
-    private static final SimpleConfigObject EMPTY_INSTANCE = empty(SimpleConfigOrigin.newSimple("empty config"));
+    private static final SimpleConfigObject EMPTY_INSTANCE =
+            empty(SimpleConfigOrigin.newSimple("empty config"));
     private static final int HASH_CODE = 41;
 
-    SimpleConfigObject(ConfigOrigin origin, Map<String, AbstractConfigValue> value, ResolveStatus status, boolean ignoresFallbacks) {
+    SimpleConfigObject(
+            ConfigOrigin origin,
+            Map<String, AbstractConfigValue> value,
+            ResolveStatus status,
+            boolean ignoresFallbacks) {
         super(origin);
         if (value == null) {
             throw new ConfigException.BugOrBroken("creating config object with null map");
@@ -69,12 +74,24 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
             }
         }
 
-        return v == null ? null : new SimpleConfigObject(this.origin(), Collections.singletonMap(key, v), v.resolveStatus(), this.ignoresFallbacks);
+        return v == null
+                ? null
+                : new SimpleConfigObject(
+                        this.origin(),
+                        Collections.singletonMap(key, v),
+                        v.resolveStatus(),
+                        this.ignoresFallbacks);
     }
 
     SimpleConfigObject withOnlyPath(Path path) {
         SimpleConfigObject o = this.withOnlyPathOrNull(path);
-        return o == null ? new SimpleConfigObject(this.origin(), Collections.emptyMap(), ResolveStatus.RESOLVED, this.ignoresFallbacks) : o;
+        return o == null
+                ? new SimpleConfigObject(
+                        this.origin(),
+                        Collections.emptyMap(),
+                        ResolveStatus.RESOLVED,
+                        this.ignoresFallbacks)
+                : o;
     }
 
     SimpleConfigObject withoutPath(Path path) {
@@ -86,17 +103,28 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
             v = ((AbstractConfigObject) v).withoutPath(next);
             smaller = new HashMap<>(this.value);
             smaller.put(key, v);
-            return new SimpleConfigObject(this.origin(), smaller, ResolveStatus.fromValues(smaller.values()), this.ignoresFallbacks);
+            return new SimpleConfigObject(
+                    this.origin(),
+                    smaller,
+                    ResolveStatus.fromValues(smaller.values()),
+                    this.ignoresFallbacks);
         } else if (next == null && v != null) {
             smaller = new HashMap<>(this.value.size() - 1);
 
-            for (Entry<String, AbstractConfigValue> stringAbstractConfigValueEntry : this.value.entrySet()) {
+            for (Entry<String, AbstractConfigValue> stringAbstractConfigValueEntry :
+                    this.value.entrySet()) {
                 if (!stringAbstractConfigValueEntry.getKey().equals(key)) {
-                    smaller.put(stringAbstractConfigValueEntry.getKey(), stringAbstractConfigValueEntry.getValue());
+                    smaller.put(
+                            stringAbstractConfigValueEntry.getKey(),
+                            stringAbstractConfigValueEntry.getValue());
                 }
             }
 
-            return new SimpleConfigObject(this.origin(), smaller, ResolveStatus.fromValues(smaller.values()), this.ignoresFallbacks);
+            return new SimpleConfigObject(
+                    this.origin(),
+                    smaller,
+                    ResolveStatus.fromValues(smaller.values()),
+                    this.ignoresFallbacks);
         } else {
             return this;
         }
@@ -104,7 +132,8 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
 
     public SimpleConfigObject withValue(String key, ConfigValue v) {
         if (v == null) {
-            throw new ConfigException.BugOrBroken("Trying to store null ConfigValue in a ConfigObject");
+            throw new ConfigException.BugOrBroken(
+                    "Trying to store null ConfigValue in a ConfigObject");
         } else {
             Map newMap;
             if (this.value.isEmpty()) {
@@ -114,7 +143,11 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
                 newMap.put(key, v);
             }
 
-            return new SimpleConfigObject(this.origin(), newMap, ResolveStatus.fromValues(newMap.values()), this.ignoresFallbacks);
+            return new SimpleConfigObject(
+                    this.origin(),
+                    newMap,
+                    ResolveStatus.fromValues(newMap.values()),
+                    this.ignoresFallbacks);
         }
     }
 
@@ -128,7 +161,12 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
             if (child instanceof AbstractConfigObject) {
                 return this.withValue(key, ((AbstractConfigObject) child).withValue(next, v));
             } else {
-                SimpleConfig subtree = ((AbstractConfigValue) v).atPath(SimpleConfigOrigin.newSimple("withValue(" + next.render() + ")"), next);
+                SimpleConfig subtree =
+                        ((AbstractConfigValue) v)
+                                .atPath(
+                                        SimpleConfigOrigin.newSimple(
+                                                "withValue(" + next.render() + ")"),
+                                        next);
                 return this.withValue(key, subtree.root());
             }
         }
@@ -138,7 +176,8 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
         return this.value.get(key);
     }
 
-    private SimpleConfigObject newCopy(ResolveStatus newStatus, ConfigOrigin newOrigin, boolean newIgnoresFallbacks) {
+    private SimpleConfigObject newCopy(
+            ResolveStatus newStatus, ConfigOrigin newOrigin, boolean newIgnoresFallbacks) {
         return new SimpleConfigObject(newOrigin, this.value, newStatus, newIgnoresFallbacks);
     }
 
@@ -147,21 +186,25 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
     }
 
     protected SimpleConfigObject withFallbacksIgnored() {
-        return this.ignoresFallbacks ? this : this.newCopy(this.resolveStatus(), this.origin(), true);
+        return this.ignoresFallbacks
+                ? this
+                : this.newCopy(this.resolveStatus(), this.origin(), true);
     }
 
     ResolveStatus resolveStatus() {
         return ResolveStatus.fromBoolean(this.resolved);
     }
 
-    public SimpleConfigObject replaceChild(AbstractConfigValue child, AbstractConfigValue replacement) {
+    public SimpleConfigObject replaceChild(
+            AbstractConfigValue child, AbstractConfigValue replacement) {
         Map<String, AbstractConfigValue> newChildren = new HashMap<>(this.value);
         Iterator<Entry<String, AbstractConfigValue>> var4 = newChildren.entrySet().iterator();
 
         Entry<String, AbstractConfigValue> old;
         do {
             if (!var4.hasNext()) {
-                throw new ConfigException.BugOrBroken("SimpleConfigObject.replaceChild did not find " + child + " in " + this);
+                throw new ConfigException.BugOrBroken(
+                        "SimpleConfigObject.replaceChild did not find " + child + " in " + this);
             }
 
             old = var4.next();
@@ -173,7 +216,11 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
             newChildren.remove(old.getKey());
         }
 
-        return new SimpleConfigObject(this.origin(), newChildren, ResolveStatus.fromValues(newChildren.values()), this.ignoresFallbacks);
+        return new SimpleConfigObject(
+                this.origin(),
+                newChildren,
+                ResolveStatus.fromValues(newChildren.values()),
+                this.ignoresFallbacks);
     }
 
     public boolean hasDescendant(AbstractConfigValue descendant) {
@@ -190,7 +237,8 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
                     }
 
                     child = var2.next();
-                } while (!(child instanceof Container) || !((Container) child).hasDescendant(descendant));
+                } while (!(child instanceof Container)
+                        || !((Container) child).hasDescendant(descendant));
 
                 return true;
             }
@@ -208,8 +256,11 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
     public Map<String, Object> unwrapped() {
         Map<String, Object> m = new HashMap<>();
 
-        for (Entry<String, AbstractConfigValue> stringAbstractConfigValueEntry : this.value.entrySet()) {
-            m.put(stringAbstractConfigValueEntry.getKey(), stringAbstractConfigValueEntry.getValue().unwrapped());
+        for (Entry<String, AbstractConfigValue> stringAbstractConfigValueEntry :
+                this.value.entrySet()) {
+            m.put(
+                    stringAbstractConfigValueEntry.getKey(),
+                    stringAbstractConfigValueEntry.getValue().unwrapped());
         }
 
         return m;
@@ -218,7 +269,8 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
     protected SimpleConfigObject mergedWithObject(AbstractConfigObject abstractFallback) {
         this.requireNotIgnoringFallbacks();
         if (!(abstractFallback instanceof SimpleConfigObject)) {
-            throw new ConfigException.BugOrBroken("should not be reached (merging non-SimpleConfigObject)");
+            throw new ConfigException.BugOrBroken(
+                    "should not be reached (merging non-SimpleConfigObject)");
         } else {
             SimpleConfigObject fallback = (SimpleConfigObject) abstractFallback;
             boolean changed = false;
@@ -253,8 +305,13 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
             ResolveStatus newResolveStatus = ResolveStatus.fromBoolean(allResolved);
             boolean newIgnoresFallbacks = fallback.ignoresFallbacks();
             if (changed) {
-                return new SimpleConfigObject(mergeOrigins(this, fallback), merged, newResolveStatus, newIgnoresFallbacks);
-            } else if (newResolveStatus == this.resolveStatus() && newIgnoresFallbacks == this.ignoresFallbacks()) {
+                return new SimpleConfigObject(
+                        mergeOrigins(this, fallback),
+                        merged,
+                        newResolveStatus,
+                        newIgnoresFallbacks);
+            } else if (newResolveStatus == this.resolveStatus()
+                    && newIgnoresFallbacks == this.ignoresFallbacks()) {
                 return this;
             } else {
                 return this.newCopy(newResolveStatus, this.origin(), newIgnoresFallbacks);
@@ -312,18 +369,24 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
                 }
             }
 
-            return new SimpleConfigObject(this.origin(), modified, sawUnresolved ? ResolveStatus.UNRESOLVED : ResolveStatus.RESOLVED, this.ignoresFallbacks());
+            return new SimpleConfigObject(
+                    this.origin(),
+                    modified,
+                    sawUnresolved ? ResolveStatus.UNRESOLVED : ResolveStatus.RESOLVED,
+                    this.ignoresFallbacks());
         }
     }
 
-    ResolveResult<? extends AbstractConfigObject> resolveSubstitutions(ResolveContext context, ResolveSource source) throws NotPossibleToResolve {
+    ResolveResult<? extends AbstractConfigObject> resolveSubstitutions(
+            ResolveContext context, ResolveSource source) throws NotPossibleToResolve {
         if (this.resolveStatus() == ResolveStatus.RESOLVED) {
             return ResolveResult.make(context, this);
         } else {
             ResolveSource sourceWithParent = source.pushParent(this);
 
             try {
-                SimpleConfigObject.ResolveModifier modifier = new SimpleConfigObject.ResolveModifier(context, sourceWithParent);
+                SimpleConfigObject.ResolveModifier modifier =
+                        new SimpleConfigObject.ResolveModifier(context, sourceWithParent);
                 AbstractConfigValue value = this.modifyMayThrow(modifier);
                 return ResolveResult.make(modifier.context, value).asObjectResult();
             } catch (NotPossibleToResolve | RuntimeException var6) {
@@ -335,14 +398,16 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
     }
 
     SimpleConfigObject relativized(final Path prefix) {
-        return this.modify(new NoExceptionsModifier() {
-            public AbstractConfigValue modifyChild(String key, AbstractConfigValue v) {
-                return v.relativized(prefix);
-            }
-        });
+        return this.modify(
+                new NoExceptionsModifier() {
+                    public AbstractConfigValue modifyChild(String key, AbstractConfigValue v) {
+                        return v.relativized(prefix);
+                    }
+                });
     }
 
-    protected void render(StringBuilder sb, int indent, boolean atRoot, ConfigRenderOptions options) {
+    protected void render(
+            StringBuilder sb, int indent, boolean atRoot, ConfigRenderOptions options) {
         if (this.isEmpty()) {
             sb.append("{}");
         } else {
@@ -423,7 +488,6 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
         if (atRoot && options.getFormatted()) {
             sb.append('\n');
         }
-
     }
 
     public AbstractConfigValue get(Object key) {
@@ -447,7 +511,6 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
 
                     key = var4.next();
                 } while (a.get(key).equals(b.get(key)));
-
             }
             return false;
         }
@@ -460,7 +523,9 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
         int valuesHash = 0;
 
         String k;
-        for (Iterator<String> var3 = keys.iterator(); var3.hasNext(); valuesHash += m.get(k).hashCode()) {
+        for (Iterator<String> var3 = keys.iterator();
+                var3.hasNext();
+                valuesHash += m.get(k).hashCode()) {
             k = var3.next();
         }
 
@@ -498,8 +563,12 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
     public Set<Entry<String, ConfigValue>> entrySet() {
         HashSet<Entry<String, ConfigValue>> entries = new HashSet<>();
 
-        for (Entry<String, AbstractConfigValue> stringAbstractConfigValueEntry : this.value.entrySet()) {
-            entries.add(new AbstractMap.SimpleImmutableEntry<>(stringAbstractConfigValueEntry.getKey(), stringAbstractConfigValueEntry.getValue()));
+        for (Entry<String, AbstractConfigValue> stringAbstractConfigValueEntry :
+                this.value.entrySet()) {
+            entries.add(
+                    new AbstractMap.SimpleImmutableEntry<>(
+                            stringAbstractConfigValueEntry.getKey(),
+                            stringAbstractConfigValueEntry.getValue()));
         }
 
         return entries;
@@ -526,7 +595,9 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
     }
 
     static SimpleConfigObject emptyMissing(ConfigOrigin baseOrigin) {
-        return new SimpleConfigObject(SimpleConfigOrigin.newSimple(baseOrigin.description() + " (not found)"), Collections.emptyMap());
+        return new SimpleConfigObject(
+                SimpleConfigOrigin.newSimple(baseOrigin.description() + " (not found)"),
+                Collections.emptyMap());
     }
 
     private Object writeReplace() throws ObjectStreamException {
@@ -544,13 +615,16 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
             this.originalRestrict = context.restrictToChild();
         }
 
-        public AbstractConfigValue modifyChildMayThrow(String key, AbstractConfigValue v) throws NotPossibleToResolve {
+        public AbstractConfigValue modifyChildMayThrow(String key, AbstractConfigValue v)
+                throws NotPossibleToResolve {
             if (this.context.isRestrictedToChild()) {
                 if (key.equals(this.context.restrictToChild().first())) {
                     Path remainder = this.context.restrictToChild().remainder();
                     if (remainder != null) {
-                        ResolveResult<? extends AbstractConfigValue> result = this.context.restrict(remainder).resolve(v, this.source);
-                        this.context = result.context.unrestricted().restrict(this.originalRestrict);
+                        ResolveResult<? extends AbstractConfigValue> result =
+                                this.context.restrict(remainder).resolve(v, this.source);
+                        this.context =
+                                result.context.unrestricted().restrict(this.originalRestrict);
                         return result.value;
                     } else {
                         return v;
@@ -559,7 +633,8 @@ final class SimpleConfigObject extends AbstractConfigObject implements Serializa
                     return v;
                 }
             } else {
-                ResolveResult<? extends AbstractConfigValue> result = this.context.unrestricted().resolve(v, this.source);
+                ResolveResult<? extends AbstractConfigValue> result =
+                        this.context.unrestricted().resolve(v, this.source);
                 this.context = result.context.unrestricted().restrict(this.originalRestrict);
                 return result.value;
             }
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/CompleteTest.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/CompleteTest.java
index 1652b5d45..63e4a51aa 100644
--- a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/CompleteTest.java
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/CompleteTest.java
@@ -17,12 +17,12 @@
 
 package org.apache.seatunnel.config;
 
-import org.apache.seatunnel.config.utils.FileUtils;
-
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
+import org.apache.seatunnel.config.utils.FileUtils;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -34,21 +34,22 @@ public class CompleteTest {
 
     @Test
     public void testVariables() throws URISyntaxException {
-        // We use a map to mock the system property, since the system property will be only loaded once
+        // We use a map to mock the system property, since the system property will be only loaded
+        // once
         // after the test is run. see Issue #1670
         Map<String, String> systemProperties = new HashMap<>();
         systemProperties.put("dt", "20190318");
         systemProperties.put("city2", "shanghai");
 
-        Config config = ConfigFactory
-            .parseFile(FileUtils.getFileFromResources("/seatunnel/variables.conf"))
-            .resolveWith(ConfigFactory.parseMap(systemProperties), ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        Config config =
+                ConfigFactory.parseFile(FileUtils.getFileFromResources("/seatunnel/variables.conf"))
+                        .resolveWith(
+                                ConfigFactory.parseMap(systemProperties),
+                                ConfigResolveOptions.defaults().setAllowUnresolved(true));
         String sql1 = config.getConfigList("transform").get(1).getString("sql");
         String sql2 = config.getConfigList("transform").get(2).getString("sql");
 
         Assertions.assertTrue(sql1.contains("shanghai"));
         Assertions.assertTrue(sql2.contains("20190318"));
-
     }
-
 }
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/ConfigFactoryTest.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/ConfigFactoryTest.java
index 4f24ab7af..2a90befbb 100644
--- a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/ConfigFactoryTest.java
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/ConfigFactoryTest.java
@@ -17,11 +17,11 @@
 
 package org.apache.seatunnel.config;
 
-import org.apache.seatunnel.config.utils.FileUtils;
-
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import org.apache.seatunnel.config.utils.FileUtils;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -34,7 +34,8 @@ public class ConfigFactoryTest {
     @Test
     public void testBasicParseAppConf() throws URISyntaxException {
 
-        Config config = ConfigFactory.parseFile(FileUtils.getFileFromResources("/factory/config.conf"));
+        Config config =
+                ConfigFactory.parseFile(FileUtils.getFileFromResources("/factory/config.conf"));
 
         Assertions.assertTrue(config.hasPath("env"));
         Assertions.assertTrue(config.hasPath("source"));
@@ -50,13 +51,15 @@ public class ConfigFactoryTest {
         Assertions.assertEquals("5", env.getString("spark.stream.batchDuration"));
 
         // check custom plugin
-        Assertions.assertEquals("c.Console", config.getConfigList("sink").get(1).getString("plugin_name"));
+        Assertions.assertEquals(
+                "c.Console", config.getConfigList("sink").get(1).getString("plugin_name"));
     }
 
     @Test
     public void testTransformOrder() throws URISyntaxException {
 
-        Config config = ConfigFactory.parseFile(FileUtils.getFileFromResources("/factory/config.conf"));
+        Config config =
+                ConfigFactory.parseFile(FileUtils.getFileFromResources("/factory/config.conf"));
 
         String[] pluginNames = {"split", "sql1", "sql2", "sql3", "json"};
 
@@ -64,20 +67,25 @@ public class ConfigFactoryTest {
         Assertions.assertEquals(pluginNames.length, transforms.size());
 
         for (int i = 0; i < transforms.size(); i++) {
-            String parsedPluginName = String.valueOf(transforms.get(i).root().get("plugin_name").unwrapped());
+            String parsedPluginName =
+                    String.valueOf(transforms.get(i).root().get("plugin_name").unwrapped());
             Assertions.assertEquals(pluginNames[i], parsedPluginName);
         }
-
     }
 
     @Test
     public void testQuotedString() throws URISyntaxException {
-        List<String> keys = Arrays.asList("spark.app.name", "spark.executor.instances", "spark.executor.cores",
-                "spark.executor.memory", "spark.stream.batchDuration");
-
-        Config config = ConfigFactory.parseFile(FileUtils.getFileFromResources("/factory/config.conf"));
+        List<String> keys =
+                Arrays.asList(
+                        "spark.app.name",
+                        "spark.executor.instances",
+                        "spark.executor.cores",
+                        "spark.executor.memory",
+                        "spark.stream.batchDuration");
+
+        Config config =
+                ConfigFactory.parseFile(FileUtils.getFileFromResources("/factory/config.conf"));
         Config evnConfig = config.getConfig("env");
         evnConfig.entrySet().forEach(entry -> Assertions.assertTrue(keys.contains(entry.getKey())));
-
     }
 }
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/JsonFormatTest.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/JsonFormatTest.java
index bc39f5cd4..6cd3c1a0a 100644
--- a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/JsonFormatTest.java
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/JsonFormatTest.java
@@ -17,12 +17,12 @@
 
 package org.apache.seatunnel.config;
 
-import org.apache.seatunnel.config.utils.FileUtils;
-
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 
+import org.apache.seatunnel.config.utils.FileUtils;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -33,21 +33,21 @@ public class JsonFormatTest {
     @Test
     public void testJsonFormat() throws URISyntaxException {
 
-        Config json = ConfigFactory
-                .parseFile(FileUtils.getFileFromResources("/json/spark.batch.json"))
-                .resolveWith(ConfigFactory.systemProperties(),
-                        ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        Config json =
+                ConfigFactory.parseFile(FileUtils.getFileFromResources("/json/spark.batch.json"))
+                        .resolveWith(
+                                ConfigFactory.systemProperties(),
+                                ConfigResolveOptions.defaults().setAllowUnresolved(true));
 
-        Config config = ConfigFactory
-                .parseFile(FileUtils.getFileFromResources("/json/spark.batch.conf"))
-                .resolveWith(ConfigFactory.systemProperties(),
-                        ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        Config config =
+                ConfigFactory.parseFile(FileUtils.getFileFromResources("/json/spark.batch.conf"))
+                        .resolveWith(
+                                ConfigFactory.systemProperties(),
+                                ConfigResolveOptions.defaults().setAllowUnresolved(true));
 
         Assertions.assertEquals(config.atPath("transform"), json.atPath("transform"));
         Assertions.assertEquals(config.atPath("sink"), json.atPath("sink"));
         Assertions.assertEquals(config.atPath("source"), json.atPath("source"));
         Assertions.assertEquals(config.atPath("env"), json.atPath("env"));
-
     }
-
 }
diff --git a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/utils/FileUtils.java b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/utils/FileUtils.java
index 3b0e01b79..adfbc20e2 100644
--- a/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/utils/FileUtils.java
+++ b/seatunnel-config/seatunnel-config-shade/src/test/java/org/apache/seatunnel/config/utils/FileUtils.java
@@ -24,8 +24,7 @@ import java.nio.file.Paths;
 
 public final class FileUtils {
 
-    private FileUtils() {
-    }
+    private FileUtils() {}
 
     // get file from classpath, resources folder
     public static File getFileFromResources(String fileName) throws URISyntaxException {
@@ -35,5 +34,4 @@ public final class FileUtils {
         }
         return Paths.get(resource.toURI()).toFile();
     }
-
 }
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
index db74c5e61..9b818ca95 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/AbstractCommandArgs.java
@@ -56,5 +56,17 @@ public abstract class AbstractCommandArgs extends CommandArgs {
             description = "SeaTunnel job name")
     protected String jobName = Constants.LOGO;
 
+    @Parameter(
+            names = {"--encrypt"},
+            description =
+                    "Encrypt config file, when both --decrypt and --encrypt are specified, only --encrypt will take effect")
+    protected boolean encrypt = false;
+
+    @Parameter(
+            names = {"--decrypt"},
+            description =
+                    "Decrypt config file, When both --decrypt and --encrypt are specified, only --encrypt will take effect")
+    protected boolean decrypt = false;
+
     public abstract DeployMode getDeployMode();
 }
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfDecryptCommand.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfDecryptCommand.java
new file mode 100644
index 000000000..190d658a0
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfDecryptCommand.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.command;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
+import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.utils.ConfigShadeUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;
+
+@Slf4j
+public class ConfDecryptCommand implements Command<AbstractCommandArgs> {
+
+    private final AbstractCommandArgs abstractCommandArgs;
+
+    public ConfDecryptCommand(AbstractCommandArgs abstractCommandArgs) {
+        this.abstractCommandArgs = abstractCommandArgs;
+    }
+
+    @Override
+    public void execute() throws CommandExecuteException, ConfigCheckException {
+        String decryptConfigFile = abstractCommandArgs.getConfigFile();
+        Path configPath = Paths.get(decryptConfigFile);
+        checkConfigExist(configPath);
+        Config config =
+                ConfigFactory.parseFile(configPath.toFile())
+                        .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+                        .resolveWith(
+                                ConfigFactory.systemProperties(),
+                                ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        Config encryptConfig = ConfigShadeUtils.decryptConfig(config);
+        log.info(
+                "Encrypt config: \n{}",
+                encryptConfig
+                        .root()
+                        .render(ConfigRenderOptions.defaults().setOriginComments(false)));
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java
new file mode 100644
index 000000000..3fef61756
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommand.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.command;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
+import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.utils.ConfigShadeUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;
+
+@Slf4j
+public class ConfEncryptCommand implements Command<AbstractCommandArgs> {
+
+    private final AbstractCommandArgs abstractCommandArgs;
+
+    public ConfEncryptCommand(AbstractCommandArgs abstractCommandArgs) {
+        this.abstractCommandArgs = abstractCommandArgs;
+    }
+
+    @Override
+    public void execute() throws CommandExecuteException, ConfigCheckException {
+        if (abstractCommandArgs.isDecrypt()) {
+            log.warn(
+                    "When both --decrypt and --encrypt are specified, only --encrypt will take effect");
+        }
+        String encryptConfigFile = abstractCommandArgs.getConfigFile();
+        Path configPath = Paths.get(encryptConfigFile);
+        checkConfigExist(configPath);
+        Config config =
+                ConfigFactory.parseFile(configPath.toFile())
+                        .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+                        .resolveWith(
+                                ConfigFactory.systemProperties(),
+                                ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        Config encryptConfig = ConfigShadeUtils.encryptConfig(config);
+        log.info(
+                "Encrypt config: \n{}",
+                encryptConfig
+                        .root()
+                        .render(ConfigRenderOptions.defaults().setOriginComments(false)));
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigAdapterUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigAdapterUtils.java
index b4441ea0e..d9c75fda8 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigAdapterUtils.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigAdapterUtils.java
@@ -39,13 +39,7 @@ public final class ConfigAdapterUtils {
     static {
         ServiceLoader<ConfigAdapter> serviceLoader = ServiceLoader.load(ConfigAdapter.class);
         Iterator<ConfigAdapter> it = serviceLoader.iterator();
-        if (it.hasNext()) {
-            try {
-                CONFIG_ADAPTERS.add(it.next());
-            } catch (Exception loadSpiErr) {
-                log.warn(loadSpiErr.getMessage());
-            }
-        }
+        it.forEachRemaining(CONFIG_ADAPTERS::add);
     }
 
     public static Optional<ConfigAdapter> selectAdapter(@NonNull String filePath) {
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
index 1b8080986..ed66b550a 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigBuilder.java
@@ -36,7 +36,7 @@ import java.util.Optional;
 @Slf4j
 public class ConfigBuilder {
 
-    private static final ConfigRenderOptions CONFIG_RENDER_OPTIONS =
+    public static final ConfigRenderOptions CONFIG_RENDER_OPTIONS =
             ConfigRenderOptions.concise().setFormatted(true);
 
     private ConfigBuilder() {
@@ -44,11 +44,13 @@ public class ConfigBuilder {
     }
 
     private static Config ofInner(@NonNull Path filePath) {
-        return ConfigFactory.parseFile(filePath.toFile())
-                .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
-                .resolveWith(
-                        ConfigFactory.systemProperties(),
-                        ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        Config config =
+                ConfigFactory.parseFile(filePath.toFile())
+                        .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+                        .resolveWith(
+                                ConfigFactory.systemProperties(),
+                                ConfigResolveOptions.defaults().setAllowUnresolved(true));
+        return ConfigShadeUtils.decryptConfig(config);
     }
 
     public static Config of(@NonNull String filePath) {
@@ -68,10 +70,11 @@ public class ConfigBuilder {
     }
 
     public static Config of(@NonNull ConfigAdapter configAdapter, @NonNull Path filePath) {
-        log.info("With spi {}", configAdapter.getClass().getName());
+        log.info("With config adapter spi {}", configAdapter.getClass().getName());
         try {
             Map<String, Object> flattenedMap = configAdapter.loadConfig(filePath);
-            return ConfigFactory.parseMap(flattenedMap);
+            Config config = ConfigFactory.parseMap(flattenedMap);
+            return ConfigShadeUtils.decryptConfig(config);
         } catch (Exception warn) {
             log.warn(
                     "Loading config failed with spi {}, fallback to HOCON loader.",
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java
new file mode 100644
index 000000000..fc4f458b6
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/utils/ConfigShadeUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.utils;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+
+import org.apache.seatunnel.api.configuration.ConfigShade;
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.TypesafeConfigUtils;
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+import java.util.function.BiFunction;
+
+/** Config shade utilities */
+@Slf4j
+public final class ConfigShadeUtils {
+
+    private static final String SHADE_IDENTIFIER_OPTION = "shade.identifier";
+
+    private static final String[] DEFAULT_SENSITIVE_OPTIONS =
+            new String[] {"password", "username", "auth"};
+
+    private static final Map<String, ConfigShade> CONFIG_SHADES = new HashMap<>();
+
+    private static final ConfigShade DEFAULT_SHADE = new DefaultConfigShade();
+
+    static {
+        ServiceLoader<ConfigShade> serviceLoader = ServiceLoader.load(ConfigShade.class);
+        Iterator<ConfigShade> it = serviceLoader.iterator();
+        it.forEachRemaining(
+                configShade -> {
+                    CONFIG_SHADES.put(configShade.getIdentifier(), configShade);
+                });
+        log.info("Load config shade spi: {}", CONFIG_SHADES.keySet());
+    }
+
+    private static class DefaultConfigShade implements ConfigShade {
+        private static final String IDENTIFIER = "default";
+
+        @Override
+        public String getIdentifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public String encrypt(String content) {
+            return content;
+        }
+
+        @Override
+        public String decrypt(String content) {
+            return content;
+        }
+    }
+
+    public static String encryptOption(String identifier, String content) {
+        ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
+        return configShade.encrypt(content);
+    }
+
+    public static String decryptOption(String identifier, String content) {
+        ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
+        return configShade.decrypt(content);
+    }
+
+    public static Config decryptConfig(Config config) {
+        String identifier =
+                TypesafeConfigUtils.getConfig(
+                        config.hasPath(Constants.ENV)
+                                ? config.getConfig(Constants.ENV)
+                                : ConfigFactory.empty(),
+                        SHADE_IDENTIFIER_OPTION,
+                        DEFAULT_SHADE.getIdentifier());
+        return decryptConfig(identifier, config);
+    }
+
+    public static Config encryptConfig(Config config) {
+        String identifier =
+                TypesafeConfigUtils.getConfig(
+                        config.hasPath(Constants.ENV)
+                                ? config.getConfig(Constants.ENV)
+                                : ConfigFactory.empty(),
+                        SHADE_IDENTIFIER_OPTION,
+                        DEFAULT_SHADE.getIdentifier());
+        return encryptConfig(identifier, config);
+    }
+
+    public static Config decryptConfig(String identifier, Config config) {
+        return processConfig(identifier, config, true);
+    }
+
+    public static Config encryptConfig(String identifier, Config config) {
+        return processConfig(identifier, config, false);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Config processConfig(String identifier, Config config, boolean isDecrypted) {
+        ConfigShade configShade = CONFIG_SHADES.getOrDefault(identifier, DEFAULT_SHADE);
+        List<String> sensitiveOptions = new ArrayList<>(Arrays.asList(DEFAULT_SENSITIVE_OPTIONS));
+        sensitiveOptions.addAll(Arrays.asList(configShade.sensitiveOptions()));
+        BiFunction<String, Object, String> processFunction =
+                (key, value) -> {
+                    if (isDecrypted) {
+                        return configShade.decrypt(value.toString());
+                    } else {
+                        return configShade.encrypt(value.toString());
+                    }
+                };
+        String jsonString = config.root().render(ConfigRenderOptions.concise());
+        ObjectNode jsonNodes = JsonUtils.parseObject(jsonString);
+        Map<String, Object> configMap = JsonUtils.toMap(jsonNodes);
+        List<Map<String, Object>> sources =
+                (ArrayList<Map<String, Object>>) configMap.get(Constants.SOURCE);
+        List<Map<String, Object>> sinks =
+                (ArrayList<Map<String, Object>>) configMap.get(Constants.SINK);
+        sources.forEach(
+                source -> {
+                    for (String sensitiveOption : sensitiveOptions) {
+                        source.computeIfPresent(sensitiveOption, processFunction);
+                    }
+                });
+        sinks.forEach(
+                sink -> {
+                    for (String sensitiveOption : sensitiveOptions) {
+                        sink.computeIfPresent(sensitiveOption, processFunction);
+                    }
+                });
+        configMap.put(Constants.SOURCE, sources);
+        configMap.put(Constants.SINK, sinks);
+        return ConfigFactory.parseMap(configMap);
+    }
+
+    public static class Base64ConfigShade implements ConfigShade {
+
+        private static final Base64.Encoder ENCODER = Base64.getEncoder();
+
+        private static final Base64.Decoder DECODER = Base64.getDecoder();
+
+        private static final String IDENTIFIER = "base64";
+
+        @Override
+        public String getIdentifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public String encrypt(String content) {
+            return ENCODER.encodeToString(content.getBytes(StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public String decrypt(String content) {
+            return new String(DECODER.decode(content));
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf b/seatunnel-core/seatunnel-core-starter/src/main/resources/META-INF/services/org.apache.seatunnel.api.configuration.ConfigShade
similarity index 55%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
copy to seatunnel-core/seatunnel-core-starter/src/main/resources/META-INF/services/org.apache.seatunnel.api.configuration.ConfigShade
index bdb29e187..3d4392864 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
+++ b/seatunnel-core/seatunnel-core-starter/src/main/resources/META-INF/services/org.apache.seatunnel.api.configuration.ConfigShade
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -6,43 +5,12 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-#    http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
-
-env {
-    execution.parallelism = 1
-    job.mode = "BATCH"
-
-    #spark config
-    spark.app.name = "SeaTunnel"
-    spark.executor.instances = 2
-    spark.executor.cores = 1
-    spark.executor.memory = "1g"
-    spark.master = local
-}
-
-source {
-    Redis {
-        host = "redis-e2e"
-        port = 6379
-        auth = "SeaTunnel"
-        keys = "key_test*"
-        data_type = key
-    }
-}
 
-sink {
-    Redis {
-        host = "redis-e2e"
-        port = 6379
-        auth = "SeaTunnel"
-        key = "key_list"
-        data_type = list
-    }
-}
\ No newline at end of file
+org.apache.seatunnel.core.starter.utils.ConfigShadeUtils$Base64ConfigShade
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/command/ConfDecryptCommandTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/command/ConfDecryptCommandTest.java
new file mode 100644
index 000000000..b23d978eb
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/command/ConfDecryptCommandTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.command;
+
+import org.apache.seatunnel.common.config.DeployMode;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class ConfDecryptCommandTest {
+
+    public static Path getFilePath(String path) throws URISyntaxException {
+        URL resource = ConfDecryptCommandTest.class.getResource(path);
+        Assertions.assertNotNull(resource);
+        return Paths.get(resource.toURI());
+    }
+
+    @Test
+    public void testEncrypt() throws URISyntaxException {
+        TestCommandArgs testCommandArgs = new TestCommandArgs();
+        Path filePath = getFilePath("/shade.conf");
+        testCommandArgs.setDecrypt(true);
+        testCommandArgs.setConfigFile(filePath.toString());
+        ConfDecryptCommand confDecryptCommand = new ConfDecryptCommand(testCommandArgs);
+        confDecryptCommand.execute();
+    }
+
+    public static class TestCommandArgs extends AbstractCommandArgs {
+
+        @Override
+        public DeployMode getDeployMode() {
+            return null;
+        }
+
+        @Override
+        public Command<?> buildCommand() {
+            return null;
+        }
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommandTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommandTest.java
new file mode 100644
index 000000000..d3b6837e2
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/command/ConfEncryptCommandTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.command;
+
+import org.apache.seatunnel.common.config.DeployMode;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class ConfEncryptCommandTest {
+
+    public static Path getFilePath(String path) throws URISyntaxException {
+        URL resource = ConfEncryptCommandTest.class.getResource(path);
+        Assertions.assertNotNull(resource);
+        return Paths.get(resource.toURI());
+    }
+
+    @Test
+    public void testEncrypt() throws URISyntaxException {
+        TestCommandArgs testCommandArgs = new TestCommandArgs();
+        Path filePath = getFilePath("/origin.conf");
+        testCommandArgs.setEncrypt(true);
+        testCommandArgs.setConfigFile(filePath.toString());
+        ConfEncryptCommand confEncryptCommand = new ConfEncryptCommand(testCommandArgs);
+        confEncryptCommand.execute();
+    }
+
+    public static class TestCommandArgs extends AbstractCommandArgs {
+
+        @Override
+        public DeployMode getDeployMode() {
+            return null;
+        }
+
+        @Override
+        public Command<?> buildCommand() {
+            return null;
+        }
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
new file mode 100644
index 000000000..e25781b91
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/test/java/org/apache/seatunnel/core/starter/utils/ConfigShadeTest.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.utils;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.seatunnel.api.configuration.ConfigShade;
+import org.apache.seatunnel.common.utils.JsonUtils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.seatunnel.core.starter.utils.ConfigBuilder.CONFIG_RENDER_OPTIONS;
+
+@Slf4j
+public class ConfigShadeTest {
+
+    private static final String USERNAME = "seatunnel";
+
+    private static final String PASSWORD = "seatunnel_password";
+
+    @Test
+    public void testParseConfig() throws URISyntaxException {
+        URL resource = ConfigShadeTest.class.getResource("/config.shade.conf");
+        Assertions.assertNotNull(resource);
+        Config config = ConfigBuilder.of(Paths.get(resource.toURI()));
+        Config fields =
+                config.getConfigList("source").get(0).getConfig("schema").getConfig("fields");
+        log.info("Schema fields: {}", fields.root().render(CONFIG_RENDER_OPTIONS));
+        ObjectNode jsonNodes = JsonUtils.parseObject(fields.root().render(CONFIG_RENDER_OPTIONS));
+        List<String> field = new ArrayList<>();
+        jsonNodes.fieldNames().forEachRemaining(field::add);
+        Assertions.assertEquals(field.size(), jsonNodes.size());
+        Assertions.assertEquals(field.get(0), "name");
+        Assertions.assertEquals(field.get(1), "age");
+        Assertions.assertEquals(field.get(2), "sex");
+        log.info("Decrypt config: {}", config.root().render(CONFIG_RENDER_OPTIONS));
+        Assertions.assertEquals(
+                config.getConfigList("source").get(0).getString("username"), USERNAME);
+        Assertions.assertEquals(
+                config.getConfigList("source").get(0).getString("password"), PASSWORD);
+    }
+
+    @Test
+    public void testDecryptAndEncrypt() {
+        String encryptUsername = ConfigShadeUtils.encryptOption("base64", USERNAME);
+        String decryptUsername = ConfigShadeUtils.decryptOption("base64", encryptUsername);
+        String encryptPassword = ConfigShadeUtils.encryptOption("base64", PASSWORD);
+        String decryptPassword = ConfigShadeUtils.decryptOption("base64", encryptPassword);
+        Assertions.assertEquals("c2VhdHVubmVs", encryptUsername);
+        Assertions.assertEquals("c2VhdHVubmVsX3Bhc3N3b3Jk", encryptPassword);
+        Assertions.assertEquals(decryptUsername, USERNAME);
+        Assertions.assertEquals(decryptPassword, PASSWORD);
+    }
+
+    public static class Base64ConfigShade implements ConfigShade {
+
+        private static final Base64.Encoder ENCODER = Base64.getEncoder();
+
+        private static final Base64.Decoder DECODER = Base64.getDecoder();
+
+        private static final String IDENTIFIER = "base64";
+
+        @Override
+        public String getIdentifier() {
+            return IDENTIFIER;
+        }
+
+        @Override
+        public String encrypt(String content) {
+            return ENCODER.encodeToString(content.getBytes(StandardCharsets.UTF_8));
+        }
+
+        @Override
+        public String decrypt(String content) {
+            return new String(DECODER.decode(content));
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf b/seatunnel-core/seatunnel-core-starter/src/test/resources/META-INF/services/org.apache.seatunnel.api.configuration.ConfigShade
similarity index 55%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
copy to seatunnel-core/seatunnel-core-starter/src/test/resources/META-INF/services/org.apache.seatunnel.api.configuration.ConfigShade
index bdb29e187..6d7378028 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
+++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/META-INF/services/org.apache.seatunnel.api.configuration.ConfigShade
@@ -1,4 +1,3 @@
-#
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
 # this work for additional information regarding copyright ownership.
@@ -6,43 +5,12 @@
 # (the "License"); you may not use this file except in compliance with
 # the License.  You may obtain a copy of the License at
 #
-#    http://www.apache.org/licenses/LICENSE-2.0
+#     http://www.apache.org/licenses/LICENSE-2.0
 #
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
-
-env {
-    execution.parallelism = 1
-    job.mode = "BATCH"
-
-    #spark config
-    spark.app.name = "SeaTunnel"
-    spark.executor.instances = 2
-    spark.executor.cores = 1
-    spark.executor.memory = "1g"
-    spark.master = local
-}
-
-source {
-    Redis {
-        host = "redis-e2e"
-        port = 6379
-        auth = "SeaTunnel"
-        keys = "key_test*"
-        data_type = key
-    }
-}
 
-sink {
-    Redis {
-        host = "redis-e2e"
-        port = 6379
-        auth = "SeaTunnel"
-        key = "key_list"
-        data_type = list
-    }
-}
\ No newline at end of file
+org.apache.seatunnel.core.starter.utils.ConfigShadeTest$Base64ConfigShade
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/resources/config.shade.conf b/seatunnel-core/seatunnel-core-starter/src/test/resources/config.shade.conf
new file mode 100644
index 000000000..e90aca5cc
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/config.shade.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  shade.identifier = "base64"
+}
+
+source {
+  MySQL-CDC {
+    schema {
+      fields {
+        name = string
+        age = int
+        sex = boolean
+      }
+    }
+    result_table_name = "fake"
+    parallelism = 1
+    server-id = 5656
+    port = 56725
+    hostname = "127.0.0.1"
+    username = "c2VhdHVubmVs"
+    password = "c2VhdHVubmVsX3Bhc3N3b3Jk"
+    database-name = "inventory_vwyw0n"
+    table-name = "products"
+    base-url = "jdbc:mysql://localhost:56725"
+  }
+}
+
+transform {
+}
+
+sink {
+  # choose stdout output plugin to output data to console
+  Clickhouse {
+    host = "localhost:8123"
+    database = "default"
+    table = "fake_all"
+    username = "c2VhdHVubmVs"
+    password = "c2VhdHVubmVsX3Bhc3N3b3Jk"
+
+    # cdc options
+    primary_key = "id"
+    support_upsert = true
+  }
+}
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/resources/log4j2.properties b/seatunnel-core/seatunnel-core-starter/src/test/resources/log4j2.properties
new file mode 100644
index 000000000..2dc1b8ca5
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/log4j2.properties
@@ -0,0 +1,42 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+rootLogger.level = INFO
+
+rootLogger.appenderRef.consoleStdout.ref = consoleStdoutAppender
+rootLogger.appenderRef.consoleStderr.ref = consoleStderrAppender
+
+appender.consoleStdout.name = consoleStdoutAppender
+appender.consoleStdout.type = CONSOLE
+appender.consoleStdout.target = SYSTEM_OUT
+appender.consoleStdout.layout.type = PatternLayout
+appender.consoleStdout.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.consoleStdout.filter.acceptLtWarn.type = ThresholdFilter
+appender.consoleStdout.filter.acceptLtWarn.level = WARN
+appender.consoleStdout.filter.acceptLtWarn.onMatch = DENY
+appender.consoleStdout.filter.acceptLtWarn.onMismatch = ACCEPT
+
+appender.consoleStderr.name = consoleStderrAppender
+appender.consoleStderr.type = CONSOLE
+appender.consoleStderr.target = SYSTEM_ERR
+appender.consoleStderr.layout.type = PatternLayout
+appender.consoleStderr.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %c - %m%n
+appender.consoleStderr.filter.acceptGteWarn.type = ThresholdFilter
+appender.consoleStderr.filter.acceptGteWarn.level = WARN
+appender.consoleStderr.filter.acceptGteWarn.onMatch = ACCEPT
+appender.consoleStderr.filter.acceptGteWarn.onMismatch = DENY
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/resources/origin.conf b/seatunnel-core/seatunnel-core-starter/src/test/resources/origin.conf
new file mode 100644
index 000000000..b3eb98566
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/origin.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  shade.identifier = "base64"
+}
+
+source {
+  MySQL-CDC {
+    schema {
+      fields {
+        name = string
+        age = int
+        sex = boolean
+      }
+    }
+    result_table_name = "fake"
+    parallelism = 1
+    server-id = 5656
+    port = 56725
+    hostname = "127.0.0.1"
+    username = "seatunnel"
+    password = "seatunnel_password"
+    database-name = "inventory_vwyw0n"
+    table-name = "products"
+    base-url = "jdbc:mysql://localhost:56725"
+  }
+}
+
+transform {
+}
+
+sink {
+  # choose stdout output plugin to output data to console
+  Clickhouse {
+    host = "localhost:8123"
+    database = "default"
+    table = "fake_all"
+    username = "seatunnel"
+    password = "seatunnel_password"
+
+    # cdc options
+    primary_key = "id"
+    support_upsert = true
+  }
+}
diff --git a/seatunnel-core/seatunnel-core-starter/src/test/resources/shade.conf b/seatunnel-core/seatunnel-core-starter/src/test/resources/shade.conf
new file mode 100644
index 000000000..e90aca5cc
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/src/test/resources/shade.conf
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  shade.identifier = "base64"
+}
+
+source {
+  MySQL-CDC {
+    schema {
+      fields {
+        name = string
+        age = int
+        sex = boolean
+      }
+    }
+    result_table_name = "fake"
+    parallelism = 1
+    server-id = 5656
+    port = 56725
+    hostname = "127.0.0.1"
+    username = "c2VhdHVubmVs"
+    password = "c2VhdHVubmVsX3Bhc3N3b3Jk"
+    database-name = "inventory_vwyw0n"
+    table-name = "products"
+    base-url = "jdbc:mysql://localhost:56725"
+  }
+}
+
+transform {
+}
+
+sink {
+  # choose stdout output plugin to output data to console
+  Clickhouse {
+    host = "localhost:8123"
+    database = "default"
+    table = "fake_all"
+    username = "c2VhdHVubmVs"
+    password = "c2VhdHVubmVsX3Bhc3N3b3Jk"
+
+    # cdc options
+    primary_key = "id"
+    support_upsert = true
+  }
+}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index e92ae052a..078c29bbf 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -79,6 +79,14 @@ public class FlinkStarter implements Starter {
         // set job name
         command.add("--name");
         command.add(flinkCommandArgs.getJobName());
+        // set encryption
+        if (flinkCommandArgs.isEncrypt()) {
+            command.add("--encrypt");
+        }
+        // set decryption
+        if (flinkCommandArgs.isDecrypt()) {
+            command.add("--decrypt");
+        }
         // set extra system properties
         flinkCommandArgs.getVariables().stream()
                 .filter(Objects::nonNull)
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
index 488bcfce4..ff098b9df 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
 import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.command.ConfDecryptCommand;
+import org.apache.seatunnel.core.starter.command.ConfEncryptCommand;
 import org.apache.seatunnel.core.starter.enums.MasterType;
 import org.apache.seatunnel.core.starter.flink.command.FlinkConfValidateCommand;
 import org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand;
@@ -56,9 +58,14 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         Common.setDeployMode(getDeployMode());
         if (checkConfig) {
             return new FlinkConfValidateCommand(this);
-        } else {
-            return new FlinkTaskExecuteCommand(this);
         }
+        if (encrypt) {
+            return new ConfEncryptCommand(this);
+        }
+        if (decrypt) {
+            return new ConfDecryptCommand(this);
+        }
+        return new FlinkTaskExecuteCommand(this);
     }
 
     @Override
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 9cf237566..2f9021c68 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -79,6 +79,14 @@ public class FlinkStarter implements Starter {
         // set job name
         command.add("--name");
         command.add(flinkCommandArgs.getJobName());
+        // set encryption
+        if (flinkCommandArgs.isEncrypt()) {
+            command.add("--encrypt");
+        }
+        // set decryption
+        if (flinkCommandArgs.isDecrypt()) {
+            command.add("--decrypt");
+        }
         // set extra system properties
         flinkCommandArgs.getVariables().stream()
                 .filter(Objects::nonNull)
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
index 488bcfce4..ff098b9df 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
 import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.command.ConfDecryptCommand;
+import org.apache.seatunnel.core.starter.command.ConfEncryptCommand;
 import org.apache.seatunnel.core.starter.enums.MasterType;
 import org.apache.seatunnel.core.starter.flink.command.FlinkConfValidateCommand;
 import org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand;
@@ -56,9 +58,14 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         Common.setDeployMode(getDeployMode());
         if (checkConfig) {
             return new FlinkConfValidateCommand(this);
-        } else {
-            return new FlinkTaskExecuteCommand(this);
         }
+        if (encrypt) {
+            return new ConfEncryptCommand(this);
+        }
+        if (decrypt) {
+            return new ConfDecryptCommand(this);
+        }
+        return new FlinkTaskExecuteCommand(this);
     }
 
     @Override
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 11e8050b2..c187cceb6 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -209,6 +209,12 @@ public class SparkStarter implements Starter {
         appendOption(commands, "--master", this.commandArgs.getMaster());
         appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getDeployMode());
         appendOption(commands, "--name", this.commandArgs.getJobName());
+        if (commandArgs.isEncrypt()) {
+            commands.add("--encrypt");
+        }
+        if (commandArgs.isDecrypt()) {
+            commands.add("--decrypt");
+        }
         if (this.commandArgs.isCheckConfig()) {
             commands.add("--check");
         }
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
index aca777138..841383927 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
 import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.command.ConfDecryptCommand;
+import org.apache.seatunnel.core.starter.command.ConfEncryptCommand;
 import org.apache.seatunnel.core.starter.spark.command.SparkConfValidateCommand;
 import org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand;
 
@@ -54,9 +56,14 @@ public class SparkCommandArgs extends AbstractCommandArgs {
         Common.setDeployMode(getDeployMode());
         if (checkConfig) {
             return new SparkConfValidateCommand(this);
-        } else {
-            return new SparkTaskExecuteCommand(this);
         }
+        if (encrypt) {
+            return new ConfEncryptCommand(this);
+        }
+        if (decrypt) {
+            return new ConfDecryptCommand(this);
+        }
+        return new SparkTaskExecuteCommand(this);
     }
 
     public static class SparkDeployModeConverter implements IStringConverter<DeployMode> {
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 9ecd3bdbc..16a3dacad 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -209,6 +209,12 @@ public class SparkStarter implements Starter {
         appendOption(commands, "--master", this.commandArgs.getMaster());
         appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getDeployMode());
         appendOption(commands, "--name", this.commandArgs.getJobName());
+        if (commandArgs.isEncrypt()) {
+            commands.add("--encrypt");
+        }
+        if (commandArgs.isDecrypt()) {
+            commands.add("--decrypt");
+        }
         if (this.commandArgs.isCheckConfig()) {
             commands.add("--check");
         }
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
index aca777138..841383927 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
 import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.command.ConfDecryptCommand;
+import org.apache.seatunnel.core.starter.command.ConfEncryptCommand;
 import org.apache.seatunnel.core.starter.spark.command.SparkConfValidateCommand;
 import org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand;
 
@@ -54,9 +56,14 @@ public class SparkCommandArgs extends AbstractCommandArgs {
         Common.setDeployMode(getDeployMode());
         if (checkConfig) {
             return new SparkConfValidateCommand(this);
-        } else {
-            return new SparkTaskExecuteCommand(this);
         }
+        if (encrypt) {
+            return new ConfEncryptCommand(this);
+        }
+        if (decrypt) {
+            return new ConfDecryptCommand(this);
+        }
+        return new SparkTaskExecuteCommand(this);
     }
 
     public static class SparkDeployModeConverter implements IStringConverter<DeployMode> {
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index b0022c6c1..964ff5380 100644
--- a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
 import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.command.ConfDecryptCommand;
+import org.apache.seatunnel.core.starter.command.ConfEncryptCommand;
 import org.apache.seatunnel.core.starter.enums.MasterType;
 import org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand;
 import org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelConfValidateCommand;
@@ -91,9 +93,14 @@ public class ClientCommandArgs extends AbstractCommandArgs {
         Common.setDeployMode(getDeployMode());
         if (checkConfig) {
             return new SeaTunnelConfValidateCommand(this);
-        } else {
-            return new ClientExecuteCommand(this);
         }
+        if (encrypt) {
+            return new ConfEncryptCommand(this);
+        }
+        if (decrypt) {
+            return new ConfDecryptCommand(this);
+        }
+        return new ClientExecuteCommand(this);
     }
 
     public DeployMode getDeployMode() {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
index bdb29e187..5cd26838a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-redis-e2e/src/test/resources/redis-to-redis.conf
@@ -18,6 +18,7 @@
 env {
     execution.parallelism = 1
     job.mode = "BATCH"
+    shade.identifier = "base64"
 
     #spark config
     spark.app.name = "SeaTunnel"
@@ -31,7 +32,7 @@ source {
     Redis {
         host = "redis-e2e"
         port = 6379
-        auth = "SeaTunnel"
+        auth = "U2VhVHVubmVs"
         keys = "key_test*"
         data_type = key
     }
@@ -41,7 +42,7 @@ sink {
     Redis {
         host = "redis-e2e"
         port = 6379
-        auth = "SeaTunnel"
+        auth = "U2VhVHVubmVs"
         key = "key_list"
         data_type = list
     }
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 3f40a9de2..7db63d3db 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -22,8 +22,6 @@ protostuff-collectionschema-1.8.0.jar
 protostuff-core-1.8.0.jar
 protostuff-runtime-1.8.0.jar
 scala-library-2.11.12.jar
-seatunnel-config-base-2.1.1.jar
-seatunnel-config-shade-2.1.1.jar
 seatunnel-jackson-2.3.1-SNAPSHOT-optional.jar
 slf4j-api-1.7.25.jar
 jsqlparser-4.5.jar