You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/27 05:37:58 UTC

[incubator-seatunnel] branch dev updated: [Bug][seatunnel-core-spark] spark on yarn cluster mode can not get --config/-c file (#1747)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new aa8f6aa2 [Bug][seatunnel-core-spark] spark on yarn cluster mode can not get --config/-c file (#1747)
aa8f6aa2 is described below

commit aa8f6aa2a198ca0ff5af2d20de8b3001e96f7bc6
Author: whb-bigdata <70...@users.noreply.github.com>
AuthorDate: Wed Apr 27 13:37:53 2022 +0800

    [Bug][seatunnel-core-spark] spark on yarn cluster mode can not get --config/-c file (#1747)
    
    * fix yarn can't get config file
    * Optimize read conf in cluster/client mode
    Co-authored-by: ruanwenjun <we...@apache.org>
---
 .../apache/seatunnel/common/config/DeployMode.java |  5 +-
 ...ModeValidator.java => DeployModeConverter.java} | 17 +++---
 .../apache/seatunnel/command/SparkCommandArgs.java | 16 ++---
 .../command/flink/FlinkConfValidateCommand.java    |  5 +-
 .../command/flink/FlinkTaskExecuteCommand.java     |  4 +-
 .../command/spark/SparkConfValidateCommand.java    | 11 +---
 .../command/spark/SparkTaskExecuteCommand.java     |  4 +-
 .../org/apache/seatunnel/config/ConfigBuilder.java | 10 ++--
 .../java/org/apache/seatunnel/utils/FileUtils.java | 70 ++++++++++++++++++++++
 .../org/apache/seatunnel/utils/FileUtilsTest.java  | 34 ++++++-----
 .../src/test/resources/flink.batch.conf            | 56 +++++++++++++++++
 .../seatunnel/example/spark/LocalSparkExample.java |  2 +-
 12 files changed, 184 insertions(+), 50 deletions(-)

diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
index ce621ac5..efdebcc8 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.common.config;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public enum DeployMode {
     CLIENT("client"),
@@ -39,8 +40,8 @@ public enum DeployMode {
         return name;
     }
 
-    public static DeployMode from(String name) {
-        return NAME_MAP.get(name);
+    public static Optional<DeployMode> from(String name) {
+        return Optional.ofNullable(NAME_MAP.get(name));
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeValidator.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeConverter.java
similarity index 67%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeValidator.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeConverter.java
index de1c2079..ff7cc76f 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeValidator.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeConverter.java
@@ -17,17 +17,18 @@
 
 package org.apache.seatunnel.command;
 
-import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
 
-import com.beust.jcommander.IParameterValidator;
+import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.ParameterException;
 
-public class DeployModeValidator implements IParameterValidator {
+import java.util.Optional;
+
+public class DeployModeConverter implements IStringConverter<DeployMode> {
+
     @Override
-    public void validate(String name, String value)
-        throws ParameterException {
-        if (!Common.isModeAllowed(value)) {
-            throw new ParameterException("deploy-mode: " + value + " is not allowed.");
-        }
+    public DeployMode convert(String value) {
+        Optional<DeployMode> deployMode = DeployMode.from(value);
+        return deployMode.orElseThrow(() -> new ParameterException("deploy-mode: " + value + " is not allowed."));
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
index 51ab57f9..e03faa00 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
@@ -27,19 +27,14 @@ public class SparkCommandArgs extends AbstractCommandArgs {
     @Parameter(names = {"-e", "--deploy-mode"},
         description = "Spark deploy mode",
         required = true,
-        validateWith = DeployModeValidator.class)
-    private String deployMode;
+        converter = DeployModeConverter.class)
+    private DeployMode deployMode;
 
     @Parameter(names = {"-m", "--master"},
         description = "Spark master",
         required = true)
     private String master = null;
 
-    @Override
-    public DeployMode getDeployMode() {
-        return DeployMode.from(deployMode);
-    }
-
     public String getMaster() {
         return master;
     }
@@ -49,7 +44,12 @@ public class SparkCommandArgs extends AbstractCommandArgs {
         return EngineType.SPARK;
     }
 
-    public void setDeployMode(String deployMode) {
+    @Override
+    public DeployMode getDeployMode() {
+        return deployMode;
+    }
+
+    public void setDeployMode(DeployMode deployMode) {
         this.deployMode = deployMode;
     }
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
index c05b4ce1..85ac71b2 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
@@ -21,10 +21,13 @@ import org.apache.seatunnel.command.Command;
 import org.apache.seatunnel.command.FlinkCommandArgs;
 import org.apache.seatunnel.config.ConfigBuilder;
 import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.utils.FileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.nio.file.Path;
+
 /**
  * Used to check the Flink conf is validated.
  */
@@ -34,7 +37,7 @@ public class FlinkConfValidateCommand implements Command<FlinkCommandArgs> {
 
     @Override
     public void execute(FlinkCommandArgs flinkCommandArgs) {
-        String configPath = flinkCommandArgs.getConfigFile();
+        Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
         new ConfigBuilder<FlinkEnvironment>(configPath, flinkCommandArgs.getEngineType()).checkConfig();
         LOGGER.info("config OK !");
     }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
index 97f0f882..d6863467 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -28,9 +28,11 @@ import org.apache.seatunnel.config.ExecutionContext;
 import org.apache.seatunnel.config.ExecutionFactory;
 import org.apache.seatunnel.env.Execution;
 import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.utils.FileUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import java.nio.file.Path;
 import java.util.List;
 
 /**
@@ -41,7 +43,7 @@ public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommand
     @Override
     public void execute(FlinkCommandArgs flinkCommandArgs) {
         EngineType engine = flinkCommandArgs.getEngineType();
-        String configFile = flinkCommandArgs.getConfigFile();
+        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
 
         Config config = new ConfigBuilder<>(configFile, engine).getConfig();
         ExecutionContext<FlinkEnvironment> executionContext = new ExecutionContext<>(config, engine);
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
index 6aa3f948..73aafe7b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
@@ -19,14 +19,14 @@ package org.apache.seatunnel.command.spark;
 
 import org.apache.seatunnel.command.Command;
 import org.apache.seatunnel.command.SparkCommandArgs;
-import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.config.ConfigBuilder;
 import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.utils.FileUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.file.Paths;
+import java.nio.file.Path;
 
 /**
  * Used to validate the spark task conf is validated.
@@ -37,12 +37,7 @@ public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
 
     @Override
     public void execute(SparkCommandArgs sparkCommandArgs) {
-        String confPath;
-        if (DeployMode.CLUSTER.equals(sparkCommandArgs.getDeployMode())) {
-            confPath = Paths.get(sparkCommandArgs.getConfigFile()).getFileName().toString();
-        } else {
-            confPath = sparkCommandArgs.getConfigFile();
-        }
+        Path confPath = FileUtils.getConfigPath(sparkCommandArgs);
         new ConfigBuilder<SparkEnvironment>(confPath, sparkCommandArgs.getEngineType()).checkConfig();
         LOGGER.info("config OK !");
     }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
index f1cfc2b6..3c58da84 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
@@ -28,9 +28,11 @@ import org.apache.seatunnel.config.ExecutionContext;
 import org.apache.seatunnel.config.ExecutionFactory;
 import org.apache.seatunnel.env.Execution;
 import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.utils.FileUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import java.nio.file.Path;
 import java.util.List;
 
 public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommandArgs, SparkEnvironment> {
@@ -38,7 +40,7 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
     @Override
     public void execute(SparkCommandArgs sparkCommandArgs) {
         EngineType engine = sparkCommandArgs.getEngineType();
-        String confFile = sparkCommandArgs.getConfigFile();
+        Path confFile = FileUtils.getConfigPath(sparkCommandArgs);
 
         Config config = new ConfigBuilder<>(confFile, engine).getConfig();
         ExecutionContext<SparkEnvironment> executionContext = new ExecutionContext<>(config, engine);
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
index 3e870205..4e9d05b9 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
@@ -28,7 +28,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
+import java.nio.file.Path;
 
 /**
  * Used to build the {@link  Config} from file.
@@ -40,11 +40,11 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
     private static final Logger LOGGER = LoggerFactory.getLogger(ConfigBuilder.class);
 
     private static final String PLUGIN_NAME_KEY = "plugin_name";
-    private final String configFile;
+    private final Path configFile;
     private final EngineType engine;
     private final Config config;
 
-    public ConfigBuilder(String configFile, EngineType engine) {
+    public ConfigBuilder(Path configFile, EngineType engine) {
         this.configFile = configFile;
         this.engine = engine;
         this.config = load();
@@ -52,7 +52,7 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
 
     private Config load() {
 
-        if (configFile.isEmpty()) {
+        if (configFile == null) {
             throw new ConfigRuntimeException("Please specify config file");
         }
 
@@ -61,7 +61,7 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
         // variables substitution / variables resolution order:
         // config file --> system environment --> java properties
         Config config = ConfigFactory
-            .parseFile(new File(configFile))
+            .parseFile(configFile.toFile())
             .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
             .resolveWith(ConfigFactory.systemProperties(),
                 ConfigResolveOptions.defaults().setAllowUnresolved(true));
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/FileUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/FileUtils.java
new file mode 100644
index 00000000..cd2d4975
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/FileUtils.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.utils;
+
+import org.apache.seatunnel.command.AbstractCommandArgs;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class FileUtils {
+
+    private FileUtils() {
+        throw new UnsupportedOperationException("This class cannot be instantiated");
+    }
+
+    /**
+     * Get the seatunnel config path.
+     * In client mode, the path to the config file is directly given by user.
+     * In cluster mode, the path to the config file is the `executor path/config file name`.
+     *
+     * @param args args
+     * @return path of the seatunnel config file.
+     */
+    public static Path getConfigPath(AbstractCommandArgs args) {
+        checkNotNull(args, "args");
+        checkNotNull(args.getDeployMode(), "deploy mode");
+        switch (args.getDeployMode()) {
+            case CLIENT:
+                return Paths.get(args.getConfigFile());
+            case CLUSTER:
+                return Paths.get(getFileName(args.getConfigFile()));
+            default:
+                throw new IllegalArgumentException("Unsupported deploy mode: " + args.getDeployMode());
+        }
+    }
+
+    /**
+     * Get the file name from the given path.
+     * e.g. seatunnel/conf/config.conf -> config.conf
+     *
+     * @param filePath the path to the file
+     * @return file name
+     */
+    private static String getFileName(String filePath) {
+        checkNotNull(filePath, "file path");
+        return filePath.substring(filePath.lastIndexOf(File.separatorChar) + 1);
+    }
+
+    private static <T> void checkNotNull(T arg, String argName) {
+        if (arg == null) {
+            throw new IllegalArgumentException(argName + " cannot be null");
+        }
+    }
+}
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/FileUtilsTest.java
similarity index 51%
copy from seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
copy to seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/FileUtilsTest.java
index 520fa2b5..59e71677 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/FileUtilsTest.java
@@ -15,27 +15,31 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.example.spark;
+package org.apache.seatunnel.utils;
 
-import org.apache.seatunnel.Seatunnel;
 import org.apache.seatunnel.command.SparkCommandArgs;
 import org.apache.seatunnel.common.config.DeployMode;
 
-public class LocalSparkExample {
+import org.junit.Assert;
+import org.junit.Test;
 
-    public static final String TEST_RESOURCE_DIR = "/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/";
+import java.net.URISyntaxException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 
-    public static void main(String[] args) {
-        String configFile = getTestConfigFile("spark.batch.conf");
-        SparkCommandArgs sparkArgs = new SparkCommandArgs();
-        sparkArgs.setConfigFile(configFile);
-        sparkArgs.setCheckConfig(false);
-        sparkArgs.setVariables(null);
-        sparkArgs.setDeployMode(DeployMode.CLIENT.getName());
-        Seatunnel.run(sparkArgs);
-    }
+public class FileUtilsTest {
+
+    @Test
+    public void getConfigPath() throws URISyntaxException {
+        // test client mode.
+        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
+        sparkCommandArgs.setDeployMode(DeployMode.CLIENT);
+        Path expectConfPath = Paths.get(FileUtilsTest.class.getResource("/flink.batch.conf").toURI());
+        sparkCommandArgs.setConfigFile(expectConfPath.toString());
+        Assert.assertEquals(expectConfPath, FileUtils.getConfigPath(sparkCommandArgs));
 
-    public static String getTestConfigFile(String configFile) {
-        return System.getProperty("user.dir") + TEST_RESOURCE_DIR + configFile;
+        // test cluster mode
+        sparkCommandArgs.setDeployMode(DeployMode.CLUSTER);
+        Assert.assertEquals("flink.batch.conf", FileUtils.getConfigPath(sparkCommandArgs).toString());
     }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/test/resources/flink.batch.conf b/seatunnel-core/seatunnel-core-base/src/test/resources/flink.batch.conf
new file mode 100644
index 00000000..45b3e853
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/test/resources/flink.batch.conf
@@ -0,0 +1,56 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file is a demonstration of batch processing in SeaTunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+}
+
+source {
+  # This is a example input plugin **only for test and demonstrate the feature input plugin**
+  FileSource {
+    path = "hdfs://localhost:9000/output/text"
+    format.type = "text"
+    schema = "string"
+    result_table_name = "test"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of input plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake
+}
+
+transform {
+  Sql {
+    sql = "select * from test"
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of filter plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
+}
+
+sink {
+  # choose stdout output plugin to output data to console
+  ConsoleSink {
+  }
+
+  # If you would like to get more information about how to configure seatunnel and see full list of output plugins,
+  # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
+}
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
index 520fa2b5..57ad918e 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
@@ -31,7 +31,7 @@ public class LocalSparkExample {
         sparkArgs.setConfigFile(configFile);
         sparkArgs.setCheckConfig(false);
         sparkArgs.setVariables(null);
-        sparkArgs.setDeployMode(DeployMode.CLIENT.getName());
+        sparkArgs.setDeployMode(DeployMode.CLIENT);
         Seatunnel.run(sparkArgs);
     }