You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/09/15 09:29:34 UTC
[incubator-seatunnel] branch st-engine updated: [ST-Engine][Starter] Add seatunnel own engine starter and e2e (#2690)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new c0ba0691f [ST-Engine][Starter] Add seatunnel own engine starter and e2e (#2690)
c0ba0691f is described below
commit c0ba0691ff6ba69190a3837e6747fc646e2bd932
Author: Hisoka <fa...@qq.com>
AuthorDate: Thu Sep 15 17:29:28 2022 +0800
[ST-Engine][Starter] Add seatunnel own engine starter and e2e (#2690)
---
pom.xml | 7 ++
seatunnel-apis/seatunnel-api-base/pom.xml | 4 -
seatunnel-core/README.md | 5 +-
seatunnel-core/pom.xml | 6 +-
seatunnel-core/seatunnel-core-starter/pom.xml | 10 +-
.../starter/config/AbstractExecutionContext.java | 92 --------------
.../core/starter/config/EnvironmentFactory.java | 25 +---
.../core/starter/config/ExecutionFactory.java | 84 -------------
seatunnel-core/seatunnel-flink-starter/pom.xml | 6 -
...or.sh => start-seatunnel-flink-connector-v2.sh} | 0
.../seatunnel/core/starter/flink/FlinkStarter.java | 3 +-
.../core/starter/flink/SeatunnelFlink.java | 3 +-
.../starter/flink/config/FlinkApiEnvironment.java | 75 ------------
.../flink/config/FlinkEnvironmentFactory.java} | 16 ++-
.../starter/flink/execution/FlinkExecution.java | 7 +-
.../core/starter/flink/utils/CommandLineUtils.java | 5 +-
seatunnel-core/seatunnel-spark-starter/pom.xml | 6 -
...or.sh => start-seatunnel-spark-connector-v2.sh} | 0
.../seatunnel/core/starter/spark/SparkStarter.java | 2 +-
.../starter/spark/config/SparkEnvironment.java | 66 ----------
.../spark/config/SparkEnvironmentFactory.java} | 15 ++-
.../starter/spark/execution/SparkExecution.java | 7 +-
.../pom.xml | 6 +-
.../src/main/bin/seatunnel-cluster.sh} | 19 +--
.../src/main/bin/seatunnel.sh} | 17 +--
.../core/starter/seatunnel/CommandLineUtils.java | 74 ++++++++++++
.../core/starter/seatunnel/SeaTunnelClient.java} | 12 +-
.../core/starter/seatunnel/SeaTunnelServer.java} | 27 ++---
.../starter/seatunnel/args/ClientCommandArgs.java} | 42 +++----
.../seatunnel/args/ExecutionModeConverter.java | 0
.../starter/seatunnel/args/ServerCommandArgs.java} | 45 ++-----
.../seatunnel/command/ApiConfValidateCommand.java} | 14 +--
.../seatunnel/command/ClientCommandBuilder.java} | 10 +-
.../seatunnel/command/ClientExecuteCommand.java} | 25 ++--
.../seatunnel/command/ServerCommandBuilder.java} | 11 +-
.../seatunnel/command/ServerExecuteCommand.java | 48 ++++++++
.../config/SeaTunnelApiConfigChecker.java | 0
.../seatunnel/constant/SeaTunnelConstant.java | 19 +--
seatunnel-dist/release-docs/LICENSE | 5 -
.../src/main/assembly/assembly-bin-ci.xml | 15 +--
seatunnel-dist/src/main/assembly/assembly-bin.xml | 15 +--
.../connector-console-seatunnel-e2e}/pom.xml | 22 ++--
.../engine/e2e/console/FakeSourceToConsoleIT.java | 20 ++--
.../src/test/resources/fakesource_to_console.conf} | 23 +---
.../connector-seatunnel-e2e-base}/pom.xml | 45 ++++---
.../seatunnel/engine/e2e}/JobExecutionIT.java | 10 +-
.../seatunnel/engine/e2e/SeaTunnelContainer.java | 133 +++++++++++++++++++++
.../org/apache/seatunnel/engine/e2e/TestUtils.java | 2 +-
.../test/resources/batch_fakesource_to_file.conf | 0
.../batch_fakesource_to_file_complex.conf | 0
.../streaming_fakesource_to_file_complex.conf | 0
seatunnel-e2e/seatunnel-engine-e2e/pom.xml | 11 ++
.../apache/seatunnel/e2e/flink/FlinkContainer.java | 2 +-
.../apache/seatunnel/e2e/spark/SparkContainer.java | 2 +-
seatunnel-engine/seatunnel-engine-client/pom.xml | 9 +-
.../engine/client/job/ClientJobProxy.java | 7 +-
.../engine/client/job/JobConfigParser.java | 2 +-
.../src/test/resources/client_test.conf | 26 ++--
.../org/apache/seatunnel/engine/core/job/Job.java | 2 +
.../engine/server/task/SeaTunnelTask.java | 6 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 12 +-
.../seatunnel-engine-examples/pom.xml | 12 +-
.../example/engine/SeaTunnelEngineExample.java | 19 +--
.../engine/SeaTunnelEngineServerExample.java | 15 +--
tools/dependencies/known-dependencies.txt | 6 -
65 files changed, 538 insertions(+), 696 deletions(-)
diff --git a/pom.xml b/pom.xml
index 9750e140e..5f5326181 100644
--- a/pom.xml
+++ b/pom.xml
@@ -197,6 +197,7 @@
<commons-io.version>2.11.0</commons-io.version>
<commons-collections4.version>4.4</commons-collections4.version>
<maven-assembly-plugin.version>3.3.0</maven-assembly-plugin.version>
+ <commons-compress.version>1.20</commons-compress.version>
<protostuff.version>1.8.0</protostuff.version>
<spark.scope>provided</spark.scope>
<flink.scope>provided</flink.scope>
@@ -629,6 +630,12 @@
<version>${jackson.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
+ <version>${commons-compress.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
diff --git a/seatunnel-apis/seatunnel-api-base/pom.xml b/seatunnel-apis/seatunnel-api-base/pom.xml
index 9fe328718..502cae6c4 100644
--- a/seatunnel-apis/seatunnel-api-base/pom.xml
+++ b/seatunnel-apis/seatunnel-api-base/pom.xml
@@ -36,10 +36,6 @@
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-config-shade</artifactId>
- </dependency>
</dependencies>
</project>
diff --git a/seatunnel-core/README.md b/seatunnel-core/README.md
index c4b48e4cb..7ba7c840f 100644
--- a/seatunnel-core/README.md
+++ b/seatunnel-core/README.md
@@ -5,4 +5,7 @@ This module is the seatunnel job entrypoint. Seatunnel jobs are started by the b
- seatunnel-core-flink: The flink job starter.
- seatunnel-core-flink-sql: The flink sql job starter.
-- seatunnel-core-spark: The spark job starter.
\ No newline at end of file
+- seatunnel-core-spark: The spark job starter.
+- seatunnel-spark-starter: The spark job starter for connector-v2.
+- seatunnel-flink-starter: The flink job starter for connector-v2.
+- seatunnel-starter: The seatunnel engine job starter for connector-v2.
\ No newline at end of file
diff --git a/seatunnel-core/pom.xml b/seatunnel-core/pom.xml
index 9735a18e4..b5a588a83 100644
--- a/seatunnel-core/pom.xml
+++ b/seatunnel-core/pom.xml
@@ -44,7 +44,7 @@
<module>seatunnel-core-starter</module>
<module>seatunnel-flink-starter</module>
<module>seatunnel-spark-starter</module>
- <module>seatunnel-seatunnel-starter</module>
+ <module>seatunnel-starter</module>
</modules>
</profile>
<profile>
@@ -52,7 +52,7 @@
<modules>
<module>seatunnel-core-base</module>
<module>seatunnel-core-starter</module>
- <module>seatunnel-seatunnel-starter</module>
+ <module>seatunnel-starter</module>
</modules>
</profile>
<profile>
@@ -65,7 +65,7 @@
<module>seatunnel-core-starter</module>
<module>seatunnel-flink-starter</module>
<module>seatunnel-spark-starter</module>
- <module>seatunnel-seatunnel-starter</module>
+ <module>seatunnel-starter</module>
</modules>
</profile>
</profiles>
diff --git a/seatunnel-core/seatunnel-core-starter/pom.xml b/seatunnel-core/seatunnel-core-starter/pom.xml
index 561879c68..8d05a0545 100644
--- a/seatunnel-core/seatunnel-core-starter/pom.xml
+++ b/seatunnel-core/seatunnel-core-starter/pom.xml
@@ -40,19 +40,19 @@
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-api-flink</artifactId>
+ <artifactId>seatunnel-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-api-spark</artifactId>
+ <artifactId>seatunnel-plugin-discovery</artifactId>
<version>${project.version}</version>
</dependency>
+
<dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-plugin-discovery</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-compress</artifactId>
</dependency>
<dependency>
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java
deleted file mode 100644
index a834965c0..000000000
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.config;
-
-import org.apache.seatunnel.apis.base.api.BaseSink;
-import org.apache.seatunnel.apis.base.api.BaseSource;
-import org.apache.seatunnel.apis.base.api.BaseTransform;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import java.net.URL;
-import java.util.Arrays;
-import java.util.List;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/**
- * The ExecutionContext contains all configuration needed to run the job.
- *
- * @param <ENVIRONMENT> environment type.
- */
-public abstract class AbstractExecutionContext<ENVIRONMENT extends RuntimeEnv> {
-
- private final Config config;
- private final EngineType engine;
-
- private final ENVIRONMENT environment;
- private final JobMode jobMode;
-
- public AbstractExecutionContext(Config config, EngineType engine) {
- this.config = config;
- this.engine = engine;
- this.environment = new EnvironmentFactory<ENVIRONMENT>(config, engine).getEnvironment();
- this.jobMode = environment.getJobMode();
- }
-
- public Config getRootConfig() {
- return config;
- }
-
- public EngineType getEngine() {
- return engine;
- }
-
- public ENVIRONMENT getEnvironment() {
- return environment;
- }
-
- public JobMode getJobMode() {
- return jobMode;
- }
-
- public abstract List<BaseSource<ENVIRONMENT>> getSources();
-
- public abstract List<BaseTransform<ENVIRONMENT>> getTransforms();
-
- public abstract List<BaseSink<ENVIRONMENT>> getSinks();
-
- public abstract List<URL> getPluginJars();
-
- @SuppressWarnings("checkstyle:Indentation")
- protected List<PluginIdentifier> getPluginIdentifiers(PluginType... pluginTypes) {
- return Arrays.stream(pluginTypes).flatMap((Function<PluginType, Stream<PluginIdentifier>>) pluginType -> {
- List<? extends Config> configList = config.getConfigList(pluginType.getType());
- return configList.stream()
- .map(pluginConfig -> PluginIdentifier
- .of(engine.getEngine(),
- pluginType.getType(),
- pluginConfig.getString("plugin_name")));
- }).collect(Collectors.toList());
- }
-}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EnvironmentFactory.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EnvironmentFactory.java
index 20e77a1db..5b99b7aa0 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EnvironmentFactory.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/EnvironmentFactory.java
@@ -19,8 +19,6 @@ package org.apache.seatunnel.core.starter.config;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -31,39 +29,28 @@ import java.util.List;
*
* @param <ENVIRONMENT> environment type
*/
-public class EnvironmentFactory<ENVIRONMENT extends RuntimeEnv> {
+public abstract class EnvironmentFactory<ENVIRONMENT extends RuntimeEnv> {
private static final String PLUGIN_NAME_KEY = "plugin_name";
private final Config config;
- private final EngineType engine;
- public EnvironmentFactory(Config config, EngineType engine) {
+ public EnvironmentFactory(Config config) {
this.config = config;
- this.engine = engine;
}
// todo:put this method into submodule to avoid dependency on the engine
public synchronized ENVIRONMENT getEnvironment() {
Config envConfig = config.getConfig("env");
- boolean enableHive = checkIsContainHive();
- ENVIRONMENT env;
- switch (engine) {
- case SPARK:
- env = (ENVIRONMENT) new SparkEnvironment().setEnableHive(enableHive);
- break;
- case FLINK:
- env = (ENVIRONMENT) new FlinkEnvironment();
- break;
- default:
- throw new IllegalArgumentException("Engine: " + engine + " is not supported");
- }
+ ENVIRONMENT env = newEnvironment();
env.setConfig(envConfig)
.setJobMode(getJobMode(envConfig)).prepare();
return env;
}
- private boolean checkIsContainHive() {
+ protected abstract ENVIRONMENT newEnvironment();
+
+ protected boolean checkIsContainHive() {
List<? extends Config> sourceConfigList = config.getConfigList(PluginType.SOURCE.getType());
for (Config c : sourceConfigList) {
if (c.getString(PLUGIN_NAME_KEY).toLowerCase().contains("hive")) {
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ExecutionFactory.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ExecutionFactory.java
deleted file mode 100644
index d0d99cd94..000000000
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ExecutionFactory.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.config;
-
-import org.apache.seatunnel.apis.base.api.BaseSink;
-import org.apache.seatunnel.apis.base.api.BaseSource;
-import org.apache.seatunnel.apis.base.api.BaseTransform;
-import org.apache.seatunnel.apis.base.env.Execution;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.flink.batch.FlinkBatchExecution;
-import org.apache.seatunnel.flink.stream.FlinkStreamExecution;
-import org.apache.seatunnel.spark.SparkEnvironment;
-import org.apache.seatunnel.spark.batch.SparkBatchExecution;
-import org.apache.seatunnel.spark.stream.SparkStreamingExecution;
-import org.apache.seatunnel.spark.structuredstream.StructuredStreamingExecution;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Used to create {@link Execution}.
- *
- * @param <ENVIRONMENT> environment type
- */
-public class ExecutionFactory<ENVIRONMENT extends RuntimeEnv> {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(ExecutionFactory.class);
-
- public AbstractExecutionContext<ENVIRONMENT> executionContext;
-
- public ExecutionFactory(AbstractExecutionContext<ENVIRONMENT> executionContext) {
- this.executionContext = executionContext;
- }
-
- public Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, BaseSink<ENVIRONMENT>, ENVIRONMENT> createExecution() {
- Execution execution = null;
- switch (executionContext.getEngine()) {
- case SPARK:
- SparkEnvironment sparkEnvironment = (SparkEnvironment) executionContext.getEnvironment();
- switch (executionContext.getJobMode()) {
- case STREAMING:
- execution = new SparkStreamingExecution(sparkEnvironment);
- break;
- case STRUCTURED_STREAMING:
- execution = new StructuredStreamingExecution(sparkEnvironment);
- break;
- default:
- execution = new SparkBatchExecution(sparkEnvironment);
- }
- break;
- case FLINK:
- FlinkEnvironment flinkEnvironment = (FlinkEnvironment) executionContext.getEnvironment();
- switch (executionContext.getJobMode()) {
- case STREAMING:
- execution = new FlinkStreamExecution(flinkEnvironment);
- break;
- default:
- execution = new FlinkBatchExecution(flinkEnvironment);
- }
- break;
- default:
- throw new IllegalArgumentException("No suitable engine");
- }
- LOGGER.info("current execution is [{}]", execution.getClass().getName());
- return (Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, BaseSink<ENVIRONMENT>, ENVIRONMENT>) execution;
- }
-
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index 6385b9c41..15131900b 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -42,12 +42,6 @@
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-api</artifactId>
- <version>${project.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-translation-flink</artifactId>
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh b/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-new-connector.sh
rename to seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
index 52b207a79..4de645699 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
@@ -20,7 +20,6 @@ package org.apache.seatunnel.core.starter.flink;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.Starter;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
import org.apache.seatunnel.core.starter.flink.utils.CommandLineUtils;
import java.util.List;
@@ -44,7 +43,7 @@ public class FlinkStarter implements Starter {
private final String appJar;
FlinkStarter(String[] args) {
- this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+ this.flinkCommandArgs = CommandLineUtils.parseCommandArgs(args);
// set the deployment mode, used to get the job jar path.
Common.setDeployMode(flinkCommandArgs.getDeployMode());
Common.setStarter(true);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
index 330c2e501..20824a901 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeatunnelFlink.java
@@ -22,13 +22,12 @@ import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.exception.CommandException;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.starter.flink.command.FlinkCommandBuilder;
-import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
import org.apache.seatunnel.core.starter.flink.utils.CommandLineUtils;
public class SeatunnelFlink {
public static void main(String[] args) throws CommandException {
- FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
+ FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args);
Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
.buildCommand(flinkCommandArgs);
Seatunnel.run(flinkCommand);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiEnvironment.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiEnvironment.java
deleted file mode 100644
index 5b8ba32d8..000000000
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiEnvironment.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.config;
-
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.JobMode;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URL;
-import java.util.List;
-
-public class FlinkApiEnvironment implements RuntimeEnv {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiEnvironment.class);
-
- private Config config;
-
- @Override
- public FlinkApiEnvironment setConfig(Config config) {
- this.config = config;
- return this;
- }
-
- @Override
- public Config getConfig() {
- return config;
- }
-
- @Override
- public CheckResult checkConfig() {
- // todo
- return null;
- }
-
- @Override
- public FlinkApiEnvironment prepare() {
- // todo
- return null;
- }
-
- @Override
- public FlinkApiEnvironment setJobMode(JobMode mode) {
- return null;
- }
-
- @Override
- public JobMode getJobMode() {
- return null;
- }
-
- @Override
- public void registerPlugin(List<URL> pluginPaths) {
-
- }
-}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkEnvironmentFactory.java
similarity index 67%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
copy to seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkEnvironmentFactory.java
index bfaff2a9c..a3d3c2eec 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkEnvironmentFactory.java
@@ -15,17 +15,21 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.seatunnel.config;
+package org.apache.seatunnel.core.starter.flink.config;
-import org.apache.seatunnel.core.starter.config.ConfigChecker;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
+import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-public class SeaTunnelApiConfigChecker implements ConfigChecker {
+public class FlinkEnvironmentFactory extends EnvironmentFactory<FlinkEnvironment> {
+
+ public FlinkEnvironmentFactory(Config config) {
+ super(config);
+ }
@Override
- public void checkConfig(Config config) throws ConfigCheckException {
- // TODO implement
+ protected FlinkEnvironment newEnvironment() {
+ return new FlinkEnvironment();
}
}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index ec7d2a199..9447910dc 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -18,10 +18,9 @@
package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.core.starter.config.EngineType;
-import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.core.starter.execution.TaskExecution;
+import org.apache.seatunnel.core.starter.flink.config.FlinkEnvironmentFactory;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -41,15 +40,13 @@ public class FlinkExecution implements TaskExecution {
private static final Logger LOGGER = LoggerFactory.getLogger(FlinkExecution.class);
- private final Config config;
private final FlinkEnvironment flinkEnvironment;
private final PluginExecuteProcessor sourcePluginExecuteProcessor;
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
public FlinkExecution(Config config) {
- this.config = config;
- this.flinkEnvironment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
+ this.flinkEnvironment = new FlinkEnvironmentFactory(config).getEnvironment();
JobContext jobContext = new JobContext();
jobContext.setJobMode(flinkEnvironment.getJobMode());
this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(flinkEnvironment, jobContext, config.getConfigList("source"));
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java
index 9b29fdf16..9d9a6a43e 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/CommandLineUtils.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.core.starter.flink.utils;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.flink.config.FlinkJobType;
import org.apache.seatunnel.core.starter.flink.constant.FlinkConstant;
import com.beust.jcommander.JCommander;
@@ -43,10 +42,10 @@ public class CommandLineUtils {
return flinkCommandArgs;
}
- public static FlinkCommandArgs parseCommandArgs(String[] args, FlinkJobType jobType) {
+ public static FlinkCommandArgs parseCommandArgs(String[] args) {
FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
JCommander jCommander = JCommander.newBuilder()
- .programName(jobType.getType())
+ .programName("start-seatunnel-flink-connector-v2.sh")
.addObject(flinkCommandArgs)
.acceptUnknownOptions(true)
.args(args)
diff --git a/seatunnel-core/seatunnel-spark-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/pom.xml
index d65b58739..e02470ea2 100644
--- a/seatunnel-core/seatunnel-spark-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/pom.xml
@@ -35,12 +35,6 @@
<dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-api</artifactId>
- <version>${project.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-core-starter</artifactId>
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark-new-connector.sh b/seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark-connector-v2.sh
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark-new-connector.sh
rename to seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark-connector-v2.sh
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
index 00261a80c..e9cd2b41f 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -134,7 +134,7 @@ public class SparkStarter implements Starter {
private static SparkCommandArgs parseCommandArgs(String[] args) {
SparkCommandArgs commandArgs = new SparkCommandArgs();
JCommander commander = JCommander.newBuilder()
- .programName("start-seatunnel-spark.sh")
+ .programName("start-seatunnel-spark-connector-v2.sh")
.addObject(commandArgs)
.args(args)
.build();
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java
deleted file mode 100644
index f7d5b83d4..000000000
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java
+++ /dev/null
@@ -1,66 +0,0 @@
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.config;
-
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.JobMode;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import java.net.URL;
-import java.util.List;
-
-public class SparkEnvironment implements RuntimeEnv {
-
- @Override
- public SparkEnvironment setConfig(Config config) {
- return null;
- }
-
- @Override
- public Config getConfig() {
- return null;
- }
-
- @Override
- public CheckResult checkConfig() {
- return null;
- }
-
- @Override
- public SparkEnvironment prepare() {
- return null;
- }
-
- @Override
- public SparkEnvironment setJobMode(JobMode mode) {
- return null;
- }
-
- @Override
- public JobMode getJobMode() {
- return null;
- }
-
- @Override
- public void registerPlugin(List<URL> pluginPaths) {
-
- }
-}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironmentFactory.java
similarity index 65%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
copy to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironmentFactory.java
index bfaff2a9c..8ac703ea8 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironmentFactory.java
@@ -15,17 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.seatunnel.config;
+package org.apache.seatunnel.core.starter.spark.config;
-import org.apache.seatunnel.core.starter.config.ConfigChecker;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
+import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-public class SeaTunnelApiConfigChecker implements ConfigChecker {
+public class SparkEnvironmentFactory extends EnvironmentFactory<SparkEnvironment> {
+ public SparkEnvironmentFactory(Config config) {
+ super(config);
+ }
@Override
- public void checkConfig(Config config) throws ConfigCheckException {
- // TODO implement
+ protected SparkEnvironment newEnvironment() {
+ return new SparkEnvironment().setEnableHive(checkIsContainHive());
}
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 0bb57268c..0ec9f6662 100644
--- a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -18,9 +18,8 @@
package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.core.starter.config.EngineType;
-import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.spark.config.SparkEnvironmentFactory;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -37,15 +36,13 @@ public class SparkExecution {
private static final Logger LOGGER = LoggerFactory.getLogger(SparkExecution.class);
- private final Config config;
private final SparkEnvironment sparkEnvironment;
private final PluginExecuteProcessor sourcePluginExecuteProcessor;
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
public SparkExecution(Config config) {
- this.config = config;
- this.sparkEnvironment = (SparkEnvironment) new EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
+ this.sparkEnvironment = new SparkEnvironmentFactory(config).getEnvironment();
JobContext jobContext = new JobContext();
jobContext.setJobMode(sparkEnvironment.getJobMode());
this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkEnvironment, jobContext, config.getConfigList("source"));
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/pom.xml b/seatunnel-core/seatunnel-starter/pom.xml
similarity index 94%
rename from seatunnel-core/seatunnel-seatunnel-starter/pom.xml
rename to seatunnel-core/seatunnel-starter/pom.xml
index b4d7c3bf6..81dccddaf 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
+++ b/seatunnel-core/seatunnel-starter/pom.xml
@@ -26,7 +26,7 @@
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-seatunnel-starter</artifactId>
+ <artifactId>seatunnel-starter</artifactId>
<dependencies>
<dependency>
@@ -54,4 +54,8 @@
</dependency>
</dependencies>
+ <build>
+ <finalName>${project.name}</finalName>
+ </build>
+
</project>
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh b/seatunnel-core/seatunnel-starter/src/main/bin/seatunnel-cluster.sh
similarity index 79%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh
rename to seatunnel-core/seatunnel-starter/src/main/bin/seatunnel-cluster.sh
index 1cfc741d8..228d07d01 100755
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-engine-job.sh
+++ b/seatunnel-core/seatunnel-starter/src/main/bin/seatunnel-cluster.sh
@@ -36,8 +36,8 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
-APP_JAR=${APP_DIR}/lib/seatunnel-seatunnel-starter.jar
-APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelStarter"
+APP_JAR=${APP_DIR}/lib/seatunnel-starter.jar
+APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer"
if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
. "${CONF_DIR}/seatunnel-env.sh"
@@ -45,20 +45,9 @@ fi
if [ $# == 0 ]
then
- args="-h"
+ args=""
else
args=$@
fi
-CMD=$(java -cp ${APP_JAR} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
-if [ ${EXIT_CODE} -eq 234 ]; then
- # print usage
- echo "${CMD}"
- exit 0
-elif [ ${EXIT_CODE} -eq 0 ]; then
- echo "Execute SeaTunnel Job: ${CMD}"
- eval ${CMD}
-else
- echo "${CMD}"
- exit ${EXIT_CODE}
-fi
+java -cp ${APP_JAR} ${APP_MAIN} ${args}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh b/seatunnel-core/seatunnel-starter/src/main/bin/seatunnel.sh
similarity index 76%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh
rename to seatunnel-core/seatunnel-starter/src/main/bin/seatunnel.sh
index 1cfc741d8..601bfd4e7 100755
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/bin/start-seatunnel-seatunnel.sh
+++ b/seatunnel-core/seatunnel-starter/src/main/bin/seatunnel.sh
@@ -36,8 +36,8 @@ done
PRG_DIR=`dirname "$PRG"`
APP_DIR=`cd "$PRG_DIR/.." >/dev/null; pwd`
CONF_DIR=${APP_DIR}/config
-APP_JAR=${APP_DIR}/lib/seatunnel-seatunnel-starter.jar
-APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelStarter"
+APP_JAR=${APP_DIR}/lib/seatunnel-starter.jar
+APP_MAIN="org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient"
if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
. "${CONF_DIR}/seatunnel-env.sh"
@@ -50,15 +50,4 @@ else
args=$@
fi
-CMD=$(java -cp ${APP_JAR} ${APP_MAIN} ${args}) && EXIT_CODE=$? || EXIT_CODE=$?
-if [ ${EXIT_CODE} -eq 234 ]; then
- # print usage
- echo "${CMD}"
- exit 0
-elif [ ${EXIT_CODE} -eq 0 ]; then
- echo "Execute SeaTunnel Job: ${CMD}"
- eval ${CMD}
-else
- echo "${CMD}"
- exit ${EXIT_CODE}
-fi
+java -cp ${APP_JAR} ${APP_MAIN} ${args}
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
new file mode 100644
index 000000000..5c0e1b650
--- /dev/null
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel;
+
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.constant.SeaTunnelConstant;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.UnixStyleUsageFormatter;
+
+public class CommandLineUtils {
+    // Program names printed in the usage text; each one is the name of its launcher script.
+    private static final String CLIENT_SHELL_NAME = "seatunnel.sh";
+
+    private static final String SERVER_SHELL_NAME = "seatunnel-cluster.sh";
+
+    private CommandLineUtils() {
+        throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
+    }
+
+    public static ClientCommandArgs parseSeaTunnelClientArgs(String[] args) {
+        ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
+        JCommander jCommander = getJCommander(CLIENT_SHELL_NAME, args, clientCommandArgs);
+        // Options the client does not declare are kept as SeaTunnel engine parameters.
+        clientCommandArgs.setSeatunnelParams(jCommander.getUnknownOptions());
+        if (clientCommandArgs.isHelp()) {
+            printHelp(jCommander);
+        }
+        return clientCommandArgs;
+    }
+
+    public static ServerCommandArgs parseSeaTunnelServerArgs(String[] args) {
+        ServerCommandArgs serverCommandArgs = new ServerCommandArgs();
+        JCommander jCommander = getJCommander(SERVER_SHELL_NAME, args, serverCommandArgs);
+        // Options the server does not declare are kept as SeaTunnel engine parameters.
+        serverCommandArgs.setSeatunnelParams(jCommander.getUnknownOptions());
+        if (serverCommandArgs.isHelp()) {
+            printHelp(jCommander);
+        }
+        return serverCommandArgs;
+    }
+
+    private static void printHelp(JCommander jCommander) {
+        jCommander.setUsageFormatter(new UnixStyleUsageFormatter(jCommander));
+        jCommander.usage();
+        System.exit(SeaTunnelConstant.USAGE_EXIT_CODE);
+    }
+
+    private static JCommander getJCommander(String shellName, String[] args, Object commandArgs) {
+        return JCommander.newBuilder()
+            .programName(shellName)
+            .addObject(commandArgs)
+            .acceptUnknownOptions(true)
+            .args(args)
+            .build();
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
similarity index 73%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
copy to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
index a3a98a7b1..0f092781b 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelClient.java
@@ -20,14 +20,14 @@ package org.apache.seatunnel.core.starter.seatunnel;
import org.apache.seatunnel.core.starter.Seatunnel;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.exception.CommandException;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
-import org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelCommandBuilder;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.command.ClientCommandBuilder;
-public class SeaTunnel {
+public class SeaTunnelClient {
public static void main(String[] args) throws CommandException {
- SeaTunnelCommandArgs seaTunnelCommandArgs = CommandLineUtils.parseSeaTunnelArgs(args);
- Command<SeaTunnelCommandArgs> command =
- new SeaTunnelCommandBuilder().buildCommand(seaTunnelCommandArgs);
+ ClientCommandArgs clientCommandArgs = CommandLineUtils.parseSeaTunnelClientArgs(args);
+ Command<ClientCommandArgs> command =
+ new ClientCommandBuilder().buildCommand(clientCommandArgs);
Seatunnel.run(command);
}
}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
similarity index 56%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
index fd204e87a..a15adeb9f 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/CommandLineUtils.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelServer.java
@@ -17,22 +17,17 @@
package org.apache.seatunnel.core.starter.seatunnel;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.command.ServerCommandBuilder;
-import com.beust.jcommander.JCommander;
-
-public class CommandLineUtils {
-
- private CommandLineUtils() {
- throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
- }
-
- public static SeaTunnelCommandArgs parseSeaTunnelArgs(String[] args) {
- SeaTunnelCommandArgs seatunnelCommandArgs = new SeaTunnelCommandArgs();
- JCommander.newBuilder()
- .addObject(seatunnelCommandArgs)
- .build()
- .parse(args);
- return seatunnelCommandArgs;
+public class SeaTunnelServer {
+ public static void main(String[] args) throws CommandException {
+ ServerCommandArgs serverCommandArgs = CommandLineUtils.parseSeaTunnelServerArgs(args);
+ Command<ServerCommandArgs> command =
+ new ServerCommandBuilder().buildCommand(serverCommandArgs);
+ Seatunnel.run(command);
}
}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
similarity index 95%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
copy to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 9e86b2800..d5202b23c 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -26,7 +26,7 @@ import com.beust.jcommander.Parameter;
import java.util.List;
-public class SeaTunnelCommandArgs extends AbstractCommandArgs {
+public class ClientCommandArgs extends AbstractCommandArgs {
/**
* Undefined parameters parsed will be stored here as seatunnel engine command parameters.
@@ -37,14 +37,30 @@ public class SeaTunnelCommandArgs extends AbstractCommandArgs {
description = "The name of job")
private String name = "seatunnel_job";
+ @Parameter(names = {"-e", "--deploy-mode"},
+ description = "SeaTunnel deploy mode",
+ converter = ExecutionModeConverter.class)
+ private ExecutionMode executionMode = ExecutionMode.CLUSTER;
+
@Parameter(names = {"-cn", "--cluster"},
description = "The name of cluster")
private String clusterName = "seatunnel_default_cluster";
- @Parameter(names = {"-e", "--deploy-mode"},
- description = "SeaTunnel deploy mode",
- converter = ExecutionModeConverter.class)
- private ExecutionMode executionMode = ExecutionMode.LOCAL;
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
+ public List<String> getSeatunnelParams() {
+ return seatunnelParams;
+ }
+
+ public void setSeatunnelParams(List<String> seatunnelParams) {
+ this.seatunnelParams = seatunnelParams;
+ }
public ExecutionMode getExecutionMode() {
return executionMode;
@@ -62,14 +78,6 @@ public class SeaTunnelCommandArgs extends AbstractCommandArgs {
this.name = name;
}
- public String getClusterName() {
- return clusterName;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
@Override
public EngineType getEngineType() {
return EngineType.SEATUNNEL;
@@ -79,12 +87,4 @@ public class SeaTunnelCommandArgs extends AbstractCommandArgs {
public DeployMode getDeployMode() {
return DeployMode.CLIENT;
}
-
- public List<String> getSeatunnelParams() {
- return seatunnelParams;
- }
-
- public void setSeatunnelParams(List<String> seatunnelParams) {
- this.seatunnelParams = seatunnelParams;
- }
}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
similarity index 100%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
similarity index 58%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
index 9e86b2800..ec0019eb9 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ServerCommandArgs.java
@@ -17,49 +17,34 @@
package org.apache.seatunnel.core.starter.seatunnel.args;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
-import org.apache.seatunnel.core.starter.config.EngineType;
-import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.core.starter.command.CommandArgs;
import com.beust.jcommander.Parameter;
import java.util.List;
-public class SeaTunnelCommandArgs extends AbstractCommandArgs {
+public class ServerCommandArgs implements CommandArgs {
/**
* Undefined parameters parsed will be stored here as seatunnel engine command parameters.
*/
private List<String> seatunnelParams;
- @Parameter(names = {"-n", "--name"},
- description = "The name of job")
- private String name = "seatunnel_job";
-
@Parameter(names = {"-cn", "--cluster"},
description = "The name of cluster")
private String clusterName = "seatunnel_default_cluster";
- @Parameter(names = {"-e", "--deploy-mode"},
- description = "SeaTunnel deploy mode",
- converter = ExecutionModeConverter.class)
- private ExecutionMode executionMode = ExecutionMode.LOCAL;
-
- public ExecutionMode getExecutionMode() {
- return executionMode;
- }
-
- public void setExecutionMode(ExecutionMode executionMode) {
- this.executionMode = executionMode;
- }
+ @Parameter(names = {"-h", "--help"},
+ help = true,
+ description = "Show the usage message")
+ private boolean help = false;
- public String getName() {
- return name;
+ public boolean isHelp() {
+ return help;
}
- public void setName(String name) {
- this.name = name;
+ public void setHelp(boolean help) {
+ this.help = help;
}
public String getClusterName() {
@@ -70,16 +55,6 @@ public class SeaTunnelCommandArgs extends AbstractCommandArgs {
this.clusterName = clusterName;
}
- @Override
- public EngineType getEngineType() {
- return EngineType.SEATUNNEL;
- }
-
- @Override
- public DeployMode getDeployMode() {
- return DeployMode.CLIENT;
- }
-
public List<String> getSeatunnelParams() {
return seatunnelParams;
}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
similarity index 77%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
index 8036120ae..20bc93659 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ApiConfValidateCommand.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.core.starter.seatunnel.command;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.config.ConfigBuilder;
import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
import org.apache.seatunnel.core.starter.seatunnel.config.SeaTunnelApiConfigChecker;
import org.apache.seatunnel.core.starter.utils.FileUtils;
@@ -32,19 +32,19 @@ import java.nio.file.Path;
/**
* Use to validate the configuration of the SeaTunnel API.
*/
-public class SeaTunnelApiConfValidateCommand implements Command<SeaTunnelCommandArgs> {
+public class ApiConfValidateCommand implements Command<ClientCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ApiConfValidateCommand.class);
- private final SeaTunnelCommandArgs seaTunnelCommandArgs;
+ private final ClientCommandArgs clientCommandArgs;
- public SeaTunnelApiConfValidateCommand(SeaTunnelCommandArgs seaTunnelCommandArgs) {
- this.seaTunnelCommandArgs = seaTunnelCommandArgs;
+ public ApiConfValidateCommand(ClientCommandArgs clientCommandArgs) {
+ this.clientCommandArgs = clientCommandArgs;
}
@Override
public void execute() throws ConfigCheckException {
- Path configPath = FileUtils.getConfigPath(seaTunnelCommandArgs);
+ Path configPath = FileUtils.getConfigPath(clientCommandArgs);
ConfigBuilder configBuilder = new ConfigBuilder(configPath);
new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
LOGGER.info("config OK !");
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientCommandBuilder.java
similarity index 73%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
copy to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientCommandBuilder.java
index 722c84e7b..fb9bfc6d0 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientCommandBuilder.java
@@ -20,14 +20,14 @@ package org.apache.seatunnel.core.starter.seatunnel.command;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.command.CommandBuilder;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
-public class SeaTunnelCommandBuilder implements CommandBuilder<SeaTunnelCommandArgs> {
+public class ClientCommandBuilder implements CommandBuilder<ClientCommandArgs> {
@Override
- public Command<SeaTunnelCommandArgs> buildCommand(SeaTunnelCommandArgs commandArgs) {
+ public Command<ClientCommandArgs> buildCommand(ClientCommandArgs commandArgs) {
Common.setDeployMode(commandArgs.getDeployMode());
- return commandArgs.isCheckConfig() ? new SeaTunnelApiConfValidateCommand(commandArgs)
- : new SeaTunnelApiTaskExecuteCommand(commandArgs);
+ return commandArgs.isCheckConfig() ? new ApiConfValidateCommand(commandArgs)
+ : new ClientExecuteCommand(commandArgs);
}
}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
similarity index 82%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index d9d8638d9..d99e708e9 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -19,7 +19,7 @@ package org.apache.seatunnel.core.starter.seatunnel.command;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
@@ -41,26 +41,25 @@ import java.util.concurrent.ExecutionException;
/**
* This command is used to execute the SeaTunnel engine job by SeaTunnel API.
*/
-public class SeaTunnelApiTaskExecuteCommand implements Command<SeaTunnelCommandArgs> {
+public class ClientExecuteCommand implements Command<ClientCommandArgs> {
- private final SeaTunnelCommandArgs seaTunnelCommandArgs;
+ private final ClientCommandArgs clientCommandArgs;
- // TODO custom cluster name on cluster execution mode
-
- public SeaTunnelApiTaskExecuteCommand(SeaTunnelCommandArgs seaTunnelCommandArgs) {
- this.seaTunnelCommandArgs = seaTunnelCommandArgs;
+ public ClientExecuteCommand(ClientCommandArgs clientCommandArgs) {
+ this.clientCommandArgs = clientCommandArgs;
}
@Override
public void execute() throws CommandExecuteException {
- Path configFile = FileUtils.getConfigPath(seaTunnelCommandArgs);
+ Path configFile = FileUtils.getConfigPath(clientCommandArgs);
JobConfig jobConfig = new JobConfig();
- jobConfig.setName(seaTunnelCommandArgs.getName());
+ jobConfig.setName(clientCommandArgs.getName());
HazelcastInstance instance = null;
+ ClientJobProxy clientJobProxy = null;
try {
- String clusterName = seaTunnelCommandArgs.getClusterName();
- if (seaTunnelCommandArgs.getExecutionMode().equals(ExecutionMode.LOCAL)) {
+ String clusterName = clientCommandArgs.getClusterName();
+ if (clientCommandArgs.getExecutionMode().equals(ExecutionMode.LOCAL)) {
clusterName = creatRandomClusterName(clusterName);
instance = createServerInLocal(clusterName);
}
@@ -69,7 +68,6 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<SeaTunnelCommandA
SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
JobExecutionEnvironment jobExecutionEnv = engineClient.createExecutionContext(configFile.toString(), jobConfig);
- ClientJobProxy clientJobProxy;
clientJobProxy = jobExecutionEnv.execute();
clientJobProxy.waitForJobComplete();
} catch (ExecutionException | InterruptedException e) {
@@ -78,6 +76,9 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<SeaTunnelCommandA
if (instance != null) {
instance.shutdown();
}
+ if (clientJobProxy != null) {
+ clientJobProxy.close();
+ }
}
}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerCommandBuilder.java
similarity index 65%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
rename to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerCommandBuilder.java
index 722c84e7b..999bd7893 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerCommandBuilder.java
@@ -17,17 +17,14 @@
package org.apache.seatunnel.core.starter.seatunnel.command;
-import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.command.CommandBuilder;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
-public class SeaTunnelCommandBuilder implements CommandBuilder<SeaTunnelCommandArgs> {
+public class ServerCommandBuilder implements CommandBuilder<ServerCommandArgs> {
@Override
- public Command<SeaTunnelCommandArgs> buildCommand(SeaTunnelCommandArgs commandArgs) {
- Common.setDeployMode(commandArgs.getDeployMode());
- return commandArgs.isCheckConfig() ? new SeaTunnelApiConfValidateCommand(commandArgs)
- : new SeaTunnelApiTaskExecuteCommand(commandArgs);
+ public Command<ServerCommandArgs> buildCommand(ServerCommandArgs commandArgs) {
+ return new ServerExecuteCommand(commandArgs);
}
}
diff --git a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
new file mode 100644
index 000000000..bbd50fd56
--- /dev/null
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ServerExecuteCommand.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel.command;
+
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+
+/**
+ * This command is used to start a SeaTunnel engine server node: it creates a Hazelcast instance from the located SeaTunnel config.
+ */
+public class ServerExecuteCommand implements Command<ServerCommandArgs> {
+
+ private final ServerCommandArgs serverCommandArgs;
+
+ public ServerExecuteCommand(ServerCommandArgs serverCommandArgs) {
+ this.serverCommandArgs = serverCommandArgs;
+ }
+
+ @Override
+ public void execute() {
+ SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.getHazelcastConfig().setClusterName(serverCommandArgs.getClusterName()); // cluster name from the command args replaces the one in the located config
+ HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+ Thread.currentThread().getName(), // the instance is named after the calling thread
+ new SeaTunnelNodeContext(seaTunnelConfig));
+ }
+
+}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
similarity index 100%
copy from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
copy to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/constant/SeaTunnelConstant.java
similarity index 68%
copy from seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
copy to seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/constant/SeaTunnelConstant.java
index 517d5b1f2..7b032b9f4 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/constant/SeaTunnelConstant.java
@@ -15,21 +15,8 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.core.job;
+package org.apache.seatunnel.core.starter.seatunnel.constant;
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-
-/**
- * Job interface define the Running job apis
- */
-public interface Job {
- long getJobId();
-
- PassiveCompletableFuture<JobStatus> doWaitForJobComplete();
-
- void cancelJob();
-
- JobStatus getJobStatus();
-
- JobStatus waitForJobComplete();
+public class SeaTunnelConstant {
+ public static final int USAGE_EXIT_CODE = 234;
}
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index d693eea8c..fb6b6359f 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -345,9 +345,7 @@ The text of each license is the standard Apache 2.0 license.
(Apache License, Version 2.0) Apache Commons CLI (commons-cli:commons-cli:1.4 - http://commons.apache.org/proper/commons-cli/)
(Apache License, Version 2.0) Apache Commons Codec (commons-codec:commons-codec:1.13 - https://commons.apache.org/proper/commons-codec/)
(Apache License, Version 2.0) Apache Commons Collections (org.apache.commons:commons-collections4:4.4 - https://commons.apache.org/proper/commons-collections/)
- (Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.18 - https://commons.apache.org/proper/commons-compress/)
(Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.20 - https://commons.apache.org/proper/commons-compress/)
- (Apache License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.21 - https://commons.apache.org/proper/commons-compress/)
(Apache License, Version 2.0) Apache Commons Configuration (org.apache.commons:commons-configuration2:2.1.1 - http://commons.apache.org/proper/commons-configuration/)
(Apache License, Version 2.0) Apache Commons Crypto (org.apache.commons:commons-crypto:1.0.0 - http://commons.apache.org/proper/commons-crypto/)
(Apache License, Version 2.0) Apache Commons Email (org.apache.commons:commons-email:1.5 - http://commons.apache.org/proper/commons-email/)
@@ -627,7 +625,6 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) Apache Avro IPC (org.apache.avro:avro-ipc:1.8.2 - http://avro.apache.org)
(The Apache Software License, Version 2.0) Apache Avro Mapred API (org.apache.avro:avro-mapred:1.8.2 - http://avro.apache.org/avro-mapred)
(The Apache Software License, Version 2.0) Apache Commons CSV (org.apache.commons:commons-csv:1.0 - http://commons.apache.org/proper/commons-csv/)
- (The Apache Software License, Version 2.0) Apache Commons Compress (org.apache.commons:commons-compress:1.8.1 - http://commons.apache.org/proper/commons-compress/)
(The Apache Software License, Version 2.0) Apache Commons DBCP (org.apache.commons:commons-dbcp2:2.0.1 - http://commons.apache.org/proper/commons-dbcp/)
(The Apache Software License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.3.2 - http://commons.apache.org/proper/commons-lang/)
(The Apache Software License, Version 2.0) Apache Commons Logging (commons-logging:commons-logging:1.2 - http://commons.apache.org/proper/commons-logging/)
@@ -678,7 +675,6 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) ClassMate (com.fasterxml:classmate:1.1.0 - http://github.com/cowtowncoder/java-classmate)
(The Apache Software License, Version 2.0) Commons BeanUtils Core (commons-beanutils:commons-beanutils-core:1.8.0 - http://commons.apache.org/beanutils/)
(The Apache Software License, Version 2.0) Commons CLI (commons-cli:commons-cli:1.2 - http://commons.apache.org/cli/)
- (The Apache Software License, Version 2.0) Commons Compress (org.apache.commons:commons-compress:1.4.1 - http://commons.apache.org/compress/)
(The Apache Software License, Version 2.0) Commons Configuration (commons-configuration:commons-configuration:1.6 - http://commons.apache.org/${pom.artifactId.substring(8)}/)
(The Apache Software License, Version 2.0) Commons Configuration (commons-configuration:commons-configuration:1.7 - http://commons.apache.org/configuration/)
(The Apache Software License, Version 2.0) Commons DBCP (commons-dbcp:commons-dbcp:1.4 - http://commons.apache.org/dbcp/)
@@ -1211,7 +1207,6 @@ The following components are provided under the Public Domain License. See proje
The text of each license is also included at licenses/LICENSE-[project].txt.
(Public Domain) AOP alliance (aopalliance:aopalliance:1.0 - http://aopalliance.sourceforge.net)
- (Public Domain) XZ for Java (org.tukaani:xz:1.0 - http://tukaani.org/xz/java.html)
(Public Domain) XZ for Java (org.tukaani:xz:1.5 - http://tukaani.org/xz/java.html)
(Public Domain) XZ for Java (org.tukaani:xz:1.8 - https://tukaani.org/xz/java.html)
(Public Domain, per Creative Commons CC0) HdrHistogram (org.hdrhistogram:HdrHistogram:2.1.9 - http://hdrhistogram.github.io/HdrHistogram/)
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
index d4f7aaffa..87eb3a111 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin-ci.xml
@@ -120,19 +120,14 @@
</fileSet>
<!-- seatunnel engine -->
<fileSet>
- <directory>../seatunnel-engine/seatunnel-engine-client/target</directory>
- <includes>
- <include>seatunnel-engine-*.jar</include>
- </includes>
- <excludes>
- <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
- </excludes>
- <outputDirectory>/lib</outputDirectory>
+ <directory>../seatunnel-core/seatunnel-starter/src/main/bin</directory>
+ <outputDirectory>/bin</outputDirectory>
+ <fileMode>0755</fileMode>
</fileSet>
<fileSet>
- <directory>../seatunnel-engine/seatunnel-engine-server/target</directory>
+ <directory>../seatunnel-core/seatunnel-starter/target</directory>
<includes>
- <include>seatunnel-engine-*.jar</include>
+ <include>seatunnel-starter*.jar</include>
</includes>
<excludes>
<exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
diff --git a/seatunnel-dist/src/main/assembly/assembly-bin.xml b/seatunnel-dist/src/main/assembly/assembly-bin.xml
index eddaf8129..6da678c6e 100644
--- a/seatunnel-dist/src/main/assembly/assembly-bin.xml
+++ b/seatunnel-dist/src/main/assembly/assembly-bin.xml
@@ -120,19 +120,14 @@
</fileSet>
<!-- seatunnel engine -->
<fileSet>
- <directory>../seatunnel-engine/seatunnel-engine-client/target</directory>
- <includes>
- <include>seatunnel-engine-*.jar</include>
- </includes>
- <excludes>
- <exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
- </excludes>
- <outputDirectory>/lib</outputDirectory>
+ <directory>../seatunnel-core/seatunnel-starter/src/main/bin</directory>
+ <outputDirectory>/bin</outputDirectory>
+ <fileMode>0755</fileMode>
</fileSet>
<fileSet>
- <directory>../seatunnel-engine/seatunnel-engine-server/target</directory>
+ <directory>../seatunnel-core/seatunnel-starter/target</directory>
<includes>
- <include>seatunnel-engine-*.jar</include>
+ <include>seatunnel-starter*.jar</include>
</includes>
<excludes>
<exclude>%regex[.*((javadoc)|(sources))\.jar]</exclude>
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/pom.xml
similarity index 71%
copy from seatunnel-examples/seatunnel-engine-examples/pom.xml
copy to seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/pom.xml
index b75a68b85..afad8f533 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/pom.xml
@@ -20,29 +20,23 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-examples</artifactId>
+ <artifactId>seatunnel-engine-e2e</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-engine-examples</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-seatunnel-starter</artifactId>
- <version>${project.version}</version>
- </dependency>
+ <artifactId>connector-console-seatunnel-e2e</artifactId>
+ <dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-fake</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-console</artifactId>
+ <artifactId>connector-seatunnel-e2e-base</artifactId>
<version>${project.version}</version>
+ <classifier>tests</classifier>
+ <type>test-jar</type>
+ <scope>test</scope>
</dependency>
</dependencies>
+
</project>
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
similarity index 59%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
index bfaff2a9c..fa63feb1c 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/java/org/apache/seatunnel/engine/e2e/console/FakeSourceToConsoleIT.java
@@ -15,17 +15,21 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.seatunnel.config;
+package org.apache.seatunnel.engine.e2e.console;
-import org.apache.seatunnel.core.starter.config.ConfigChecker;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
-public class SeaTunnelApiConfigChecker implements ConfigChecker {
+import java.io.IOException;
- @Override
- public void checkConfig(Config config) throws ConfigCheckException {
- // TODO implement
+public class FakeSourceToConsoleIT extends SeaTunnelContainer {
+
+ @Test
+ public void testFakeSourceToConsoleSink() throws IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelJob("/fakesource_to_console.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
}
}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
similarity index 63%
copy from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
copy to seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
index 1e49074ba..35f8633ef 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-console-seatunnel-e2e/src/test/resources/fakesource_to_console.conf
@@ -19,41 +19,22 @@
######
env {
- # You can set flink configuration here
- execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age",
- parallelism = 3
+ field_name = "name,age"
}
-
}
transform {
}
sink {
- LocalFile {
- path="file:///tmp/hive/warehouse/test2"
- field_delimiter="\t"
- row_delimiter="\n"
- partition_by=["age"]
- partition_dir_expression="${k0}=${v0}"
- is_partition_field_write_in_file=true
- file_name_expression="${transactionId}_${now}"
- file_format="text"
- sink_columns=["name","age"]
- filename_time_format="yyyy.MM.dd"
- is_enable_transaction=true
- save_mode="error"
-
+ console {
}
}
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
similarity index 59%
copy from seatunnel-examples/seatunnel-engine-examples/pom.xml
copy to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index b75a68b85..03f7c66bf 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -20,29 +20,36 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-examples</artifactId>
+ <artifactId>seatunnel-engine-e2e</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>seatunnel-engine-examples</artifactId>
- <dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-seatunnel-starter</artifactId>
- <version>${project.version}</version>
- </dependency>
+ <artifactId>connector-seatunnel-e2e-base</artifactId>
+
+ <properties>
+ <maven-jar-plugin.version>2.4</maven-jar-plugin.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>${maven-jar-plugin.version}</version>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-fake</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-console</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
similarity index 95%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 8fb21b51b..b1c444ace 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/engine/JobExecutionIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -15,9 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.engine.e2e.engine;
-
-import static org.awaitility.Awaitility.await;
+package org.apache.seatunnel.engine.e2e;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
@@ -29,12 +27,12 @@ import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelClientConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.e2e.TestUtils;
import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
import com.google.common.collect.Lists;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -88,7 +86,7 @@ public class JobExecutionIT {
return clientJobProxy.waitForJobComplete();
});
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assert.assertTrue(
objectCompletableFuture.isDone() && JobStatus.FINISHED.equals(objectCompletableFuture.get())));
@@ -121,7 +119,7 @@ public class JobExecutionIT {
Thread.sleep(1000);
clientJobProxy.cancelJob();
- await().atMost(20000, TimeUnit.MILLISECONDS)
+ Awaitility.await().atMost(20000, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assert.assertTrue(
objectCompletableFuture.isDone() && JobStatus.CANCELED.equals(objectCompletableFuture.get())));
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
new file mode 100644
index 000000000..9b6495850
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/SeaTunnelContainer.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+public abstract class SeaTunnelContainer { // base class for engine e2e tests: starts a server container before each test and closes it afterwards
+
+ private static final Logger LOG = LoggerFactory.getLogger(SeaTunnelContainer.class);
+ private static final String JDK_DOCKER_IMAGE = "openjdk:8";
+ protected static final Network NETWORK = Network.newNetwork();
+ private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent().getParent(); // three directory levels above the working directory
+
+ private static final String SEATUNNEL_HOME = "/tmp/seatunnel";
+ private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
+ private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, "bin").toString();
+ private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, "lib").toString();
+ private static final String SEATUNNEL_CONNECTORS = Paths.get(SEATUNNEL_HOME, "connectors").toString();
+ private static final String CLIENT_SHELL = "seatunnel.sh";
+ private static final String SERVER_SHELL = "seatunnel-cluster.sh";
+ private GenericContainer<?> server;
+
+ @Before
+ public void before() {
+ Map<String, String> mountMapping = getFileMountMapping();
+ server = new GenericContainer<>(JDK_DOCKER_IMAGE)
+ .withNetwork(NETWORK)
+ .withCommand(Paths.get(SEATUNNEL_BIN, SERVER_SHELL).toString())
+ .withNetworkAliases("server")
+ .withExposedPorts()
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ .waitingFor(Wait.forLogMessage(".*received new worker register.*\\n", 1)); // wait strategy: this log line must appear once
+ mountMapping.forEach(server::withFileSystemBind); // key = host path, value = path inside the container
+ server.start();
+ }
+
+ protected Map<String, String> getFileMountMapping() {
+
+ Map<String, String> mountMapping = new HashMap<>();
+ // mount the starter jar into the container's lib directory
+ mountMapping.put(PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-starter/target/seatunnel-starter.jar",
+ Paths.get(SEATUNNEL_LIB, "seatunnel-starter.jar").toString());
+
+ // mount the starter shell scripts as the container's bin directory
+ mountMapping.put(PROJECT_ROOT_PATH + "/seatunnel-core/seatunnel-starter/src/main/bin/",
+ Paths.get(SEATUNNEL_BIN).toString());
+
+ // mount the built connector jars under connectors/seatunnel
+ mountMapping.put(PROJECT_ROOT_PATH +
+ "/seatunnel-connectors-v2-dist/target/lib", Paths.get(SEATUNNEL_CONNECTORS, "seatunnel").toString());
+
+ // mount plugin-mapping.properties into the connectors directory
+ mountMapping.put(PROJECT_ROOT_PATH + "/plugin-mapping.properties", Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
+
+ return mountMapping;
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ public Container.ExecResult executeSeaTunnelJob(String confFile) throws IOException, InterruptedException {
+ final String confPath = getResource(confFile);
+ if (!new File(confPath).exists()) {
+ throw new IllegalArgumentException(confFile + " doesn't exist");
+ }
+ final String targetConfInContainer = Paths.get("/tmp", confFile).toString();
+ server.copyFileToContainer(MountableFile.forHostPath(confPath), targetConfInContainer);
+
+ // Running IT use cases under Windows requires replacing \ with /
+ String conf = targetConfInContainer.replaceAll("\\\\", "/");
+ final List<String> command = new ArrayList<>();
+ command.add(Paths.get(SEATUNNEL_BIN, CLIENT_SHELL).toString());
+ command.add("--config " + conf);
+
+ CompletableFuture.runAsync(() -> {
+ try {
+ Thread.sleep(15000);
+ // stop the server after 15s so a hung bash command cannot block forever; NOTE(review): this task is never cancelled, so it also runs after the command has returned
+ server.stop();
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e); // NOTE(review): the thread's interrupt flag is not restored here
+ }
+ });
+ Container.ExecResult execResult = server.execInContainer("bash", "-c", String.join(" ", command));
+ LOG.info(execResult.getStdout());
+ LOG.error(execResult.getStderr()); // stderr is logged at error level regardless of the exit code
+ return execResult;
+ }
+
+ @After
+ public void after() {
+ if (server != null) {
+ server.close();
+ }
+ }
+
+ private String getResource(String confFile) {
+ return System.getProperty("user.dir") + "/src/test/resources" + confFile; // confFile is appended as-is, so it must start with "/"
+ }
+
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
similarity index 96%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
index 786a34ce6..f44177e09 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TestUtils.java
@@ -37,7 +37,7 @@ public class TestUtils {
public static void initPluginDir() {
// copy connectors to project_root/connectors dir
System.setProperty("SEATUNNEL_HOME", System.getProperty("user.dir") +
- String.format("%s..%s..%s", File.separator, File.separator, File.separator));
+ String.format("%s..%s..%s..%s", File.separator, File.separator, File.separator, File.separator));
File seatunnelRootDir = new File(System.getProperty("SEATUNNEL_HOME"));
File connectorDir = new File(seatunnelRootDir +
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file.conf
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/batch_fakesource_to_file_complex.conf
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
similarity index 100%
rename from seatunnel-e2e/seatunnel-engine-e2e/src/test/resources/streaming_fakesource_to_file_complex.conf
rename to seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
index ea8a31f3a..570bbefa7 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/pom.xml
@@ -27,6 +27,12 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>seatunnel-engine-e2e</artifactId>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>connector-seatunnel-e2e-base</module>
+ <module>connector-console-seatunnel-e2e</module>
+ </modules>
<dependencies>
<dependency>
@@ -66,9 +72,14 @@
<artifactId>seatunnel-engine-server</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ </dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
+ <scope>test</scope>
</dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index 0ffa31634..6666bd5ef 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -53,7 +53,7 @@ public abstract class FlinkContainer {
protected GenericContainer<?> jobManager;
protected GenericContainer<?> taskManager;
private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent();
- private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink-new-connector.sh";
+ private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-flink-connector-v2.sh";
private static final String SEATUNNEL_FLINK_JAR = "seatunnel-flink-starter.jar";
private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index de30eeb41..282a74c6a 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -52,7 +52,7 @@ public abstract class SparkContainer {
protected GenericContainer<?> master;
private static final Path PROJECT_ROOT_PATH = Paths.get(System.getProperty("user.dir")).getParent().getParent();
- private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark-new-connector.sh";
+ private static final String SEATUNNEL_SPARK_BIN = "start-seatunnel-spark-connector-v2.sh";
private static final String SEATUNNEL_SPARK_JAR = "seatunnel-spark-starter.jar";
private static final String PLUGIN_MAPPING_FILE = "plugin-mapping.properties";
private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel";
diff --git a/seatunnel-engine/seatunnel-engine-client/pom.xml b/seatunnel-engine/seatunnel-engine-client/pom.xml
index b24ad2088..f8e51c2fc 100644
--- a/seatunnel-engine/seatunnel-engine-client/pom.xml
+++ b/seatunnel-engine/seatunnel-engine-client/pom.xml
@@ -41,7 +41,7 @@
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-core-base</artifactId>
+ <artifactId>seatunnel-core-starter</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@@ -66,6 +66,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-console</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-file-local</artifactId>
@@ -73,6 +79,7 @@
<scope>test</scope>
</dependency>
+
<dependency>
<groupId>com.hazelcast</groupId>
<artifactId>hazelcast</artifactId>
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index 8c372f2d5..044143108 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -53,6 +53,7 @@ public class ClientJobProxy implements Job {
}
private void submitJob() {
+ LOGGER.info(String.format("start submit job, job id: %s, with plugin jar %s", jobImmutableInformation.getJobId(), jobImmutableInformation.getPluginJarsUrls()));
ClientMessage request = SeaTunnelSubmitJobCodec.encodeRequest(jobImmutableInformation.getJobId(),
seaTunnelHazelcastClient.getSerializationService().toData(jobImmutableInformation));
PassiveCompletableFuture<Void> submitJobFuture =
@@ -84,10 +85,14 @@ public class ClientJobProxy implements Job {
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
jobStatus));
- this.seaTunnelHazelcastClient.getHazelcastInstance().shutdown();
return jobStatus;
}
+ @Override
+ public void close() {
+ this.seaTunnelHazelcastClient.getHazelcastInstance().shutdown();
+ }
+
@Override
public PassiveCompletableFuture<JobStatus> doWaitForJobComplete() {
return seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
index 608838e9b..0d19411d6 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobConfigParser.java
@@ -28,7 +28,7 @@ import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.apis.base.plugin.Plugin;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
diff --git a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
index 69ce5614a..e2634dfa8 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
@@ -46,18 +46,18 @@ transform {
sink {
LocalFile {
- path="file:///tmp/hive/warehouse/test2"
- field_delimiter="\t"
- row_delimiter="\n"
- partition_by=["age"]
- partition_dir_expression="${k0}=${v0}"
- is_partition_field_write_in_file=true
- file_name_expression="${transactionId}_${now}"
- file_format="text"
- sink_columns=["name","age"]
- filename_time_format="yyyy.MM.dd"
- is_enable_transaction=true
- save_mode="error",
- source_table_name="fake"
+ path = "file:///tmp/hive/warehouse/test2"
+ field_delimiter = "\t"
+ row_delimiter = "\n"
+ partition_by = ["age"]
+ partition_dir_expression = "${k0}=${v0}"
+ is_partition_field_write_in_file = true
+ file_name_expression = "${transactionId}_${now}"
+ file_format = "text"
+ sink_columns = ["name", "age"]
+ filename_time_format = "yyyy.MM.dd"
+ is_enable_transaction = true
+ save_mode = "error",
+ source_table_name = "fake"
}
}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
index 517d5b1f2..69ed8ad81 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/Job.java
@@ -32,4 +32,6 @@ public interface Job {
JobStatus getJobStatus();
JobStatus waitForJobComplete();
+
+ void close();
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
index f7b215820..5379e1d60 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelTask.java
@@ -231,7 +231,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
@Override
public Set<URL> getJarsUrl() {
- List<Flow> now = Collections.singletonList(executionFlow);
+ List<Flow> now = new ArrayList<>();
+ now.add(executionFlow);
Set<URL> urls = new HashSet<>();
while (!now.isEmpty()) {
final List<Flow> next = new ArrayList<>();
@@ -241,7 +242,8 @@ public abstract class SeaTunnelTask extends AbstractTask {
}
next.addAll(n.getNext());
});
- now = next;
+ now.clear();
+ now.addAll(next);
}
return urls;
}
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index f5eeb8e80..684ed7904 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -124,7 +124,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
prepareClose = true;
}
if (barrier.snapshot()) {
- if (writerStateSerializer.isPresent()) {
+ if (!writerStateSerializer.isPresent()) {
runningTask.addState(barrier, sinkAction.getId(), Collections.emptyList());
} else {
List<StateT> states = writer.snapshotState(barrier.getId());
@@ -136,10 +136,14 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends Serializable, AggregatedCo
writer.abortPrepare();
throw e;
}
- lastCommitInfo.ifPresent(commitInfoT -> runningTask.getExecutionContext().sendToMember(new SinkPrepareCommitOperation(barrier, committerTaskLocation,
- SerializationUtils.serialize(commitInfoT)), committerTaskAddress).join());
+ if (containAggCommitter) {
+ lastCommitInfo.ifPresent(commitInfoT -> runningTask.getExecutionContext().sendToMember(new SinkPrepareCommitOperation(barrier, committerTaskLocation,
+ SerializationUtils.serialize(commitInfoT)), committerTaskAddress).join());
+ }
} else {
- runningTask.getExecutionContext().sendToMember(new CheckpointBarrierTriggerOperation(barrier, committerTaskLocation), committerTaskAddress);
+ if (containAggCommitter) {
+ runningTask.getExecutionContext().sendToMember(new CheckpointBarrierTriggerOperation(barrier, committerTaskLocation), committerTaskAddress);
+ }
}
runningTask.ack(barrier);
} else {
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml b/seatunnel-examples/seatunnel-engine-examples/pom.xml
index b75a68b85..a0d952b2c 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml
@@ -30,19 +30,9 @@
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-seatunnel-starter</artifactId>
+ <artifactId>seatunnel-starter</artifactId>
<version>${project.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-fake</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-console</artifactId>
- <version>${project.version}</version>
- </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
index a50f31271..02736987a 100644
--- a/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
+++ b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
@@ -20,8 +20,9 @@ package org.apache.seatunnel.example.engine;
import org.apache.seatunnel.core.starter.Seatunnel;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.exception.CommandException;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
-import org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelCommandBuilder;
+import org.apache.seatunnel.core.starter.seatunnel.args.ClientCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.command.ClientCommandBuilder;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
import java.io.FileNotFoundException;
import java.net.URISyntaxException;
@@ -32,12 +33,14 @@ public class SeaTunnelEngineExample {
public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
String configFile = getTestConfigFile("/examples/fake_to_console.conf");
- SeaTunnelCommandArgs seaTunnelCommandArgs = new SeaTunnelCommandArgs();
- seaTunnelCommandArgs.setConfigFile(configFile);
- seaTunnelCommandArgs.setCheckConfig(false);
- seaTunnelCommandArgs.setName("fake_to_console");
- Command<SeaTunnelCommandArgs> command =
- new SeaTunnelCommandBuilder().buildCommand(seaTunnelCommandArgs);
+ ClientCommandArgs clientCommandArgs = new ClientCommandArgs();
+ clientCommandArgs.setConfigFile(configFile);
+ clientCommandArgs.setCheckConfig(false);
+ clientCommandArgs.setName("fake_to_console");
+ // To run in client mode, change the execution mode to CLUSTER; before doing so, start SeaTunnelEngineServerExample.
+ clientCommandArgs.setExecutionMode(ExecutionMode.LOCAL);
+ Command<ClientCommandArgs> command =
+ new ClientCommandBuilder().buildCommand(clientCommandArgs);
Seatunnel.run(command);
}
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineServerExample.java
similarity index 70%
rename from seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
rename to seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineServerExample.java
index a3a98a7b1..af28f2872 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
+++ b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineServerExample.java
@@ -15,19 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.seatunnel;
+package org.apache.seatunnel.example.engine;
import org.apache.seatunnel.core.starter.Seatunnel;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.exception.CommandException;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
-import org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelCommandBuilder;
+import org.apache.seatunnel.core.starter.seatunnel.args.ServerCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.command.ServerCommandBuilder;
-public class SeaTunnel {
+public class SeaTunnelEngineServerExample {
public static void main(String[] args) throws CommandException {
- SeaTunnelCommandArgs seaTunnelCommandArgs = CommandLineUtils.parseSeaTunnelArgs(args);
- Command<SeaTunnelCommandArgs> command =
- new SeaTunnelCommandBuilder().buildCommand(seaTunnelCommandArgs);
+ ServerCommandArgs serverCommandArgs = new ServerCommandArgs();
+ Command<ServerCommandArgs> command =
+ new ServerCommandBuilder().buildCommand(serverCommandArgs);
Seatunnel.run(command);
}
+
}
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 077ddc966..0ddee1d08 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -68,12 +68,7 @@ commons-collections-3.2.2.jar
commons-collections4-4.4.jar
commons-compiler-3.0.9.jar
commons-compiler-3.1.6.jar
-commons-compress-1.18.jar
-commons-compress-1.19.jar
commons-compress-1.20.jar
-commons-compress-1.21.jar
-commons-compress-1.4.1.jar
-commons-compress-1.8.1.jar
commons-configuration-1.7.jar
commons-configuration2-2.1.1.jar
commons-crypto-1.0.0.jar
@@ -722,7 +717,6 @@ xercesImpl-2.9.1.jar
xml-apis-1.3.04.jar
xmlbeans-3.1.0.jar
xmlenc-0.52.jar
-xz-1.0.jar
xz-1.5.jar
xz-1.8.jar
zkclient-0.3.jar