You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/03/14 11:26:13 UTC
[incubator-seatunnel] branch dev updated: [Feature #1421][core] Use different command to execute task (#1422)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2ff6131 [Feature #1421][core] Use different command to execute task (#1422)
2ff6131 is described below
commit 2ff61311d2d04e43243399e1409c5c061b884b03
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Mon Mar 14 19:26:07 2022 +0800
[Feature #1421][core] Use different command to execute task (#1422)
---
.../apache/seatunnel/common/config/DeployMode.java | 11 ++
.../main/java/org/apache/seatunnel/Seatunnel.java | 158 +++------------------
.../seatunnel/command/BaseTaskExecuteCommand.java | 135 ++++++++++++++++++
.../java/org/apache/seatunnel/command/Command.java | 28 ++--
.../Engine.java => command/CommandArgs.java} | 17 +--
.../Engine.java => command/CommandBuilder.java} | 17 +--
.../apache/seatunnel/command/CommandFactory.java | 45 ++++++
.../{config => }/command/DeployModeValidator.java | 2 +-
.../FlinkCommandArgs.java} | 48 ++++---
.../SparkCommandArgs.java} | 57 +++++++-
.../command/flink/FlinkCommandBuilder.java | 21 ++-
.../flink/FlinkConfValidateCommand.java} | 27 ++--
.../command/flink/FlinkTaskExecuteCommand.java | 55 +++++++
.../command/spark/SparkCommandBuilder.java | 21 ++-
.../command/spark/SparkConfValidateCommand.java | 49 +++++++
.../command/spark/SparkTaskExecuteCommand.java | 53 +++++++
.../org/apache/seatunnel/config/ConfigBuilder.java | 6 +-
.../{utils/Engine.java => config/EngineType.java} | 12 +-
.../seatunnel/{utils => config}/PluginType.java | 4 +-
.../seatunnel/config/command/CommandLineArgs.java | 59 --------
.../command => utils}/CommandLineUtils.java | 31 ++--
.../seatunnel/command/CommandFactoryTest.java | 64 +++++++++
.../FlinkCommandArgsTest.java} | 8 +-
.../SparkCommandArgsTest.java} | 8 +-
.../command => utils}/CommandLineUtilsTest.java | 8 +-
.../java/org/apache/seatunnel/SeatunnelFlink.java | 10 +-
.../java/org/apache/seatunnel/SeatunnelSpark.java | 10 +-
.../apache/seatunnel/core/sql/SeatunnelSql.java | 6 +-
.../core/sql/SqlVariableSubstitutionTest.java | 6 +-
.../seatunnel/example/flink/LocalFlinkExample.java | 13 +-
.../seatunnel/example/spark/LocalSparkExample.java | 15 +-
...{spark.batch.conf.template => spark.batch.conf} | 4 +-
32 files changed, 639 insertions(+), 369 deletions(-)
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
index 44beef8..ce621ac 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
@@ -17,11 +17,18 @@
package org.apache.seatunnel.common.config;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
public enum DeployMode {
CLIENT("client"),
CLUSTER("cluster"),
;
+ private static final Map<String, DeployMode> NAME_MAP = Arrays.stream(DeployMode.values())
+ .collect(HashMap::new, (map, deployMode) -> map.put(deployMode.getName(), deployMode), Map::putAll);
+
private final String name;
DeployMode(String name) {
@@ -32,4 +39,8 @@ public enum DeployMode {
return name;
}
+ public static DeployMode from(String name) {
+ return NAME_MAP.get(name);
+ }
+
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
index c11905b..e73187a 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/Seatunnel.java
@@ -17,155 +17,41 @@
package org.apache.seatunnel;
-import org.apache.seatunnel.apis.BaseSink;
-import org.apache.seatunnel.apis.BaseSource;
-import org.apache.seatunnel.apis.BaseTransform;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.command.Command;
+import org.apache.seatunnel.command.CommandArgs;
+import org.apache.seatunnel.command.CommandFactory;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.config.ConfigBuilder;
-import org.apache.seatunnel.config.command.CommandLineArgs;
-import org.apache.seatunnel.env.Execution;
-import org.apache.seatunnel.env.RuntimeEnv;
-import org.apache.seatunnel.plugin.Plugin;
-import org.apache.seatunnel.utils.AsciiArtUtils;
-import org.apache.seatunnel.utils.CompressionUtils;
-import org.apache.seatunnel.utils.Engine;
-import org.apache.seatunnel.utils.PluginType;
-import org.apache.commons.compress.archivers.ArchiveException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.Objects;
-import java.util.Optional;
-
public class Seatunnel {
private static final Logger LOGGER = LoggerFactory.getLogger(Seatunnel.class);
- public static void run(CommandLineArgs commandLineArgs, Engine engine) throws Exception {
- if (!Common.setDeployMode(commandLineArgs.getDeployMode())) {
- throw new IllegalArgumentException(
- String.format("Deploy mode: %s is Illegal", commandLineArgs.getDeployMode()));
- }
-
- String configFilePath = getConfigFilePath(commandLineArgs, engine);
- if (commandLineArgs.isTestConfig()) {
- new ConfigBuilder(configFilePath, engine).checkConfig();
- LOGGER.info("config OK !");
- } else {
- try {
- entryPoint(configFilePath, engine);
- } catch (ConfigRuntimeException e) {
- showConfigError(e);
- throw e;
- } catch (Exception e) {
- showFatalError(e);
- throw e;
- }
- }
- }
-
- private static String getConfigFilePath(CommandLineArgs cmdArgs, Engine engine) {
- String path = null;
- switch (engine) {
- case FLINK:
- path = cmdArgs.getConfigFile();
- break;
- case SPARK:
- final Optional<String> mode = Common.getDeployMode();
- if (mode.isPresent() && DeployMode.CLUSTER.getName().equals(mode.get())) {
- path = Paths.get(cmdArgs.getConfigFile()).getFileName().toString();
- } else {
- path = cmdArgs.getConfigFile();
- }
- break;
- default:
- break;
- }
- return path;
- }
-
- private static void entryPoint(String configFile, Engine engine) throws Exception {
-
- ConfigBuilder configBuilder = new ConfigBuilder(configFile, engine);
- List<BaseSource> sources = configBuilder.createPlugins(PluginType.SOURCE);
- List<BaseTransform> transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
- List<BaseSink> sinks = configBuilder.createPlugins(PluginType.SINK);
- Execution execution = configBuilder.createExecution();
- baseCheckConfig(sources, transforms, sinks);
- prepare(configBuilder.getEnv(), sources, transforms, sinks);
- showAsciiLogo();
-
- execution.start(sources, transforms, sinks);
- }
+ /**
+ * This method is the entrypoint of SeaTunnel.
+ *
+ * @param commandArgs commandArgs
+ * @param <T> commandType
+ */
+ public static <T extends CommandArgs> void run(T commandArgs) {
- @SafeVarargs
- private static void baseCheckConfig(List<? extends Plugin>... plugins) {
- for (List<? extends Plugin> pluginList : plugins) {
- for (Plugin plugin : pluginList) {
- CheckResult checkResult;
- try {
- checkResult = plugin.checkConfig();
- } catch (Exception e) {
- checkResult = CheckResult.error(e.getMessage());
- }
- if (!checkResult.isSuccess()) {
- LOGGER.error("Plugin[{}] contains invalid config, error: {} \n", plugin.getClass().getName(), checkResult.getMsg());
- System.exit(-1); // invalid configuration
- }
- }
- }
- deployModeCheck();
- }
-
- private static void deployModeCheck() {
- final Optional<String> mode = Common.getDeployMode();
- if (mode.isPresent() && DeployMode.CLUSTER.getName().equals(mode.get())) {
-
- LOGGER.info("preparing cluster mode work dir files...");
- File workDir = new File(".");
-
- for (File file : Objects.requireNonNull(workDir.listFiles())) {
- LOGGER.warn("\t list file: " + file.getAbsolutePath());
- }
- // decompress plugin dir
- File compressedFile = new File("plugins.tar.gz");
-
- try {
- File tempFile = CompressionUtils.unGzip(compressedFile, workDir);
- try {
- CompressionUtils.unTar(tempFile, workDir);
- LOGGER.info("succeeded to decompress plugins.tar.gz");
- } catch (ArchiveException e) {
- LOGGER.error("failed to decompress plugins.tar.gz", e);
- System.exit(-1);
- }
- } catch (IOException e) {
- LOGGER.error("failed to decompress plugins.tar.gz", e);
- System.exit(-1);
- }
- }
- }
-
- private static void prepare(RuntimeEnv env, List<? extends Plugin>... plugins) {
- for (List<? extends Plugin> pluginList : plugins) {
- pluginList.forEach(plugin -> plugin.prepare(env));
+ if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
+ throw new IllegalArgumentException(
+ String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
}
- }
-
- private static void showAsciiLogo() {
- String printAsciiLogo = System.getenv("SEATUNNEL_PRINT_ASCII_LOGO");
- if ("true".equalsIgnoreCase(printAsciiLogo)) {
- AsciiArtUtils.printAsciiArt(Constants.LOGO);
+ try {
+ Command<T> command = CommandFactory.createCommand(commandArgs);
+ command.execute(commandArgs);
+ } catch (ConfigRuntimeException e) {
+ showConfigError(e);
+ throw e;
+ } catch (Exception e) {
+ showFatalError(e);
+ throw e;
}
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
new file mode 100644
index 0000000..ff00e4b
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.env.RuntimeEnv;
+import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.utils.AsciiArtUtils;
+import org.apache.seatunnel.utils.CompressionUtils;
+
+import org.apache.commons.compress.archivers.ArchiveException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * Base task execute command. For more details, see:
+ * <ul>
+ * <li>{@link org.apache.seatunnel.command.flink.FlinkTaskExecuteCommand}</li>
+ * <li>{@link org.apache.seatunnel.command.spark.SparkTaskExecuteCommand}</li>
+ * </ul>
+ *
+ * @param <T> command args.
+ */
+public abstract class BaseTaskExecuteCommand<T extends CommandArgs> implements Command<T> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(BaseTaskExecuteCommand.class);
+
+ /**
+ * Check the plugin config.
+ *
+ * @param plugins plugin list.
+ */
+ protected void baseCheckConfig(List<? extends Plugin>... plugins) {
+ pluginCheck(plugins);
+ deployModeCheck();
+ }
+
+ /**
+ * Execute prepare method defined in {@link Plugin}.
+ *
+ * @param env runtimeEnv
+ * @param plugins plugin list
+ */
+ protected void prepare(RuntimeEnv env, List<? extends Plugin>... plugins) {
+ for (List<? extends Plugin> pluginList : plugins) {
+ pluginList.forEach(plugin -> plugin.prepare(env));
+ }
+ }
+
+ /**
+ * Print the logo.
+ */
+ protected void showAsciiLogo() {
+ String printAsciiLogo = System.getenv("SEATUNNEL_PRINT_ASCII_LOGO");
+ if ("true".equalsIgnoreCase(printAsciiLogo)) {
+ AsciiArtUtils.printAsciiArt(Constants.LOGO);
+ }
+ }
+
+ /**
+ * Execute the checkConfig method defined in {@link Plugin}.
+ *
+ * @param plugins plugin list
+ */
+ private void pluginCheck(List<? extends Plugin>... plugins) {
+ for (List<? extends Plugin> pluginList : plugins) {
+ for (Plugin plugin : pluginList) {
+ CheckResult checkResult;
+ try {
+ checkResult = plugin.checkConfig();
+ } catch (Exception e) {
+ checkResult = CheckResult.error(e.getMessage());
+ }
+ if (!checkResult.isSuccess()) {
+ LOGGER.error("Plugin[{}] contains invalid config, error: {} \n", plugin.getClass().getName(), checkResult.getMsg());
+ System.exit(-1); // invalid configuration
+ }
+ }
+ }
+ }
+
+ private void deployModeCheck() {
+ final Optional<String> mode = Common.getDeployMode();
+ if (mode.isPresent() && DeployMode.CLUSTER.getName().equals(mode.get())) {
+
+ LOGGER.info("preparing cluster mode work dir files...");
+ File workDir = new File(".");
+
+ for (File file : Objects.requireNonNull(workDir.listFiles())) {
+ LOGGER.warn("\t list file: " + file.getAbsolutePath());
+ }
+ // decompress plugin dir
+ File compressedFile = new File("plugins.tar.gz");
+
+ try {
+ File tempFile = CompressionUtils.unGzip(compressedFile, workDir);
+ try {
+ CompressionUtils.unTar(tempFile, workDir);
+ LOGGER.info("succeeded to decompress plugins.tar.gz");
+ } catch (ArchiveException e) {
+ LOGGER.error("failed to decompress plugins.tar.gz", e);
+ System.exit(-1);
+ }
+ } catch (IOException e) {
+ LOGGER.error("failed to decompress plugins.tar.gz", e);
+ System.exit(-1);
+ }
+ }
+ }
+
+}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java
similarity index 74%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java
index 44beef8..767435c 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/Command.java
@@ -15,21 +15,21 @@
* limitations under the License.
*/
-package org.apache.seatunnel.common.config;
+package org.apache.seatunnel.command;
-public enum DeployMode {
- CLIENT("client"),
- CLUSTER("cluster"),
- ;
-
- private final String name;
-
- DeployMode(String name) {
- this.name = name;
- }
+/**
+ * Command interface.
+ *
+ * @param <T> args type
+ */
+@FunctionalInterface
+public interface Command<T extends CommandArgs> {
- public String getName() {
- return name;
- }
+ /**
+ * Execute command
+ *
+ * @param commandArgs args
+ */
+ void execute(T commandArgs);
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandArgs.java
similarity index 76%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandArgs.java
index 7670fba..7f3516e 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandArgs.java
@@ -15,18 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.command;
-public enum Engine {
- SPARK("spark"), FLINK("flink"), NULL("");
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.config.EngineType;
- private String engine;
+public interface CommandArgs {
- Engine(String engine) {
- this.engine = engine;
- }
+ EngineType getEngineType();
+
+ DeployMode getDeployMode();
- public String getEngine() {
- return engine;
- }
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java
similarity index 75%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java
index 7670fba..f4fc8e5 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandBuilder.java
@@ -15,18 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.command;
-public enum Engine {
- SPARK("spark"), FLINK("flink"), NULL("");
-
- private String engine;
-
- Engine(String engine) {
- this.engine = engine;
- }
-
- public String getEngine() {
- return engine;
- }
+@FunctionalInterface
+public interface CommandBuilder<T extends CommandArgs> {
+ Command<T> buildCommand(T commandArgs);
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandFactory.java
new file mode 100644
index 0000000..0264446
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/CommandFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.command.flink.FlinkCommandBuilder;
+import org.apache.seatunnel.command.spark.SparkCommandBuilder;
+
+public class CommandFactory {
+
+ private CommandFactory() {
+ }
+
+ /**
+ * Create seatunnel command.
+ *
+ * @param commandArgs command args.
+ * @return Special command.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T extends CommandArgs> Command<T> createCommand(T commandArgs) {
+ switch (commandArgs.getEngineType()) {
+ case FLINK:
+ return (Command<T>) new FlinkCommandBuilder().buildCommand((FlinkCommandArgs) commandArgs);
+ case SPARK:
+ return (Command<T>) new SparkCommandBuilder().buildCommand((SparkCommandArgs) commandArgs);
+ default:
+ throw new RuntimeException(String.format("engine type: %s is not supported", commandArgs.getEngineType()));
+ }
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeValidator.java
similarity index 96%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeValidator.java
index 19422dc..de1c207 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/DeployModeValidator.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command;
import org.apache.seatunnel.common.config.Common;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandSparkArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
similarity index 67%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandSparkArgs.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
index c847590..ecc886b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandSparkArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/FlinkCommandArgs.java
@@ -15,57 +15,61 @@
* limitations under the License.
*/
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.config.EngineType;
import com.beust.jcommander.Parameter;
import java.util.List;
-public class CommandSparkArgs {
+public class FlinkCommandArgs implements CommandArgs {
@Parameter(names = {"-c", "--config"},
description = "config file",
required = true)
private String configFile = "application.conf";
- @Parameter(names = {"-e", "--deploy-mode"},
- description = "spark deploy mode",
- required = true,
- validateWith = org.apache.seatunnel.config.command.DeployModeValidator.class)
- private String deployMode = DeployMode.CLIENT.getName();
-
- @Parameter(names = {"-m", "--master"},
- description = "spark master",
- required = true)
- private String master = null;
-
@Parameter(names = {"-i", "--variable"},
description = "variable substitution, such as -i city=beijing, or -i date=20190318")
private List<String> variables = null;
@Parameter(names = {"-t", "--check"},
description = "check config")
- private boolean testConfig = false;
+ private boolean checkConfig = false;
public String getConfigFile() {
return configFile;
}
- public String getDeployMode() {
- return deployMode;
+ public boolean isCheckConfig() {
+ return checkConfig;
}
- public boolean isTestConfig() {
- return testConfig;
+ public List<String> getVariables() {
+ return variables;
}
- public String getMaster() {
- return master;
+ @Override
+ public EngineType getEngineType() {
+ return EngineType.FLINK;
}
- public List<String> getVariables() {
- return variables;
+ @Override
+ public DeployMode getDeployMode() {
+ return DeployMode.CLIENT;
+ }
+
+ public void setConfigFile(String configFile) {
+ this.configFile = configFile;
+ }
+
+ public void setVariables(List<String> variables) {
+ this.variables = variables;
+ }
+
+ public void setCheckConfig(boolean checkConfig) {
+ this.checkConfig = checkConfig;
}
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandFlinkArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
similarity index 50%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandFlinkArgs.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
index 95cf813..fcfa7dc 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandFlinkArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/SparkCommandArgs.java
@@ -15,36 +15,83 @@
* limitations under the License.
*/
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.config.EngineType;
import com.beust.jcommander.Parameter;
import java.util.List;
-public class CommandFlinkArgs {
+public class SparkCommandArgs implements CommandArgs {
@Parameter(names = {"-c", "--config"},
description = "config file",
required = true)
private String configFile = "application.conf";
+ @Parameter(names = {"-e", "--deploy-mode"},
+ description = "spark deploy mode",
+ required = true,
+ validateWith = DeployModeValidator.class)
+ private String deployMode = DeployMode.CLIENT.getName();
+
+ @Parameter(names = {"-m", "--master"},
+ description = "spark master",
+ required = true)
+ private String master = null;
+
@Parameter(names = {"-i", "--variable"},
description = "variable substitution, such as -i city=beijing, or -i date=20190318")
private List<String> variables = null;
@Parameter(names = {"-t", "--check"},
description = "check config")
- private boolean testConfig = false;
+ private boolean checkConfig = false;
public String getConfigFile() {
return configFile;
}
- public boolean isTestConfig() {
- return testConfig;
+ public DeployMode getDeployMode() {
+ return DeployMode.from(deployMode);
+ }
+
+ public boolean isCheckConfig() {
+ return checkConfig;
+ }
+
+ public String getMaster() {
+ return master;
}
public List<String> getVariables() {
return variables;
}
+
+ @Override
+ public EngineType getEngineType() {
+ return EngineType.SPARK;
+ }
+
+ public void setConfigFile(String configFile) {
+ this.configFile = configFile;
+ }
+
+ public void setDeployMode(String deployMode) {
+ this.deployMode = deployMode;
+ }
+
+ public void setMaster(String master) {
+ this.master = master;
+ }
+
+ public void setVariables(List<String> variables) {
+ this.variables = variables;
+ }
+
+ public void setCheckConfig(boolean checkConfig) {
+ this.checkConfig = checkConfig;
+ }
}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkCommandBuilder.java
similarity index 62%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkCommandBuilder.java
index 44beef8..5eedfdc 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkCommandBuilder.java
@@ -15,21 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.common.config;
+package org.apache.seatunnel.command.flink;
-public enum DeployMode {
- CLIENT("client"),
- CLUSTER("cluster"),
- ;
+import org.apache.seatunnel.command.Command;
+import org.apache.seatunnel.command.CommandBuilder;
+import org.apache.seatunnel.command.FlinkCommandArgs;
- private final String name;
+public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
- DeployMode(String name) {
- this.name = name;
+ @Override
+ public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
+ return commandArgs.isCheckConfig() ? new FlinkConfValidateCommand() : new FlinkTaskExecuteCommand();
}
-
- public String getName() {
- return name;
- }
-
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
similarity index 53%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
index 19422dc..02086b2 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/DeployModeValidator.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkConfValidateCommand.java
@@ -15,19 +15,26 @@
* limitations under the License.
*/
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command.flink;
-import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.command.Command;
+import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.config.ConfigBuilder;
-import com.beust.jcommander.IParameterValidator;
-import com.beust.jcommander.ParameterException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to check whether the Flink conf is valid.
+ */
+public class FlinkConfValidateCommand implements Command<FlinkCommandArgs> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(FlinkConfValidateCommand.class);
-public class DeployModeValidator implements IParameterValidator {
@Override
- public void validate(String name, String value)
- throws ParameterException {
- if (!Common.isModeAllowed(value)) {
- throw new ParameterException("deploy-mode: " + value + " is not allowed.");
- }
+ public void execute(FlinkCommandArgs flinkCommandArgs) {
+ String configPath = flinkCommandArgs.getConfigFile();
+ new ConfigBuilder(configPath, flinkCommandArgs.getEngineType()).checkConfig();
+ LOGGER.info("config OK !");
}
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
new file mode 100644
index 0000000..d112515
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command.flink;
+
+import org.apache.seatunnel.apis.BaseSink;
+import org.apache.seatunnel.apis.BaseSource;
+import org.apache.seatunnel.apis.BaseTransform;
+import org.apache.seatunnel.command.BaseTaskExecuteCommand;
+import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.config.ConfigBuilder;
+import org.apache.seatunnel.config.PluginType;
+import org.apache.seatunnel.env.Execution;
+
+import java.util.List;
+
+/**
+ * Used to execute Flink Job.
+ */
+public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs> {
+
+ @Override
+ public void execute(FlinkCommandArgs flinkCommandArgs) {
+ ConfigBuilder configBuilder = new ConfigBuilder(flinkCommandArgs.getConfigFile(), flinkCommandArgs.getEngineType());
+
+ List<BaseSource> sources = configBuilder.createPlugins(PluginType.SOURCE);
+ List<BaseTransform> transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
+ List<BaseSink> sinks = configBuilder.createPlugins(PluginType.SINK);
+ Execution execution = configBuilder.createExecution();
+ baseCheckConfig(sources, transforms, sinks);
+ prepare(configBuilder.getEnv(), sources, transforms, sinks);
+ showAsciiLogo();
+
+ try {
+ execution.start(sources, transforms, sinks);
+ } catch (Exception e) {
+ throw new RuntimeException("Execute Flink task error", e);
+ }
+ }
+
+}
diff --git a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkCommandBuilder.java
similarity index 62%
copy from seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkCommandBuilder.java
index 44beef8..ee3972e 100644
--- a/seatunnel-common/src/main/java/org/apache/seatunnel/common/config/DeployMode.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkCommandBuilder.java
@@ -15,21 +15,18 @@
* limitations under the License.
*/
-package org.apache.seatunnel.common.config;
+package org.apache.seatunnel.command.spark;
-public enum DeployMode {
- CLIENT("client"),
- CLUSTER("cluster"),
- ;
+import org.apache.seatunnel.command.Command;
+import org.apache.seatunnel.command.CommandBuilder;
+import org.apache.seatunnel.command.SparkCommandArgs;
- private final String name;
+public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
- DeployMode(String name) {
- this.name = name;
- }
-
- public String getName() {
- return name;
+ @Override
+ public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
+ return commandArgs.isCheckConfig() ? new SparkConfValidateCommand() : new SparkTaskExecuteCommand();
}
}
+
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
new file mode 100644
index 0000000..bfa9f78
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkConfValidateCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command.spark;
+
+import org.apache.seatunnel.command.Command;
+import org.apache.seatunnel.command.SparkCommandArgs;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.config.ConfigBuilder;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Paths;
+
+/**
+ * Used to validate the Spark task configuration.
+ */
+public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkConfValidateCommand.class);
+
+ @Override
+ public void execute(SparkCommandArgs sparkCommandArgs) {
+ String confPath;
+ if (DeployMode.CLUSTER.equals(sparkCommandArgs.getDeployMode())) {
+ confPath = Paths.get(sparkCommandArgs.getConfigFile()).getFileName().toString();
+ } else {
+ confPath = sparkCommandArgs.getConfigFile();
+ }
+ new ConfigBuilder(confPath, sparkCommandArgs.getEngineType()).checkConfig();
+ LOGGER.info("config OK !");
+ }
+
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
new file mode 100644
index 0000000..f155839
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command.spark;
+
+import org.apache.seatunnel.apis.BaseSink;
+import org.apache.seatunnel.apis.BaseSource;
+import org.apache.seatunnel.apis.BaseTransform;
+import org.apache.seatunnel.command.BaseTaskExecuteCommand;
+import org.apache.seatunnel.command.SparkCommandArgs;
+import org.apache.seatunnel.config.ConfigBuilder;
+import org.apache.seatunnel.config.PluginType;
+import org.apache.seatunnel.env.Execution;
+
+import java.util.List;
+
+public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommandArgs> {
+
+ @Override
+ public void execute(SparkCommandArgs sparkCommandArgs) {
+ String confFile = sparkCommandArgs.getConfigFile();
+
+ ConfigBuilder configBuilder = new ConfigBuilder(confFile, sparkCommandArgs.getEngineType());
+ List<BaseSource> sources = configBuilder.createPlugins(PluginType.SOURCE);
+ List<BaseTransform> transforms = configBuilder.createPlugins(PluginType.TRANSFORM);
+ List<BaseSink> sinks = configBuilder.createPlugins(PluginType.SINK);
+ Execution execution = configBuilder.createExecution();
+ baseCheckConfig(sources, transforms, sinks);
+ prepare(configBuilder.getEnv(), sources, transforms, sinks);
+ showAsciiLogo();
+
+ try {
+ execution.start(sources, transforms, sinks);
+ } catch (Exception e) {
+ throw new RuntimeException("Execute Spark task error", e);
+ }
+ }
+
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
index 7bb8c7a..3a7c295 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
@@ -27,8 +27,6 @@ import org.apache.seatunnel.plugin.Plugin;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.spark.batch.SparkBatchExecution;
import org.apache.seatunnel.spark.stream.SparkStreamingExecution;
-import org.apache.seatunnel.utils.Engine;
-import org.apache.seatunnel.utils.PluginType;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -52,14 +50,14 @@ public class ConfigBuilder {
private static final String PLUGIN_NAME_KEY = "plugin_name";
private final String configFile;
- private final Engine engine;
+ private final EngineType engine;
private ConfigPackage configPackage;
private final Config config;
private boolean streaming;
private Config envConfig;
private final RuntimeEnv env;
- public ConfigBuilder(String configFile, Engine engine) {
+ public ConfigBuilder(String configFile, EngineType engine) {
this.configFile = configFile;
this.engine = engine;
this.config = load();
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EngineType.java
similarity index 83%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EngineType.java
index 7670fba..5b8d239 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/Engine.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/EngineType.java
@@ -15,14 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.config;
-public enum Engine {
- SPARK("spark"), FLINK("flink"), NULL("");
+public enum EngineType {
+ SPARK("spark"),
+ FLINK("flink"),
+ ;
- private String engine;
+ private final String engine;
- Engine(String engine) {
+ EngineType(String engine) {
this.engine = engine;
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/PluginType.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java
similarity index 93%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/PluginType.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java
index 9de7975..e8c0705 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/PluginType.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/PluginType.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.utils;
+package org.apache.seatunnel.config;
public enum PluginType {
SOURCE("source"), TRANSFORM("transform"), SINK("sink");
- private String type;
+ private final String type;
PluginType(String type) {
this.type = type;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineArgs.java
deleted file mode 100644
index 1fe0ced..0000000
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineArgs.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.config.command;
-
-import org.apache.seatunnel.common.config.DeployMode;
-
-import java.util.List;
-
-public class CommandLineArgs {
-
- private String deployMode = DeployMode.CLIENT.getName();
- private final String configFile;
- private final boolean testConfig;
- private List<String> variables;
-
- public CommandLineArgs(String configFile, boolean testConfig, List<String> variables) {
- this.configFile = configFile;
- this.testConfig = testConfig;
- this.variables = variables;
- }
-
- public CommandLineArgs(String deployMode, String configFile, boolean testConfig) {
- this.deployMode = deployMode;
- this.configFile = configFile;
- this.testConfig = testConfig;
- }
-
- public String getDeployMode() {
- return deployMode;
- }
-
- public String getConfigFile() {
- return configFile;
- }
-
- public boolean isTestConfig() {
- return testConfig;
- }
-
- public List<String> getVariables() {
- return variables;
- }
-
-}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineUtils.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CommandLineUtils.java
similarity index 59%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineUtils.java
rename to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CommandLineUtils.java
index 412f5d4..540d4ce 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/command/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/utils/CommandLineUtils.java
@@ -15,7 +15,10 @@
* limitations under the License.
*/
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.utils;
+
+import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.command.SparkCommandArgs;
import com.beust.jcommander.JCommander;
@@ -24,32 +27,22 @@ public final class CommandLineUtils {
private CommandLineUtils() {
}
- public static CommandLineArgs parseSparkArgs(String[] args) {
- CommandSparkArgs commandSparkArgs = new CommandSparkArgs();
+ public static SparkCommandArgs parseSparkArgs(String[] args) {
+ SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
JCommander.newBuilder()
- .addObject(commandSparkArgs)
+ .addObject(sparkCommandArgs)
.build()
.parse(args);
-
- return new CommandLineArgs(
- commandSparkArgs.getDeployMode(),
- commandSparkArgs.getConfigFile(),
- commandSparkArgs.isTestConfig()
- );
+ return sparkCommandArgs;
}
- public static CommandLineArgs parseFlinkArgs(String[] args) {
- CommandFlinkArgs commandFlinkArgs = new CommandFlinkArgs();
+ public static FlinkCommandArgs parseFlinkArgs(String[] args) {
+ FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
JCommander.newBuilder()
- .addObject(commandFlinkArgs)
+ .addObject(flinkCommandArgs)
.build()
.parse(args);
-
- return new CommandLineArgs(
- commandFlinkArgs.getConfigFile(),
- commandFlinkArgs.isTestConfig(),
- commandFlinkArgs.getVariables()
- );
+ return flinkCommandArgs;
}
}
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandFactoryTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandFactoryTest.java
new file mode 100644
index 0000000..a5bed87
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/CommandFactoryTest.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.command.flink.FlinkConfValidateCommand;
+import org.apache.seatunnel.command.flink.FlinkTaskExecuteCommand;
+import org.apache.seatunnel.command.spark.SparkConfValidateCommand;
+import org.apache.seatunnel.command.spark.SparkTaskExecuteCommand;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class CommandFactoryTest {
+
+ @Test
+ public void testCreateSparkConfValidateCommand() {
+ SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
+ sparkCommandArgs.setCheckConfig(true);
+ Command<SparkCommandArgs> sparkCommand = CommandFactory.createCommand(sparkCommandArgs);
+ Assert.assertEquals(SparkConfValidateCommand.class, sparkCommand.getClass());
+ }
+
+ @Test
+ public void testCreateSparkExecuteTaskCommand() {
+ SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
+ sparkCommandArgs.setCheckConfig(false);
+ Command<SparkCommandArgs> sparkCommand = CommandFactory.createCommand(sparkCommandArgs);
+ Assert.assertEquals(SparkTaskExecuteCommand.class, sparkCommand.getClass());
+ }
+
+ @Test
+ public void testCreateFlinkConfValidateCommand() {
+ FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+ flinkCommandArgs.setCheckConfig(true);
+
+ Command<FlinkCommandArgs> flinkCommand = CommandFactory.createCommand(flinkCommandArgs);
+ Assert.assertEquals(FlinkConfValidateCommand.class, flinkCommand.getClass());
+ }
+
+ @Test
+ public void testCreateFlinkExecuteTaskCommand() {
+ FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+ flinkCommandArgs.setCheckConfig(false);
+
+ Command<FlinkCommandArgs> flinkCommand = CommandFactory.createCommand(flinkCommandArgs);
+ Assert.assertEquals(FlinkTaskExecuteCommand.class, flinkCommand.getClass());
+
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandFlinkArgsTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/FlinkCommandArgsTest.java
similarity index 87%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandFlinkArgsTest.java
rename to seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/FlinkCommandArgsTest.java
index e446bb4..56e990d 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandFlinkArgsTest.java
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/FlinkCommandArgsTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command;
import com.beust.jcommander.JCommander;
import org.junit.Assert;
@@ -23,18 +23,18 @@ import org.junit.Test;
import java.util.Arrays;
-public class CommandFlinkArgsTest {
+public class FlinkCommandArgsTest {
@Test
public void testParseFlinkArgs() {
String[] args = {"-c", "app.conf", "-t", "-i", "city=shenyang", "-i", "date=20200202"};
- CommandFlinkArgs flinkArgs = new CommandFlinkArgs();
+ FlinkCommandArgs flinkArgs = new FlinkCommandArgs();
JCommander.newBuilder()
.addObject(flinkArgs)
.build()
.parse(args);
Assert.assertEquals("app.conf", flinkArgs.getConfigFile());
- Assert.assertTrue(flinkArgs.isTestConfig());
+ Assert.assertTrue(flinkArgs.isCheckConfig());
Assert.assertEquals(Arrays.asList("city=shenyang", "date=20200202"), flinkArgs.getVariables());
}
}
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandSparkArgsTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/SparkCommandArgsTest.java
similarity index 86%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandSparkArgsTest.java
rename to seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/SparkCommandArgsTest.java
index f9af86f..2b2baba 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandSparkArgsTest.java
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/SparkCommandArgsTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.command;
import org.apache.seatunnel.common.config.DeployMode;
@@ -25,18 +25,18 @@ import org.junit.Test;
import java.util.Arrays;
-public class CommandSparkArgsTest {
+public class SparkCommandArgsTest {
@Test
public void testParseSparkArgs() {
String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=shijiazhuang", "-i", "name=Tom"};
- CommandSparkArgs sparkArgs = new CommandSparkArgs();
+ SparkCommandArgs sparkArgs = new SparkCommandArgs();
JCommander.newBuilder()
.addObject(sparkArgs)
.build()
.parse(args);
Assert.assertEquals("app.conf", sparkArgs.getConfigFile());
- Assert.assertEquals(DeployMode.CLIENT.getName(), sparkArgs.getDeployMode());
+ Assert.assertEquals(DeployMode.CLIENT, sparkArgs.getDeployMode());
Assert.assertEquals("yarn", sparkArgs.getMaster());
Assert.assertEquals(Arrays.asList("city=shijiazhuang", "name=Tom"), sparkArgs.getVariables());
}
diff --git a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandLineUtilsTest.java b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CommandLineUtilsTest.java
similarity index 88%
rename from seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandLineUtilsTest.java
rename to seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CommandLineUtilsTest.java
index 7231b89..f24fd3a 100644
--- a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/config/command/CommandLineUtilsTest.java
+++ b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/utils/CommandLineUtilsTest.java
@@ -15,7 +15,9 @@
* limitations under the License.
*/
-package org.apache.seatunnel.config.command;
+package org.apache.seatunnel.utils;
+
+import org.apache.seatunnel.command.SparkCommandArgs;
import com.beust.jcommander.ParameterException;
import org.junit.Assert;
@@ -26,10 +28,10 @@ public class CommandLineUtilsTest {
@Test
public void testParseSparkArgs() {
String[] args = {"-c", "app.conf", "-e", "cluster", "-m", "local[*]"};
- CommandLineArgs commandLineArgs = CommandLineUtils.parseSparkArgs(args);
+ SparkCommandArgs commandLineArgs = CommandLineUtils.parseSparkArgs(args);
Assert.assertEquals("app.conf", commandLineArgs.getConfigFile());
- Assert.assertEquals("cluster", commandLineArgs.getDeployMode());
+ Assert.assertEquals("cluster", commandLineArgs.getDeployMode().getName());
}
@Test
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
index b3f9b5b..c240a43 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/SeatunnelFlink.java
@@ -17,16 +17,14 @@
package org.apache.seatunnel;
-import static org.apache.seatunnel.utils.Engine.FLINK;
-
-import org.apache.seatunnel.config.command.CommandLineArgs;
-import org.apache.seatunnel.config.command.CommandLineUtils;
+import org.apache.seatunnel.command.FlinkCommandArgs;
+import org.apache.seatunnel.utils.CommandLineUtils;
public class SeatunnelFlink {
public static void main(String[] args) throws Exception {
- CommandLineArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
- Seatunnel.run(flinkArgs, FLINK);
+ FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+ Seatunnel.run(flinkArgs);
}
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
index 68c352f..7d6eb82 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/SeatunnelSpark.java
@@ -17,15 +17,13 @@
package org.apache.seatunnel;
-import static org.apache.seatunnel.utils.Engine.SPARK;
-
-import org.apache.seatunnel.config.command.CommandLineArgs;
-import org.apache.seatunnel.config.command.CommandLineUtils;
+import org.apache.seatunnel.command.SparkCommandArgs;
+import org.apache.seatunnel.utils.CommandLineUtils;
public class SeatunnelSpark {
public static void main(String[] args) throws Exception {
- CommandLineArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
- Seatunnel.run(sparkArgs, SPARK);
+ SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
+ Seatunnel.run(sparkArgs);
}
}
diff --git a/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java b/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
index 9e67f1a..ea8475b 100644
--- a/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
+++ b/seatunnel-core/seatunnel-core-sql/src/main/java/org/apache/seatunnel/core/sql/SeatunnelSql.java
@@ -17,10 +17,10 @@
package org.apache.seatunnel.core.sql;
-import org.apache.seatunnel.config.command.CommandLineArgs;
-import org.apache.seatunnel.config.command.CommandLineUtils;
+import org.apache.seatunnel.command.FlinkCommandArgs;
import org.apache.seatunnel.core.sql.job.Executor;
import org.apache.seatunnel.core.sql.job.JobInfo;
+import org.apache.seatunnel.utils.CommandLineUtils;
import org.apache.commons.io.FileUtils;
@@ -36,7 +36,7 @@ public class SeatunnelSql {
}
private static JobInfo parseJob(String[] args) throws IOException {
- CommandLineArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+ FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
String configFilePath = flinkArgs.getConfigFile();
String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-core/seatunnel-core-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java b/seatunnel-core/seatunnel-core-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
index 30ada8a..63f8ed8 100644
--- a/seatunnel-core/seatunnel-core-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
+++ b/seatunnel-core/seatunnel-core-sql/src/test/java/org/apache/seatunnel/core/sql/SqlVariableSubstitutionTest.java
@@ -17,9 +17,9 @@
package org.apache.seatunnel.core.sql;
-import org.apache.seatunnel.config.command.CommandLineArgs;
-import org.apache.seatunnel.config.command.CommandLineUtils;
+import org.apache.seatunnel.command.FlinkCommandArgs;
import org.apache.seatunnel.core.sql.job.JobInfo;
+import org.apache.seatunnel.utils.CommandLineUtils;
import org.apache.commons.io.FileUtils;
import org.junit.Assert;
@@ -36,7 +36,7 @@ public class SqlVariableSubstitutionTest {
String[] args = {"-c", System.getProperty("user.dir") + TEST_RESOURCE_DIR + "flink.sql.conf.template",
"-t", "-i", "table_name=events"};
- CommandLineArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
+ FlinkCommandArgs flinkArgs = CommandLineUtils.parseFlinkArgs(args);
String configFilePath = flinkArgs.getConfigFile();
String jobContent = FileUtils.readFileToString(new File(configFilePath), StandardCharsets.UTF_8);
JobInfo jobInfo = new JobInfo(jobContent);
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
index 25316e4..e42c7e0 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
@@ -17,19 +17,20 @@
package org.apache.seatunnel.example.flink;
-import static org.apache.seatunnel.utils.Engine.FLINK;
-
import org.apache.seatunnel.Seatunnel;
-import org.apache.seatunnel.config.command.CommandLineArgs;
+import org.apache.seatunnel.command.FlinkCommandArgs;
public class LocalFlinkExample {
public static final String TEST_RESOURCE_DIR = "/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/";
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
String configFile = getTestConfigFile("fake_to_console.conf");
- CommandLineArgs flinkArgs = new CommandLineArgs(configFile, false, null);
- Seatunnel.run(flinkArgs, FLINK);
+ FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
+ flinkCommandArgs.setConfigFile(configFile);
+ flinkCommandArgs.setCheckConfig(false);
+ flinkCommandArgs.setVariables(null);
+ Seatunnel.run(flinkCommandArgs);
}
public static String getTestConfigFile(String configFile) {
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
index d4b1e5e..5fbf353 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
@@ -17,19 +17,20 @@
package org.apache.seatunnel.example.spark;
-import static org.apache.seatunnel.utils.Engine.SPARK;
-
import org.apache.seatunnel.Seatunnel;
-import org.apache.seatunnel.config.command.CommandLineArgs;
+import org.apache.seatunnel.command.SparkCommandArgs;
public class LocalSparkExample {
public static final String TEST_RESOURCE_DIR = "/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/";
- public static void main(String[] args) throws Exception {
- String configFile = getTestConfigFile("spark.batch.conf.template");
- CommandLineArgs sparkArgs = new CommandLineArgs(configFile, false, null);
- Seatunnel.run(sparkArgs, SPARK);
+ public static void main(String[] args) {
+ String configFile = getTestConfigFile("spark.batch.conf");
+ SparkCommandArgs sparkArgs = new SparkCommandArgs();
+ sparkArgs.setConfigFile(configFile);
+ sparkArgs.setCheckConfig(false);
+ sparkArgs.setVariables(null);
+ Seatunnel.run(sparkArgs);
}
public static String getTestConfigFile(String configFile) {
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf.template b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf
similarity index 95%
rename from seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf.template
rename to seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf
index c703d09..b7c6aa4 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf.template
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/resources/examples/spark.batch.conf
@@ -49,12 +49,12 @@ source {
transform {
# split data by specific delimiter
- # you can also you other filter plugins, such as sql
+ # you can also use other transform plugins, such as sql
# sql {
# sql = "select * from accesslog where request_time > 1000"
# }
- # If you would like to get more information about how to configure seatunnel and see full list of filter plugins,
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split
}