You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2023/04/09 12:26:35 UTC
[incubator-seatunnel] branch dev updated: [Improve][Starter] Optimize code structure & remove redundant code (#4525)
This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c24e39927 [Improve][Starter] Optimize code structure & remove redundant code (#4525)
c24e39927 is described below
commit c24e39927f9946015398855fd9e1907b3bd014da
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Sun Apr 9 20:26:27 2023 +0800
[Improve][Starter] Optimize code structure & remove redundant code (#4525)
* [Improve][Starter] Optimize code structure & remove redundant code
* [Improve][Starter] Revert SeaTunnelApiExample
* [Improve][Starter] Fix CI error
---
seatunnel-core/seatunnel-flink-starter/pom.xml | 1 +
.../seatunnel-flink-13-starter/pom.xml | 14 +-
.../seatunnel-flink-15-starter/pom.xml | 6 +
.../core/starter/flink/args/FlinkCommandArgs.java | 138 --------------
.../flink/command/FlinkConfValidateCommand.java | 44 -----
.../flink/command/FlinkTaskExecuteCommand.java | 66 -------
.../FlinkAbstractPluginExecuteProcessor.java | 91 ---------
.../starter/flink/execution/FlinkExecution.java | 207 ---------------------
.../flink/execution/SourceExecuteProcessor.java | 147 ---------------
.../flink/execution/TransformExecuteProcessor.java | 138 --------------
.../core/starter/flink/utils/ConfigKeyName.java | 49 -----
.../core/starter/flink/utils/EnvironmentUtil.java | 120 ------------
.../core/starter/flink/utils/TableUtil.java | 50 -----
.../pom.xml | 7 +-
.../seatunnel/core/starter/flink/FlinkStarter.java | 0
.../core/starter/flink/SeaTunnelFlink.java | 0
.../core/starter/flink/args/FlinkCommandArgs.java | 0
.../flink/command/FlinkConfValidateCommand.java | 0
.../flink/command/FlinkTaskExecuteCommand.java | 0
.../FlinkAbstractPluginExecuteProcessor.java | 0
.../starter/flink/execution/FlinkExecution.java | 0
.../flink/execution/FlinkRuntimeEnvironment.java | 0
.../flink/execution/SinkExecuteProcessor.java | 0
.../flink/execution/SourceExecuteProcessor.java | 0
.../flink/execution/TransformExecuteProcessor.java | 0
.../core/starter/flink/utils/ConfigKeyName.java | 0
.../core/starter/flink/utils/EnvironmentUtil.java | 0
.../core/starter/flink/utils/TableUtil.java | 0
seatunnel-core/seatunnel-spark-starter/pom.xml | 1 +
.../seatunnel-spark-2-starter/pom.xml | 12 ++
.../seatunnel-spark-3-starter/pom.xml | 18 +-
.../core/starter/spark/args/SparkCommandArgs.java | 89 ---------
.../spark/command/SparkConfValidateCommand.java | 44 -----
.../spark/command/SparkTaskExecuteCommand.java | 66 -------
.../spark/execution/SourceExecuteProcessor.java | 111 -----------
.../SparkAbstractPluginExecuteProcessor.java | 78 --------
.../starter/spark/execution/SparkExecution.java | 81 --------
.../spark/execution/SparkRuntimeEnvironment.java | 179 ------------------
.../spark/execution/TransformExecuteProcessor.java | 173 -----------------
.../pom.xml | 6 +-
.../core/starter/spark/SeaTunnelSpark.java | 0
.../seatunnel/core/starter/spark/SparkStarter.java | 0
.../core/starter/spark/args/SparkCommandArgs.java | 0
.../spark/command/SparkConfValidateCommand.java | 0
.../spark/command/SparkTaskExecuteCommand.java | 0
.../spark/execution/SinkExecuteProcessor.java | 0
.../spark/execution/SourceExecuteProcessor.java | 0
.../SparkAbstractPluginExecuteProcessor.java | 0
.../starter/spark/execution/SparkExecution.java | 0
.../spark/execution/SparkRuntimeEnvironment.java | 0
.../spark/execution/TransformExecuteProcessor.java | 0
51 files changed, 54 insertions(+), 1882 deletions(-)
diff --git a/seatunnel-core/seatunnel-flink-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/pom.xml
index 969397088..3b4f5442c 100644
--- a/seatunnel-core/seatunnel-flink-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/pom.xml
@@ -35,6 +35,7 @@
<modules>
<module>seatunnel-flink-13-starter</module>
<module>seatunnel-flink-15-starter</module>
+ <module>seatunnel-flink-starter-common</module>
</modules>
<properties>
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml
index c41e48bc1..3eb93c5c0 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/pom.xml
@@ -31,6 +31,18 @@
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-flink-starter-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!-- flink-translation -->
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -66,7 +78,7 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${flink.1.13.6.version}</version>
- <scope>provided</scope>
+ <scope>${flink.scope}</scope>
</dependency>
</dependencies>
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
index 97fb280c6..c92ae6c96 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
@@ -31,6 +31,12 @@
<dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-flink-starter-common</artifactId>
+ <version>${revision}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-translation-flink-15</artifactId>
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
deleted file mode 100644
index ff098b9df..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.args;
-
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.command.ConfDecryptCommand;
-import org.apache.seatunnel.core.starter.command.ConfEncryptCommand;
-import org.apache.seatunnel.core.starter.enums.MasterType;
-import org.apache.seatunnel.core.starter.flink.command.FlinkConfValidateCommand;
-import org.apache.seatunnel.core.starter.flink.command.FlinkTaskExecuteCommand;
-
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.Parameter;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@EqualsAndHashCode(callSuper = true)
-@Data
-public class FlinkCommandArgs extends AbstractCommandArgs {
-
- @Parameter(
- names = {"-e", "--deploy-mode"},
- converter = FlinkDeployModeConverter.class,
- description = "Flink job deploy mode, support [run, run-application]")
- private DeployMode deployMode = DeployMode.RUN;
-
- @Parameter(
- names = {"--master", "--target"},
- converter = FlinkMasterTargetConverter.class,
- description =
- "Flink job submitted target master, support [local, remote, yarn-session, yarn-per-job, "
- + "kubernetes-session, yarn-application, kubernetes-application]")
- private MasterType masterType;
-
- @Override
- public Command<?> buildCommand() {
- Common.setDeployMode(getDeployMode());
- if (checkConfig) {
- return new FlinkConfValidateCommand(this);
- }
- if (encrypt) {
- return new ConfEncryptCommand(this);
- }
- if (decrypt) {
- return new ConfDecryptCommand(this);
- }
- return new FlinkTaskExecuteCommand(this);
- }
-
- @Override
- public String toString() {
- return "FlinkCommandArgs{"
- + "deployMode="
- + deployMode
- + ", masterType="
- + masterType
- + ", configFile='"
- + configFile
- + '\''
- + ", variables="
- + variables
- + ", jobName='"
- + jobName
- + '\''
- + ", originalParameters="
- + originalParameters
- + '}';
- }
-
- public static class FlinkMasterTargetConverter implements IStringConverter<MasterType> {
- private static final List<MasterType> MASTER_TYPE_LIST = new ArrayList<>();
-
- static {
- MASTER_TYPE_LIST.add(MasterType.LOCAL);
- MASTER_TYPE_LIST.add(MasterType.REMOTE);
- MASTER_TYPE_LIST.add(MasterType.YARN_SESSION);
- MASTER_TYPE_LIST.add(MasterType.YARN_PER_JOB);
- MASTER_TYPE_LIST.add(MasterType.KUBERNETES_SESSION);
- MASTER_TYPE_LIST.add(MasterType.YARN_APPLICATION);
- MASTER_TYPE_LIST.add(MasterType.KUBERNETES_APPLICATION);
- }
-
- @Override
- public MasterType convert(String value) {
- MasterType masterType = MasterType.valueOf(value.toUpperCase().replaceAll("-", "_"));
- if (MASTER_TYPE_LIST.contains(masterType)) {
- return masterType;
- } else {
- throw new IllegalArgumentException(
- "SeaTunnel job on flink engine submitted target only "
- + "support these options: [local, remote, yarn-session, yarn-per-job, kubernetes-session, "
- + "yarn-application, kubernetes-application]");
- }
- }
- }
-
- public static class FlinkDeployModeConverter implements IStringConverter<DeployMode> {
- private static final List<DeployMode> DEPLOY_MODE_TYPE_LIST = new ArrayList<>();
-
- static {
- DEPLOY_MODE_TYPE_LIST.add(DeployMode.RUN);
- DEPLOY_MODE_TYPE_LIST.add(DeployMode.RUN_APPLICATION);
- }
-
- @Override
- public DeployMode convert(String value) {
- DeployMode deployMode = DeployMode.valueOf(value.toUpperCase().replaceAll("-", "_"));
- if (DEPLOY_MODE_TYPE_LIST.contains(deployMode)) {
- return deployMode;
- } else {
- throw new IllegalArgumentException(
- "SeaTunnel job on flink engine deploy mode only "
- + "support these options: [run, run-application]");
- }
- }
- }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
deleted file mode 100644
index 78921e698..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.command;
-
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
-import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.utils.FileUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.file.Path;
-
-/** Use to validate the configuration of the SeaTunnel API. */
-@Slf4j
-public class FlinkConfValidateCommand implements Command<FlinkCommandArgs> {
-
- private final FlinkCommandArgs flinkCommandArgs;
-
- public FlinkConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
- this.flinkCommandArgs = flinkCommandArgs;
- }
-
- @Override
- public void execute() throws ConfigCheckException {
- Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
- // TODO: validate the config by new api
- }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
deleted file mode 100644
index f8539af75..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.command;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
-import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.flink.execution.FlinkExecution;
-import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
-import org.apache.seatunnel.core.starter.utils.FileUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.file.Path;
-
-import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;
-
-@Slf4j
-public class FlinkTaskExecuteCommand implements Command<FlinkCommandArgs> {
-
- private final FlinkCommandArgs flinkCommandArgs;
-
- public FlinkTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
- this.flinkCommandArgs = flinkCommandArgs;
- }
-
- @Override
- public void execute() throws CommandExecuteException {
- Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
- checkConfigExist(configFile);
- Config config = ConfigBuilder.of(configFile);
- // if user specified job name using command line arguments, override config option
- if (!flinkCommandArgs.getJobName().equals(Constants.LOGO)) {
- config =
- config.withValue(
- ConfigUtil.joinPath("env", "job.name"),
- ConfigValueFactory.fromAnyRef(flinkCommandArgs.getJobName()));
- }
- FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);
- try {
- seaTunnelTaskExecution.execute();
- } catch (Exception e) {
- throw new CommandExecuteException("Flink job executed failed", e);
- }
- }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
deleted file mode 100644
index e9d36ba06..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.common.utils.ReflectionUtils;
-import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-import org.apache.seatunnel.core.starter.flink.utils.TableUtil;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-
-import java.net.URL;
-import java.net.URLClassLoader;
-import java.util.List;
-import java.util.Optional;
-import java.util.function.BiConsumer;
-
-public abstract class FlinkAbstractPluginExecuteProcessor<T>
- implements PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment> {
- protected static final String ENGINE_TYPE = "seatunnel";
- protected static final String PLUGIN_NAME = "plugin_name";
- protected static final String SOURCE_TABLE_NAME = "source_table_name";
-
- protected static final BiConsumer<ClassLoader, URL> ADD_URL_TO_CLASSLOADER =
- (classLoader, url) -> {
- if (classLoader.getClass().getName().endsWith("SafetyNetWrapperClassLoader")) {
- URLClassLoader c =
- (URLClassLoader) ReflectionUtils.getField(classLoader, "inner").get();
- ReflectionUtils.invoke(c, "addURL", url);
- } else if (classLoader instanceof URLClassLoader) {
- ReflectionUtils.invoke(classLoader, "addURL", url);
- } else {
- throw new RuntimeException(
- "Unsupported classloader: " + classLoader.getClass().getName());
- }
- };
-
- protected FlinkRuntimeEnvironment flinkRuntimeEnvironment;
- protected final List<? extends Config> pluginConfigs;
- protected JobContext jobContext;
- protected final List<T> plugins;
-
- protected FlinkAbstractPluginExecuteProcessor(
- List<URL> jarPaths, List<? extends Config> pluginConfigs, JobContext jobContext) {
- this.pluginConfigs = pluginConfigs;
- this.jobContext = jobContext;
- this.plugins = initializePlugins(jarPaths, pluginConfigs);
- }
-
- @Override
- public void setRuntimeEnvironment(FlinkRuntimeEnvironment flinkRuntimeEnvironment) {
- this.flinkRuntimeEnvironment = flinkRuntimeEnvironment;
- }
-
- protected Optional<DataStream<Row>> fromSourceTable(Config pluginConfig) {
- if (pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
- StreamTableEnvironment tableEnvironment =
- flinkRuntimeEnvironment.getStreamTableEnvironment();
- Table table = tableEnvironment.from(pluginConfig.getString(SOURCE_TABLE_NAME));
- return Optional.ofNullable(TableUtil.tableToDataStream(tableEnvironment, table, true));
- }
- return Optional.empty();
- }
-
- protected void registerResultTable(Config pluginConfig, DataStream<Row> dataStream) {
- flinkRuntimeEnvironment.registerResultTable(pluginConfig, dataStream);
- }
-
- protected abstract List<T> initializePlugins(
- List<URL> jarPaths, List<? extends Config> pluginConfigs);
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
deleted file mode 100644
index a3282cc4a..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
-import org.apache.seatunnel.common.utils.SeaTunnelException;
-import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-import org.apache.seatunnel.core.starter.execution.TaskExecution;
-import org.apache.seatunnel.core.starter.flink.FlinkStarter;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.file.Path;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-/** Used to execute a SeaTunnelTask. */
-@Slf4j
-public class FlinkExecution implements TaskExecution {
- private final FlinkRuntimeEnvironment flinkRuntimeEnvironment;
- private final PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment>
- sourcePluginExecuteProcessor;
- private final PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment>
- transformPluginExecuteProcessor;
- private final PluginExecuteProcessor<DataStream<Row>, FlinkRuntimeEnvironment>
- sinkPluginExecuteProcessor;
- private final List<URL> jarPaths;
-
- public FlinkExecution(Config config) {
- try {
- jarPaths =
- new ArrayList<>(
- Collections.singletonList(
- new File(
- Common.appStarterDir()
- .resolve(FlinkStarter.APP_JAR_NAME)
- .toString())
- .toURI()
- .toURL()));
- } catch (MalformedURLException e) {
- throw new SeaTunnelException("load flink starter error.", e);
- }
- registerPlugin(config.getConfig("env"));
- JobContext jobContext = new JobContext();
- jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
-
- this.sourcePluginExecuteProcessor =
- new SourceExecuteProcessor(
- jarPaths, config.getConfigList(Constants.SOURCE), jobContext);
- this.transformPluginExecuteProcessor =
- new TransformExecuteProcessor(
- jarPaths,
- TypesafeConfigUtils.getConfigList(
- config, Constants.TRANSFORM, Collections.emptyList()),
- jobContext);
- this.sinkPluginExecuteProcessor =
- new SinkExecuteProcessor(
- jarPaths, config.getConfigList(Constants.SINK), jobContext);
-
- this.flinkRuntimeEnvironment =
- FlinkRuntimeEnvironment.getInstance(this.registerPlugin(config, jarPaths));
-
- this.sourcePluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
- this.transformPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
- this.sinkPluginExecuteProcessor.setRuntimeEnvironment(flinkRuntimeEnvironment);
- }
-
- @Override
- public void execute() throws TaskExecuteException {
- List<DataStream<Row>> dataStreams = new ArrayList<>();
- dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
- dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
- sinkPluginExecuteProcessor.execute(dataStreams);
- log.info(
- "Flink Execution Plan: {}",
- flinkRuntimeEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
- log.info("Flink job name: {}", flinkRuntimeEnvironment.getJobName());
- try {
- flinkRuntimeEnvironment
- .getStreamExecutionEnvironment()
- .execute(flinkRuntimeEnvironment.getJobName());
- } catch (Exception e) {
- throw new TaskExecuteException("Execute Flink job error", e);
- }
- }
-
- private void registerPlugin(Config envConfig) {
- List<Path> thirdPartyJars = new ArrayList<>();
- if (envConfig.hasPath(EnvCommonOptions.JARS.key())) {
- thirdPartyJars =
- new ArrayList<>(
- Common.getThirdPartyJars(
- envConfig.getString(EnvCommonOptions.JARS.key())));
- }
- thirdPartyJars.addAll(Common.getPluginsJarDependencies());
- List<URL> jarDependencies =
- Stream.concat(thirdPartyJars.stream(), Common.getLibJars().stream())
- .map(Path::toUri)
- .map(
- uri -> {
- try {
- return uri.toURL();
- } catch (MalformedURLException e) {
- throw new RuntimeException(
- "the uri of jar illegal:" + uri, e);
- }
- })
- .collect(Collectors.toList());
- jarDependencies.forEach(
- url ->
- FlinkAbstractPluginExecuteProcessor.ADD_URL_TO_CLASSLOADER.accept(
- Thread.currentThread().getContextClassLoader(), url));
- jarPaths.addAll(jarDependencies);
- }
-
- private Config registerPlugin(Config config, List<URL> jars) {
- config =
- this.injectJarsToConfig(
- config, ConfigUtil.joinPath("env", "pipeline", "jars"), jars);
- return this.injectJarsToConfig(
- config, ConfigUtil.joinPath("env", "pipeline", "classpaths"), jars);
- }
-
- private Config injectJarsToConfig(Config config, String path, List<URL> jars) {
- List<URL> validJars = new ArrayList<>();
- for (URL jarUrl : jars) {
- if (new File(jarUrl.getFile()).exists()) {
- validJars.add(jarUrl);
- log.info("Inject jar to config: {}", jarUrl);
- } else {
- log.warn("Remove invalid jar when inject jars into config: {}", jarUrl);
- }
- }
-
- if (config.hasPath(path)) {
- Set<URL> paths =
- Arrays.stream(config.getString(path).split(";"))
- .map(
- uri -> {
- try {
- return new URL(uri);
- } catch (MalformedURLException e) {
- throw new RuntimeException(
- "the uri of jar illegal:" + uri, e);
- }
- })
- .collect(Collectors.toSet());
- paths.addAll(validJars);
-
- config =
- config.withValue(
- path,
- ConfigValueFactory.fromAnyRef(
- paths.stream()
- .map(URL::toString)
- .distinct()
- .collect(Collectors.joining(";"))));
-
- } else {
- config =
- config.withValue(
- path,
- ConfigValueFactory.fromAnyRef(
- validJars.stream()
- .map(URL::toString)
- .distinct()
- .collect(Collectors.joining(";"))));
- }
- return config;
- }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
deleted file mode 100644
index a3897a526..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.CommonOptions;
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SupportCoordinate;
-import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.enums.PluginType;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.translation.flink.source.BaseSeaTunnelSourceFunction;
-import org.apache.seatunnel.translation.flink.source.SeaTunnelCoordinatedSource;
-import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.types.Row;
-
-import com.google.common.collect.Lists;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-public class SourceExecuteProcessor extends FlinkAbstractPluginExecuteProcessor<SeaTunnelSource> {
- private static final String PLUGIN_TYPE = PluginType.SOURCE.getType();
-
- public SourceExecuteProcessor(
- List<URL> jarPaths, List<? extends Config> sourceConfigs, JobContext jobContext) {
- super(jarPaths, sourceConfigs, jobContext);
- }
-
- @Override
- public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) {
- StreamExecutionEnvironment executionEnvironment =
- flinkRuntimeEnvironment.getStreamExecutionEnvironment();
- List<DataStream<Row>> sources = new ArrayList<>();
- for (int i = 0; i < plugins.size(); i++) {
- SeaTunnelSource internalSource = plugins.get(i);
- BaseSeaTunnelSourceFunction sourceFunction;
- if (internalSource instanceof SupportCoordinate) {
- sourceFunction = new SeaTunnelCoordinatedSource(internalSource);
- } else {
- sourceFunction = new SeaTunnelParallelSource(internalSource);
- }
- DataStreamSource<Row> sourceStream =
- addSource(
- executionEnvironment,
- sourceFunction,
- "SeaTunnel " + internalSource.getClass().getSimpleName(),
- internalSource.getBoundedness()
- == org.apache.seatunnel.api.source.Boundedness.BOUNDED);
- Config pluginConfig = pluginConfigs.get(i);
- if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
- int parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());
- sourceStream.setParallelism(parallelism);
- }
- registerResultTable(pluginConfig, sourceStream);
- sources.add(sourceStream);
- }
- return sources;
- }
-
- private DataStreamSource<Row> addSource(
- StreamExecutionEnvironment streamEnv,
- BaseSeaTunnelSourceFunction function,
- String sourceName,
- boolean bounded) {
- checkNotNull(function);
- checkNotNull(sourceName);
- checkNotNull(bounded);
-
- TypeInformation<Row> resolvedTypeInfo = function.getProducedType();
-
- boolean isParallel = function instanceof ParallelSourceFunction;
-
- streamEnv.clean(function);
-
- final StreamSource<Row, ?> sourceOperator = new StreamSource<>(function);
- return new DataStreamSource<>(
- streamEnv,
- resolvedTypeInfo,
- sourceOperator,
- isParallel,
- sourceName,
- bounded ? Boundedness.BOUNDED : Boundedness.CONTINUOUS_UNBOUNDED);
- }
-
- @Override
- protected List<SeaTunnelSource> initializePlugins(
- List<URL> jarPaths, List<? extends Config> pluginConfigs) {
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery =
- new SeaTunnelSourcePluginDiscovery(ADD_URL_TO_CLASSLOADER);
- List<SeaTunnelSource> sources = new ArrayList<>();
- Set<URL> jars = new HashSet<>();
- for (Config sourceConfig : pluginConfigs) {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
- jars.addAll(
- sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSource seaTunnelSource =
- sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
- seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setJobContext(jobContext);
- if (jobContext.getJobMode() == JobMode.BATCH
- && seaTunnelSource.getBoundedness()
- == org.apache.seatunnel.api.source.Boundedness.UNBOUNDED) {
- throw new UnsupportedOperationException(
- String.format(
- "'%s' source don't support off-line job.",
- seaTunnelSource.getPluginName()));
- }
- sources.add(seaTunnelSource);
- }
- jarPaths.addAll(jars);
- return sources;
- }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
deleted file mode 100644
index a358fb6f3..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-import org.apache.seatunnel.translation.flink.serialization.FlinkRowConverter;
-import org.apache.seatunnel.translation.flink.utils.TypeConverterUtils;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.types.Row;
-import org.apache.flink.util.Collector;
-
-import com.google.common.collect.Lists;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class TransformExecuteProcessor
- extends FlinkAbstractPluginExecuteProcessor<SeaTunnelTransform> {
-
- private static final String PLUGIN_TYPE = "transform";
-
- protected TransformExecuteProcessor(
- List<URL> jarPaths, List<? extends Config> pluginConfigs, JobContext jobContext) {
- super(jarPaths, pluginConfigs, jobContext);
- }
-
- @Override
- protected List<SeaTunnelTransform> initializePlugins(
- List<URL> jarPaths, List<? extends Config> pluginConfigs) {
- SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
- new SeaTunnelTransformPluginDiscovery();
- List<URL> pluginJars = new ArrayList<>();
- List<SeaTunnelTransform> transforms =
- pluginConfigs.stream()
- .map(
- transformConfig -> {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
- transformConfig.getString(PLUGIN_NAME));
- List<URL> pluginJarPaths =
- transformPluginDiscovery.getPluginJarPaths(
- Lists.newArrayList(pluginIdentifier));
- SeaTunnelTransform<?> seaTunnelTransform =
- transformPluginDiscovery.createPluginInstance(
- pluginIdentifier);
- jarPaths.addAll(pluginJarPaths);
- seaTunnelTransform.prepare(transformConfig);
- seaTunnelTransform.setJobContext(jobContext);
- return seaTunnelTransform;
- })
- .distinct()
- .collect(Collectors.toList());
- jarPaths.addAll(pluginJars);
- return transforms;
- }
-
- @Override
- public List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
- throws TaskExecuteException {
- if (plugins.isEmpty()) {
- return upstreamDataStreams;
- }
- DataStream<Row> input = upstreamDataStreams.get(0);
- List<DataStream<Row>> result = new ArrayList<>();
- for (int i = 0; i < plugins.size(); i++) {
- try {
- SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
- Config pluginConfig = pluginConfigs.get(i);
- DataStream<Row> stream = fromSourceTable(pluginConfig).orElse(input);
- input = flinkTransform(transform, stream);
- registerResultTable(pluginConfig, input);
- result.add(input);
- } catch (Exception e) {
- throw new TaskExecuteException(
- String.format(
- "SeaTunnel transform task: %s execute error",
- plugins.get(i).getPluginName()),
- e);
- }
- }
- return result;
- }
-
- protected DataStream<Row> flinkTransform(SeaTunnelTransform transform, DataStream<Row> stream) {
- SeaTunnelDataType seaTunnelDataType = TypeConverterUtils.convert(stream.getType());
- transform.setTypeInfo(seaTunnelDataType);
- TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType());
- FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(seaTunnelDataType);
- FlinkRowConverter transformOutputRowConverter =
- new FlinkRowConverter(transform.getProducedType());
- DataStream<Row> output =
- stream.flatMap(
- new FlatMapFunction<Row, Row>() {
- @Override
- public void flatMap(Row value, Collector<Row> out) throws Exception {
- SeaTunnelRow seaTunnelRow =
- transformInputRowConverter.reconvert(value);
- SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow);
- if (dataRow != null) {
- Row copy = transformOutputRowConverter.convert(dataRow);
- out.collect(copy);
- }
- }
- },
- rowTypeInfo);
- return output;
- }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
deleted file mode 100644
index cc8229f26..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.utils;
-
-public class ConfigKeyName {
-
- private ConfigKeyName() {
- throw new IllegalStateException("Utility class");
- }
-
- public static final String TIME_CHARACTERISTIC = "execution.time-characteristic";
- public static final String BUFFER_TIMEOUT_MILLIS = "execution.buffer.timeout";
- public static final String PARALLELISM = "execution.parallelism";
- public static final String MAX_PARALLELISM = "execution.max-parallelism";
- @Deprecated public static final String CHECKPOINT_INTERVAL = "execution.checkpoint.interval";
- public static final String CHECKPOINT_MODE = "execution.checkpoint.mode";
- public static final String CHECKPOINT_TIMEOUT = "execution.checkpoint.timeout";
- public static final String CHECKPOINT_DATA_URI = "execution.checkpoint.data-uri";
- public static final String MAX_CONCURRENT_CHECKPOINTS = "execution.max-concurrent-checkpoints";
- public static final String CHECKPOINT_CLEANUP_MODE = "execution.checkpoint.cleanup-mode";
- public static final String MIN_PAUSE_BETWEEN_CHECKPOINTS = "execution.checkpoint.min-pause";
- public static final String FAIL_ON_CHECKPOINTING_ERRORS = "execution.checkpoint.fail-on-error";
- public static final String RESTART_STRATEGY = "execution.restart.strategy";
- public static final String RESTART_ATTEMPTS = "execution.restart.attempts";
- public static final String RESTART_DELAY_BETWEEN_ATTEMPTS =
- "execution.restart.delayBetweenAttempts";
- public static final String RESTART_FAILURE_INTERVAL = "execution.restart.failureInterval";
- public static final String RESTART_FAILURE_RATE = "execution.restart.failureRate";
- public static final String RESTART_DELAY_INTERVAL = "execution.restart.delayInterval";
- public static final String MAX_STATE_RETENTION_TIME = "execution.query.state.max-retention";
- public static final String MIN_STATE_RETENTION_TIME = "execution.query.state.min-retention";
- public static final String STATE_BACKEND = "execution.state.backend";
- public static final String PLANNER = "execution.planner";
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
deleted file mode 100644
index f15638cbb..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.utils;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.common.config.CheckResult;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.PipelineOptions;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.concurrent.TimeUnit;
-
-@Slf4j
-public final class EnvironmentUtil {
-
- private EnvironmentUtil() {}
-
- public static void setRestartStrategy(Config config, ExecutionConfig executionConfig) {
- try {
- if (config.hasPath(ConfigKeyName.RESTART_STRATEGY)) {
- String restartStrategy = config.getString(ConfigKeyName.RESTART_STRATEGY);
- switch (restartStrategy.toLowerCase()) {
- case "no":
- executionConfig.setRestartStrategy(RestartStrategies.noRestart());
- break;
- case "fixed-delay":
- int attempts = config.getInt(ConfigKeyName.RESTART_ATTEMPTS);
- long delay = config.getLong(ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS);
- executionConfig.setRestartStrategy(
- RestartStrategies.fixedDelayRestart(attempts, delay));
- break;
- case "failure-rate":
- long failureInterval =
- config.getLong(ConfigKeyName.RESTART_FAILURE_INTERVAL);
- int rate = config.getInt(ConfigKeyName.RESTART_FAILURE_RATE);
- long delayInterval = config.getLong(ConfigKeyName.RESTART_DELAY_INTERVAL);
- executionConfig.setRestartStrategy(
- RestartStrategies.failureRateRestart(
- rate,
- Time.of(failureInterval, TimeUnit.MILLISECONDS),
- Time.of(delayInterval, TimeUnit.MILLISECONDS)));
- break;
- default:
- log.warn(
- "set restart.strategy failed, unknown restart.strategy [{}],only support no,fixed-delay,failure-rate",
- restartStrategy);
- }
- }
- } catch (Exception e) {
- log.warn("set restart.strategy in config '{}' exception", config, e);
- }
- }
-
- public static CheckResult checkRestartStrategy(Config config) {
- if (config.hasPath(ConfigKeyName.RESTART_STRATEGY)) {
- String restartStrategy = config.getString(ConfigKeyName.RESTART_STRATEGY);
- switch (restartStrategy.toLowerCase()) {
- case "fixed-delay":
- if (!(config.hasPath(ConfigKeyName.RESTART_ATTEMPTS)
- && config.hasPath(ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS))) {
- return CheckResult.error(
- String.format(
- "fixed-delay restart strategy must set [%s],[%s]",
- ConfigKeyName.RESTART_ATTEMPTS,
- ConfigKeyName.RESTART_DELAY_BETWEEN_ATTEMPTS));
- }
- break;
- case "failure-rate":
- if (!(config.hasPath(ConfigKeyName.RESTART_FAILURE_INTERVAL)
- && config.hasPath(ConfigKeyName.RESTART_FAILURE_RATE)
- && config.hasPath(ConfigKeyName.RESTART_DELAY_INTERVAL))) {
- return CheckResult.error(
- String.format(
- "failure-rate restart strategy must set [%s],[%s],[%s]",
- ConfigKeyName.RESTART_FAILURE_INTERVAL,
- ConfigKeyName.RESTART_FAILURE_RATE,
- ConfigKeyName.RESTART_DELAY_INTERVAL));
- }
- break;
- default:
- return CheckResult.success();
- }
- }
- return CheckResult.success();
- }
-
- public static void initConfiguration(Config config, Configuration configuration) {
- if (config.hasPath("pipeline")) {
- Config pipeline = config.getConfig("pipeline");
- if (pipeline.hasPath("jars")) {
- configuration.setString(PipelineOptions.JARS.key(), pipeline.getString("jars"));
- }
- if (pipeline.hasPath("classpaths")) {
- configuration.setString(
- PipelineOptions.CLASSPATHS.key(), pipeline.getString("classpaths"));
- }
- }
- }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
deleted file mode 100644
index ca1603cdf..000000000
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.utils;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.table.api.Table;
-import org.apache.flink.table.api.TableEnvironment;
-import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
-import org.apache.flink.types.Row;
-
-import java.util.Arrays;
-
-public final class TableUtil {
-
- private TableUtil() {}
-
- public static DataStream<Row> tableToDataStream(
- StreamTableEnvironment tableEnvironment, Table table, boolean isAppend) {
-
- TypeInformation<Row> typeInfo = table.getSchema().toRowType();
- if (isAppend) {
- return tableEnvironment.toAppendStream(table, typeInfo);
- }
- return tableEnvironment
- .toRetractStream(table, typeInfo)
- .filter(row -> row.f0)
- .map(row -> row.f1)
- .returns(typeInfo);
- }
-
- public static boolean tableExists(TableEnvironment tableEnvironment, String name) {
- return Arrays.asList(tableEnvironment.listTables()).contains(name);
- }
-}
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/pom.xml
similarity index 93%
copy from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
copy to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/pom.xml
index 97fb280c6..938c088a2 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/pom.xml
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/pom.xml
@@ -26,8 +26,10 @@
<version>${revision}</version>
</parent>
- <artifactId>seatunnel-flink-15-starter</artifactId>
- <name>SeaTunnel : Core : Flink Starter : 1.5</name>
+ <artifactId>seatunnel-flink-starter-common</artifactId>
+ <packaging>jar</packaging>
+
+ <name>SeaTunnel : Core : Flink Starter : Common</name>
<dependencies>
@@ -35,6 +37,7 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-translation-flink-15</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/FlinkStarter.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/SeaTunnelFlink.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/args/FlinkCommandArgs.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkConfValidateCommand.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkTaskExecuteCommand.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkAbstractPluginExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/ConfigKeyName.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/EnvironmentUtil.java
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
similarity index 100%
rename from seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
rename to seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/utils/TableUtil.java
diff --git a/seatunnel-core/seatunnel-spark-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/pom.xml
index 13f5ded0c..4502d72c9 100644
--- a/seatunnel-core/seatunnel-spark-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/pom.xml
@@ -33,6 +33,7 @@
<modules>
<module>seatunnel-spark-2-starter</module>
<module>seatunnel-spark-3-starter</module>
+ <module>seatunnel-spark-starter-common</module>
</modules>
<properties>
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/pom.xml
index e3773cff9..1dee3c5a3 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/pom.xml
@@ -58,6 +58,18 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-spark-starter-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
</dependencies>
</project>
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml
index 0e7fb1bb3..838723c7b 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml
@@ -35,12 +35,6 @@
<dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-translation-spark-3.3</artifactId>
- <version>${project.version}</version>
- </dependency>
-
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.binary.version}</artifactId>
@@ -62,6 +56,18 @@
<scope>${spark.scope}</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-translation-spark-3.3</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-spark-starter-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
</dependencies>
</project>
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
deleted file mode 100644
index 841383927..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.args;
-
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.command.ConfDecryptCommand;
-import org.apache.seatunnel.core.starter.command.ConfEncryptCommand;
-import org.apache.seatunnel.core.starter.spark.command.SparkConfValidateCommand;
-import org.apache.seatunnel.core.starter.spark.command.SparkTaskExecuteCommand;
-
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.Parameter;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-import java.util.ArrayList;
-import java.util.List;
-
-@EqualsAndHashCode(callSuper = true)
-@Data
-public class SparkCommandArgs extends AbstractCommandArgs {
-
- @Parameter(
- names = {"-e", "--deploy-mode"},
- description = "Spark deploy mode, support [cluster, client]",
- converter = SparkDeployModeConverter.class)
- private DeployMode deployMode = DeployMode.CLIENT;
-
- @Parameter(
- names = {"-m", "--master"},
- description =
- "Spark master, support [spark://host:port, mesos://host:port, yarn, "
- + "k8s://https://host:port, local], default local[*]")
- private String master = "local[*]";
-
- @Override
- public Command<?> buildCommand() {
- Common.setDeployMode(getDeployMode());
- if (checkConfig) {
- return new SparkConfValidateCommand(this);
- }
- if (encrypt) {
- return new ConfEncryptCommand(this);
- }
- if (decrypt) {
- return new ConfDecryptCommand(this);
- }
- return new SparkTaskExecuteCommand(this);
- }
-
- public static class SparkDeployModeConverter implements IStringConverter<DeployMode> {
- private static final List<DeployMode> DEPLOY_MODE_TYPE_LIST = new ArrayList<>();
-
- static {
- DEPLOY_MODE_TYPE_LIST.add(DeployMode.CLIENT);
- DEPLOY_MODE_TYPE_LIST.add(DeployMode.CLUSTER);
- }
-
- @Override
- public DeployMode convert(String value) {
- DeployMode deployMode = DeployMode.valueOf(value.toUpperCase());
- if (DEPLOY_MODE_TYPE_LIST.contains(deployMode)) {
- return deployMode;
- } else {
- throw new IllegalArgumentException(
- "SeaTunnel job on spark engine deploy mode only "
- + "support these options: [cluster, client]");
- }
- }
- }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java
deleted file mode 100644
index 9da967970..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.command;
-
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.starter.utils.FileUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.file.Path;
-
-/** Use to validate the configuration of the SeaTunnel API. */
-@Slf4j
-public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
-
- private final SparkCommandArgs sparkCommandArgs;
-
- public SparkConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
- this.sparkCommandArgs = sparkCommandArgs;
- }
-
- @Override
- public void execute() throws ConfigCheckException {
- Path configPath = FileUtils.getConfigPath(sparkCommandArgs);
- // TODO: validate the config by new api
- }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
deleted file mode 100644
index 463c1dc65..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.command;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigUtil;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.core.starter.command.Command;
-import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.starter.spark.execution.SparkExecution;
-import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
-import org.apache.seatunnel.core.starter.utils.FileUtils;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.nio.file.Path;
-
-import static org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;
-
-@Slf4j
-public class SparkTaskExecuteCommand implements Command<SparkCommandArgs> {
-
- private final SparkCommandArgs sparkCommandArgs;
-
- public SparkTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
- this.sparkCommandArgs = sparkCommandArgs;
- }
-
- @Override
- public void execute() throws CommandExecuteException {
- Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
- checkConfigExist(configFile);
- Config config = ConfigBuilder.of(configFile);
- if (!sparkCommandArgs.getJobName().equals(Constants.LOGO)) {
- config =
- config.withValue(
- ConfigUtil.joinPath("env", "job.name"),
- ConfigValueFactory.fromAnyRef(sparkCommandArgs.getJobName()));
- }
- try {
- SparkExecution seaTunnelTaskExecution = new SparkExecution(config);
- seaTunnelTaskExecution.execute();
- } catch (Exception e) {
- log.error("Run SeaTunnel on spark failed.", e);
- throw new CommandExecuteException(e.getMessage());
- }
- }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
deleted file mode 100644
index d68aec3c2..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.CommonOptions;
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.utils.SerializationUtils;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
-import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.types.StructType;
-
-import com.google.common.collect.Lists;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-public class SourceExecuteProcessor
- extends SparkAbstractPluginExecuteProcessor<SeaTunnelSource<?, ?, ?>> {
- private static final String PLUGIN_TYPE = "source";
-
- public SourceExecuteProcessor(
- SparkRuntimeEnvironment sparkEnvironment,
- JobContext jobContext,
- List<? extends Config> sourceConfigs) {
- super(sparkEnvironment, jobContext, sourceConfigs);
- }
-
- @Override
- public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) {
- List<Dataset<Row>> sources = new ArrayList<>();
- for (int i = 0; i < plugins.size(); i++) {
- SeaTunnelSource<?, ?, ?> source = plugins.get(i);
- Config pluginConfig = pluginConfigs.get(i);
- int parallelism;
- if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
- parallelism = pluginConfig.getInt(CommonOptions.PARALLELISM.key());
- } else {
- parallelism =
- sparkRuntimeEnvironment
- .getSparkConf()
- .getInt(
- CommonOptions.PARALLELISM.key(),
- CommonOptions.PARALLELISM.defaultValue());
- }
- Dataset<Row> dataset =
- sparkRuntimeEnvironment
- .getSparkSession()
- .read()
- .format(SeaTunnelSource.class.getSimpleName())
- .option(CommonOptions.PARALLELISM.key(), parallelism)
- .option(
- Constants.SOURCE_SERIALIZATION,
- SerializationUtils.objectToString(source))
- .schema(
- (StructType)
- TypeConverterUtils.convert(source.getProducedType()))
- .load();
- sources.add(dataset);
- registerInputTempView(pluginConfigs.get(i), dataset);
- }
- return sources;
- }
-
- @Override
- protected List<SeaTunnelSource<?, ?, ?>> initializePlugins(
- List<? extends Config> pluginConfigs) {
- SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
- List<SeaTunnelSource<?, ?, ?>> sources = new ArrayList<>();
- Set<URL> jars = new HashSet<>();
- for (Config sourceConfig : pluginConfigs) {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE, PLUGIN_TYPE, sourceConfig.getString(PLUGIN_NAME));
- jars.addAll(
- sourcePluginDiscovery.getPluginJarPaths(Lists.newArrayList(pluginIdentifier)));
- SeaTunnelSource<?, ?, ?> seaTunnelSource =
- sourcePluginDiscovery.createPluginInstance(pluginIdentifier);
- seaTunnelSource.prepare(sourceConfig);
- seaTunnelSource.setJobContext(jobContext);
- sources.add(seaTunnelSource);
- }
- sparkRuntimeEnvironment.registerPlugin(new ArrayList<>(jars));
- return sources;
- }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
deleted file mode 100644
index ebfcaf6e9..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
-import java.util.List;
-import java.util.Optional;
-
-public abstract class SparkAbstractPluginExecuteProcessor<T>
- implements PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment> {
- protected SparkRuntimeEnvironment sparkRuntimeEnvironment;
- protected final List<? extends Config> pluginConfigs;
- protected final JobContext jobContext;
- protected final List<T> plugins;
- protected static final String ENGINE_TYPE = "seatunnel";
- protected static final String PLUGIN_NAME = "plugin_name";
- protected static final String RESULT_TABLE_NAME = "result_table_name";
- protected static final String SOURCE_TABLE_NAME = "source_table_name";
-
- protected SparkAbstractPluginExecuteProcessor(
- SparkRuntimeEnvironment sparkRuntimeEnvironment,
- JobContext jobContext,
- List<? extends Config> pluginConfigs) {
- this.sparkRuntimeEnvironment = sparkRuntimeEnvironment;
- this.jobContext = jobContext;
- this.pluginConfigs = pluginConfigs;
- this.plugins = initializePlugins(pluginConfigs);
- }
-
- @Override
- public void setRuntimeEnvironment(SparkRuntimeEnvironment sparkRuntimeEnvironment) {
- this.sparkRuntimeEnvironment = sparkRuntimeEnvironment;
- }
-
- protected abstract List<T> initializePlugins(List<? extends Config> pluginConfigs);
-
- protected void registerInputTempView(Config pluginConfig, Dataset<Row> dataStream) {
- if (pluginConfig.hasPath(RESULT_TABLE_NAME)) {
- String tableName = pluginConfig.getString(RESULT_TABLE_NAME);
- registerTempView(tableName, dataStream);
- }
- }
-
- protected Optional<Dataset<Row>> fromSourceTable(
- Config pluginConfig, SparkRuntimeEnvironment sparkRuntimeEnvironment) {
- if (!pluginConfig.hasPath(SOURCE_TABLE_NAME)) {
- return Optional.empty();
- }
- String sourceTableName = pluginConfig.getString(SOURCE_TABLE_NAME);
- return Optional.of(sparkRuntimeEnvironment.getSparkSession().read().table(sourceTableName));
- }
-
- private void registerTempView(String tableName, Dataset<Row> ds) {
- ds.createOrReplaceTempView(tableName);
- }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
deleted file mode 100644
index dab00cbdb..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.TypesafeConfigUtils;
-import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.core.starter.execution.PluginExecuteProcessor;
-import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-import org.apache.seatunnel.core.starter.execution.TaskExecution;
-
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-@Slf4j
-public class SparkExecution implements TaskExecution {
- private final SparkRuntimeEnvironment sparkRuntimeEnvironment;
- private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
- sourcePluginExecuteProcessor;
- private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
- transformPluginExecuteProcessor;
- private final PluginExecuteProcessor<Dataset<Row>, SparkRuntimeEnvironment>
- sinkPluginExecuteProcessor;
-
- public SparkExecution(Config config) {
- this.sparkRuntimeEnvironment = SparkRuntimeEnvironment.getInstance(config);
- JobContext jobContext = new JobContext();
- jobContext.setJobMode(RuntimeEnvironment.getJobMode(config));
- this.sourcePluginExecuteProcessor =
- new SourceExecuteProcessor(
- sparkRuntimeEnvironment,
- jobContext,
- config.getConfigList(Constants.SOURCE));
- this.transformPluginExecuteProcessor =
- new TransformExecuteProcessor(
- sparkRuntimeEnvironment,
- jobContext,
- TypesafeConfigUtils.getConfigList(
- config, Constants.TRANSFORM, Collections.emptyList()));
- this.sinkPluginExecuteProcessor =
- new SinkExecuteProcessor(
- sparkRuntimeEnvironment, jobContext, config.getConfigList(Constants.SINK));
- }
-
- @Override
- public void execute() throws TaskExecuteException {
- List<Dataset<Row>> datasets = new ArrayList<>();
- datasets = sourcePluginExecuteProcessor.execute(datasets);
- datasets = transformPluginExecuteProcessor.execute(datasets);
- sinkPluginExecuteProcessor.execute(datasets);
- log.info("Spark Execution started");
- }
-
- public SparkRuntimeEnvironment getSparkRuntimeEnvironment() {
- return sparkRuntimeEnvironment;
- }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
deleted file mode 100644
index 7e31ca463..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.common.Constants;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.starter.enums.PluginType;
-import org.apache.seatunnel.core.starter.execution.RuntimeEnvironment;
-
-import org.apache.spark.SparkConf;
-import org.apache.spark.sql.SparkSession;
-import org.apache.spark.streaming.Seconds;
-import org.apache.spark.streaming.StreamingContext;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.net.URL;
-import java.util.List;
-
-@Slf4j
-public class SparkRuntimeEnvironment implements RuntimeEnvironment {
- private static final long DEFAULT_SPARK_STREAMING_DURATION = 5;
- private static final String PLUGIN_NAME_KEY = "plugin_name";
- private static volatile SparkRuntimeEnvironment INSTANCE = null;
-
- private SparkConf sparkConf;
-
- private SparkSession sparkSession;
-
- private StreamingContext streamingContext;
-
- private Config config;
-
- private boolean enableHive = false;
-
- private JobMode jobMode;
-
- private String jobName = Constants.LOGO;
-
- private SparkRuntimeEnvironment(Config config) {
- this.setEnableHive(checkIsContainHive(config));
- this.initialize(config);
- }
-
- public void setEnableHive(boolean enableHive) {
- this.enableHive = enableHive;
- }
-
- @Override
- public RuntimeEnvironment setConfig(Config config) {
- this.config = config;
- return this;
- }
-
- @Override
- public RuntimeEnvironment setJobMode(JobMode mode) {
- this.jobMode = mode;
- return this;
- }
-
- @Override
- public JobMode getJobMode() {
- return jobMode;
- }
-
- @Override
- public Config getConfig() {
- return this.config;
- }
-
- @Override
- public CheckResult checkConfig() {
- return CheckResult.success();
- }
-
- @Override
- public void registerPlugin(List<URL> pluginPaths) {
- log.info("register plugins :" + pluginPaths);
- // TODO we use --jar parameter to support submit multi-jar in spark cluster at now. Refactor
- // it to
- // support submit multi-jar in code or remove this logic.
- // this.sparkSession.conf().set("spark.jars",pluginPaths.stream().map(URL::getPath).collect(Collectors.joining(",")));
- }
-
- @Override
- public SparkRuntimeEnvironment prepare() {
- if (config.hasPath("job.name")) {
- this.jobName = config.getString("job.name");
- }
- sparkConf = createSparkConf();
- SparkSession.Builder builder = SparkSession.builder().config(sparkConf);
- if (enableHive) {
- builder.enableHiveSupport();
- }
- this.sparkSession = builder.getOrCreate();
- createStreamingContext();
- return this;
- }
-
- public SparkSession getSparkSession() {
- return this.sparkSession;
- }
-
- public StreamingContext getStreamingContext() {
- return this.streamingContext;
- }
-
- public SparkConf getSparkConf() {
- return this.sparkConf;
- }
-
- private SparkConf createSparkConf() {
- SparkConf sparkConf = new SparkConf();
- this.config
- .entrySet()
- .forEach(
- entry ->
- sparkConf.set(
- entry.getKey(),
- String.valueOf(entry.getValue().unwrapped())));
- sparkConf.setAppName(jobName);
- return sparkConf;
- }
-
- private void createStreamingContext() {
- SparkConf conf = this.sparkSession.sparkContext().getConf();
- long duration =
- conf.getLong("spark.stream.batchDuration", DEFAULT_SPARK_STREAMING_DURATION);
- if (this.streamingContext == null) {
- this.streamingContext =
- new StreamingContext(sparkSession.sparkContext(), Seconds.apply(duration));
- }
- }
-
- protected boolean checkIsContainHive(Config config) {
- List<? extends Config> sourceConfigList = config.getConfigList(PluginType.SOURCE.getType());
- for (Config c : sourceConfigList) {
- if (c.getString(PLUGIN_NAME_KEY).toLowerCase().contains("hive")) {
- return true;
- }
- }
- List<? extends Config> sinkConfigList = config.getConfigList(PluginType.SINK.getType());
- for (Config c : sinkConfigList) {
- if (c.getString(PLUGIN_NAME_KEY).toLowerCase().contains("hive")) {
- return true;
- }
- }
- return false;
- }
-
- public static SparkRuntimeEnvironment getInstance(Config config) {
- if (INSTANCE == null) {
- synchronized (SparkRuntimeEnvironment.class) {
- if (INSTANCE == null) {
- INSTANCE = new SparkRuntimeEnvironment(config);
- }
- }
- }
- return INSTANCE;
- }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
deleted file mode 100644
index f82c465ec..000000000
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.spark.execution;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelTransformPluginDiscovery;
-import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
-
-import org.apache.spark.api.java.function.MapPartitionsFunction;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
-import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder;
-import org.apache.spark.sql.catalyst.encoders.RowEncoder;
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
-import org.apache.spark.sql.types.StructType;
-
-import com.google.common.collect.Lists;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.stream.Collectors;
-
-@Slf4j
-public class TransformExecuteProcessor
- extends SparkAbstractPluginExecuteProcessor<SeaTunnelTransform> {
-
- private static final String PLUGIN_TYPE = "transform";
-
- protected TransformExecuteProcessor(
- SparkRuntimeEnvironment sparkRuntimeEnvironment,
- JobContext jobContext,
- List<? extends Config> pluginConfigs) {
- super(sparkRuntimeEnvironment, jobContext, pluginConfigs);
- }
-
- @Override
- protected List<SeaTunnelTransform> initializePlugins(List<? extends Config> pluginConfigs) {
- SeaTunnelTransformPluginDiscovery transformPluginDiscovery =
- new SeaTunnelTransformPluginDiscovery();
- List<URL> pluginJars = new ArrayList<>();
- List<SeaTunnelTransform> transforms =
- pluginConfigs.stream()
- .map(
- transformConfig -> {
- PluginIdentifier pluginIdentifier =
- PluginIdentifier.of(
- ENGINE_TYPE,
- PLUGIN_TYPE,
- transformConfig.getString(PLUGIN_NAME));
- pluginJars.addAll(
- transformPluginDiscovery.getPluginJarPaths(
- Lists.newArrayList(pluginIdentifier)));
- SeaTunnelTransform pluginInstance =
- transformPluginDiscovery.createPluginInstance(
- pluginIdentifier);
- pluginInstance.prepare(transformConfig);
- pluginInstance.setJobContext(jobContext);
- return pluginInstance;
- })
- .distinct()
- .collect(Collectors.toList());
- sparkRuntimeEnvironment.registerPlugin(pluginJars);
- return transforms;
- }
-
- @Override
- public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
- throws TaskExecuteException {
- if (plugins.isEmpty()) {
- return upstreamDataStreams;
- }
- Dataset<Row> input = upstreamDataStreams.get(0);
- List<Dataset<Row>> result = new ArrayList<>();
- for (int i = 0; i < plugins.size(); i++) {
- try {
- SeaTunnelTransform<SeaTunnelRow> transform = plugins.get(i);
- Config pluginConfig = pluginConfigs.get(i);
- Dataset<Row> stream =
- fromSourceTable(pluginConfig, sparkRuntimeEnvironment).orElse(input);
- input = sparkTransform(transform, stream);
- registerInputTempView(pluginConfig, input);
- result.add(input);
- } catch (Exception e) {
- throw new TaskExecuteException(
- String.format(
- "SeaTunnel transform task: %s execute error",
- plugins.get(i).getPluginName()),
- e);
- }
- }
- return result;
- }
-
- private Dataset<Row> sparkTransform(SeaTunnelTransform transform, Dataset<Row> stream)
- throws IOException {
- SeaTunnelDataType<?> seaTunnelDataType = TypeConverterUtils.convert(stream.schema());
- transform.setTypeInfo(seaTunnelDataType);
- StructType structType =
- (StructType) TypeConverterUtils.convert(transform.getProducedType());
- ExpressionEncoder<Row> encoder = RowEncoder.apply(structType);
- return stream.mapPartitions(
- (MapPartitionsFunction<Row, Row>)
- (Iterator<Row> rowIterator) -> {
- TransformIterator iterator =
- new TransformIterator(
- rowIterator, transform, structType);
- return iterator;
- },
- encoder)
- .filter(
- (Row row) -> {
- return row != null;
- });
- }
-
- private static class TransformIterator implements Iterator<Row>, Serializable {
- private Iterator<Row> sourceIterator;
- private SeaTunnelTransform<SeaTunnelRow> transform;
- private StructType structType;
-
- public TransformIterator(
- Iterator<Row> sourceIterator,
- SeaTunnelTransform<SeaTunnelRow> transform,
- StructType structType) {
- this.sourceIterator = sourceIterator;
- this.transform = transform;
- this.structType = structType;
- }
-
- @Override
- public boolean hasNext() {
- return sourceIterator.hasNext();
- }
-
- @Override
- public Row next() {
- Row row = sourceIterator.next();
- SeaTunnelRow seaTunnelRow = new SeaTunnelRow(((GenericRowWithSchema) row).values());
- seaTunnelRow = (SeaTunnelRow) transform.map(seaTunnelRow);
- if (seaTunnelRow == null) {
- return null;
- }
- return new GenericRowWithSchema(seaTunnelRow.getFields(), structType);
- }
- }
-}
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/pom.xml
similarity index 92%
copy from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml
copy to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/pom.xml
index 0e7fb1bb3..ec4df4bfa 100644
--- a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/pom.xml
+++ b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/pom.xml
@@ -26,8 +26,9 @@
<version>${revision}</version>
</parent>
- <artifactId>seatunnel-spark-3-starter</artifactId>
- <name>SeaTunnel : Core : Spark Starter : 3.3</name>
+ <artifactId>seatunnel-spark-starter-common</artifactId>
+ <packaging>jar</packaging>
+ <name>SeaTunnel : Core : Spark Starter : Common</name>
<properties>
<scala.binary.version>2.12</scala.binary.version>
@@ -39,6 +40,7 @@
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-translation-spark-3.3</artifactId>
<version>${project.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SeaTunnelSpark.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkConfValidateCommand.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkTaskExecuteCommand.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkAbstractPluginExecuteProcessor.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkRuntimeEnvironment.java
diff --git a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
similarity index 100%
rename from seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/seatunnel-spark-starter-common/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java