You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/25 11:26:40 UTC
[incubator-seatunnel] branch api-draft updated: [API-DRAFT] Add spark-core-starter module: code splitting for old and new engines (#1945)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 62d3468a [API-DRAFT]Add spark-core-starter module Code splitting for old and new engines (#1945)
62d3468a is described below
commit 62d3468aef075d53aede6b9a15bd6f5f82ef2ace
Author: Kirs <ki...@apache.org>
AuthorDate: Wed May 25 19:26:35 2022 +0800
[API-DRAFT] Add spark-core-starter module: code splitting for old and new engines (#1945)
* Add spark-core-starter module, code splitting for old and new engines
* Separation of old and new engines
---
seatunnel-core/pom.xml | 1 +
.../core/base/command/AbstractCommandArgs.java | 37 --
.../core/spark/command/SparkCommandBuilder.java | 21 +-
.../starter/config/AbstractExecutionContext.java | 17 +-
.../starter/flink/command/FlinkCommandBuilder.java | 2 +-
seatunnel-core/seatunnel-spark-starter/pom.xml | 189 +++++++++
.../src/main/bin/start-seatunnel-spark.sh | 45 +++
.../src/main/docker/Dockerfile | 37 ++
.../core/starter/spark/SeatunnelSpark.java} | 22 +-
.../seatunnel/core/starter/spark/SparkStarter.java | 429 +++++++++++++++++++++
.../core/starter/spark/args/SparkCommandArgs.java | 62 +++
.../command/SparkApiConfValidateCommand.java} | 22 +-
.../spark/command/SparkApiTaskExecuteCommand.java} | 22 +-
.../spark/command/SparkCommandBuilder.java} | 21 +-
.../spark/config/SparkApiConfigChecker.java} | 8 +-
.../starter/spark/config/SparkEnvironment.java} | 10 +-
.../execution/AbstractPluginExecuteProcessor.java | 2 +-
.../spark/execution/PluginExecuteProcessor.java | 2 +-
.../spark/execution/SinkExecuteProcessor.java | 2 +-
.../spark/execution/SourceExecuteProcessor.java | 2 +-
.../spark/execution/SparkTaskExecution.java} | 12 +-
.../spark/execution/TransformExecuteProcessor.java | 2 +-
.../starter/spark/utils/CommandLineUtils.java} | 25 +-
.../core/starter}/spark/SparkStarterTest.java | 2 +-
.../starter/spark/args/SparkCommandArgsTest.java | 67 ++++
.../starter/spark/utils/CommandLineUtilsTest.java} | 30 +-
.../src/test/resources/spark_application.conf | 66 ++++
.../seatunnel-spark-examples/pom.xml | 2 +-
.../seatunnel/example/spark/LocalSparkExample.java | 52 ---
.../example/spark/SeaTunnelApiExample.java | 12 +-
.../flink/FlinkTransformPluginDiscovery.java | 1 +
.../spark/SparkTransformPluginDiscovery.java | 1 +
32 files changed, 1013 insertions(+), 212 deletions(-)
diff --git a/seatunnel-core/pom.xml b/seatunnel-core/pom.xml
index e6c5cf0e..937fe767 100644
--- a/seatunnel-core/pom.xml
+++ b/seatunnel-core/pom.xml
@@ -37,5 +37,6 @@
<module>seatunnel-core-flink-sql</module>
<module>seatunnel-core-starter</module>
<module>seatunnel-flink-starter</module>
+ <module>seatunnel-spark-starter</module>
</modules>
</project>
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
index 107ec2b1..258b7456 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
@@ -19,10 +19,8 @@ package org.apache.seatunnel.core.base.command;
import org.apache.seatunnel.apis.base.command.CommandArgs;
import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.base.config.ApiType;
import org.apache.seatunnel.core.base.config.EngineType;
-import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
import java.util.Collections;
@@ -39,11 +37,6 @@ public abstract class AbstractCommandArgs implements CommandArgs {
description = "variable substitution, such as -i city=beijing, or -i date=20190318")
private List<String> variables = Collections.emptyList();
- @Parameter(names = {"-api", "--api-type"},
- converter = ApiTypeConverter.class,
- description = "Api type, engine or seatunnel")
- private ApiType apiType = ApiType.ENGINE_API;
-
// todo: use command type enum
@Parameter(names = {"-t", "--check"},
description = "check config")
@@ -86,14 +79,6 @@ public abstract class AbstractCommandArgs implements CommandArgs {
this.help = help;
}
- public ApiType getApiType() {
- return apiType;
- }
-
- public void setApiType(ApiType apiType) {
- this.apiType = apiType;
- }
-
public EngineType getEngineType() {
throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
}
@@ -102,26 +87,4 @@ public abstract class AbstractCommandArgs implements CommandArgs {
throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
}
- /**
- * Used to convert the api type string to the enum value.
- */
- private static class ApiTypeConverter implements IStringConverter<ApiType> {
-
- /**
- * If the '-api' is not set, then will not go into this convert method.
- *
- * @param value input value set by '-api' or '--api-type'
- * @return api type enum value
- */
- @Override
- public ApiType convert(String value) {
- for (ApiType apiType : ApiType.values()) {
- if (apiType.getApiType().equalsIgnoreCase(value)) {
- return apiType;
- }
- }
- throw new IllegalArgumentException(String.format("API type %s not supported", value));
- }
- }
-
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
index 39512391..65c9582e 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
@@ -30,14 +30,8 @@ public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
throw new IllegalArgumentException(
String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
}
- switch (commandArgs.getApiType()) {
- case ENGINE_API:
- return new SparkApiCommandBuilder().buildCommand(commandArgs);
- case SEATUNNEL_API:
- return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
- default:
- throw new IllegalArgumentException("Unsupported API type: " + commandArgs.getApiType());
- }
+
+ return new SparkApiCommandBuilder().buildCommand(commandArgs);
}
/**
@@ -55,16 +49,5 @@ public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
}
}
- /**
- * Used to generate command for seaTunnel API.
- */
- private static class SeaTunnelApiCommandBuilder extends SparkCommandBuilder {
- @Override
- public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
- return commandArgs.isCheckConfig() ? new SeaTunnelApiConfValidateCommand(commandArgs)
- : new SeaTunnelApiTaskExecuteCommand(commandArgs);
- }
- }
-
}
diff --git a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java
index 6099594e..9538e545 100644
--- a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java
+++ b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/AbstractExecutionContext.java
@@ -79,16 +79,13 @@ public abstract class AbstractExecutionContext<ENVIRONMENT extends RuntimeEnv> {
@SuppressWarnings("checkstyle:Indentation")
protected List<PluginIdentifier> getPluginIdentifiers(PluginType... pluginTypes) {
- return Arrays.stream(pluginTypes).flatMap(new Function<PluginType, Stream<PluginIdentifier>>() {
- @Override
- public Stream<PluginIdentifier> apply(PluginType pluginType) {
- List<? extends Config> configList = config.getConfigList(pluginType.getType());
- return configList.stream()
- .map(pluginConfig -> PluginIdentifier
- .of(engine.getEngine(),
- pluginType.getType(),
- pluginConfig.getString("plugin_name")));
- }
+ return Arrays.stream(pluginTypes).flatMap((Function<PluginType, Stream<PluginIdentifier>>) pluginType -> {
+ List<? extends Config> configList = config.getConfigList(pluginType.getType());
+ return configList.stream()
+ .map(pluginConfig -> PluginIdentifier
+ .of(engine.getEngine(),
+ pluginType.getType(),
+ pluginConfig.getString("plugin_name")));
}).collect(Collectors.toList());
}
}
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
index 24661acf..f6e8e4fb 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
@@ -26,7 +26,7 @@ public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
@Override
public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
- if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
+ if (Boolean.FALSE.equals(Common.setDeployMode(commandArgs.getDeployMode().getName()))) {
throw new IllegalArgumentException(
String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
}
diff --git a/seatunnel-core/seatunnel-spark-starter/pom.xml b/seatunnel-core/seatunnel-spark-starter/pom.xml
new file mode 100644
index 00000000..d65b5873
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/pom.xml
@@ -0,0 +1,189 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-core</artifactId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-spark-starter</artifactId>
+
+ <properties>
+ <docker.repo>seatunnel-spark</docker.repo>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-core-starter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api-spark</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-translation-spark</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transform-spark-split</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transform-spark-sql</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transform-spark-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transform-spark-replace</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-transform-spark-uuid</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <finalName>${project.name}</finalName>
+ </build>
+
+ <!-- todo <profiles>
+ <profile>
+ <id>docker</id>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>${exec-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>docker-build</id>
+ <phase>package</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <environmentVariables>
+ <DOCKER_BUILDKIT>1</DOCKER_BUILDKIT>
+ </environmentVariables>
+ <executable>docker</executable>
+ <workingDirectory>${project.basedir}</workingDirectory>
+ <arguments>
+ <argument>build</argument>
+ <argument>--no-cache</argument>
+ <argument>--build-arg</argument>
+ <argument>SPARK_VERSION=${spark.version}</argument>
+ <argument>--build-arg</argument>
+ <argument>HADOOP_VERSION=${hadoop.binary.version}</argument>
+ <argument>-t</argument>
+ <argument>${docker.hub}/${docker.repo}:${docker.tag}</argument>
+ <argument>-t</argument>
+ <argument>${docker.hub}/${docker.repo}:latest</argument>
+ <argument>.</argument>
+ <argument>--file=src/main/docker/Dockerfile</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ <execution>
+ <id>docker-push</id>
+ <phase>deploy</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ <configuration>
+ <environmentVariables>
+ <DOCKER_BUILDKIT>1</DOCKER_BUILDKIT>
+ </environmentVariables>
+ <executable>docker</executable>
+ <workingDirectory>${project.basedir}</workingDirectory>
+ <arguments>
+ <argument>buildx</argument>
+ <argument>build</argument>
+ <argument>--no-cache</argument>
+ <argument>--build-arg</argument>
+ <argument>SPARK_VERSION=${spark.version}</argument>
+ <argument>--build-arg</argument>
+ <argument>HADOOP_VERSION=${hadoop.binary.version}</argument>
+ <argument>--push</argument>
+ <argument>-t</argument>
+ <argument>${docker.hub}/${docker.repo}:${docker.tag}</argument>
+ <argument>-t</argument>
+ <argument>${docker.hub}/${docker.repo}:latest</argument>
+ <argument>.</argument>
+ <argument>--file=src/main/docker/Dockerfile</argument>
+ </arguments>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>-->
+</project>
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark.sh b/seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark.sh
new file mode 100755
index 00000000..ca6082bb
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/bin/start-seatunnel-spark.sh
@@ -0,0 +1,45 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+set -eu
+APP_DIR=$(cd $(dirname ${0})/../;pwd)
+CONF_DIR=${APP_DIR}/config
+APP_JAR=${APP_DIR}/lib/seatunnel-core-spark.jar
+
+if [ -f "${CONF_DIR}/seatunnel-env.sh" ]; then
+ . "${CONF_DIR}/seatunnel-env.sh"
+fi
+
+if [ $# == 0 ]
+then
+ args="-h"
+else
+ args=$@
+fi
+
+CMD=$(java -cp ${APP_JAR} org.apache.seatunnel.core.starter.spark.SparkStarter ${args} | tail -n 1) && EXIT_CODE=$? || EXIT_CODE=$?
+if [ ${EXIT_CODE} -eq 234 ]; then
+ # print usage
+ echo ${CMD}
+ exit 0
+elif [ ${EXIT_CODE} -eq 0 ]; then
+ echo "Execute SeaTunnel Spark Job: ${CMD}"
+ eval ${CMD}
+else
+ echo ${CMD}
+ exit ${EXIT_CODE}
+fi
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/docker/Dockerfile b/seatunnel-core/seatunnel-spark-starter/src/main/docker/Dockerfile
new file mode 100644
index 00000000..9a3b996d
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/docker/Dockerfile
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+ARG BASE_IMAGE=adoptopenjdk/openjdk8:jre
+
+FROM $BASE_IMAGE
+
+ARG SPARK_VERSION
+ARG HADOOP_VERSION
+
+RUN mkdir -p /spark ; cd /spark ; \
+ tar_file=spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION}.tgz ; \
+ curl -LsO https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/$tar_file ; \
+ tar -zxf $tar_file --strip 1 -C . ; \
+ rm $tar_file
+
+ENV SPARK_HOME=/spark
+
+WORKDIR /seatunnel
+
+COPY target/seatunnel-core-spark.jar /seatunnel/lib/
+COPY src/main/bin /seatunnel/bin/
+
+ENTRYPOINT [ "/seatunnel/bin/start-seatunnel-spark.sh" ]
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
similarity index 51%
copy from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
copy to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
index 3715c14b..57fdb5d6 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SeatunnelSpark.java
@@ -15,17 +15,21 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.config;
+package org.apache.seatunnel.core.starter.spark;
-import org.apache.seatunnel.core.base.config.ConfigChecker;
-import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.spark.command.SparkCommandBuilder;
+import org.apache.seatunnel.core.starter.spark.utils.CommandLineUtils;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+public class SeatunnelSpark {
-public class SeaTunnelApiConfigChecker implements ConfigChecker<SeaTunnelEnvironment> {
-
- @Override
- public void checkConfig(Config config) throws ConfigCheckException {
- // todo: implement
+ public static void main(String[] args) throws CommandException {
+ SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
+ Command<SparkCommandArgs> sparkCommand =
+ new SparkCommandBuilder().buildCommand(sparkArgs);
+ Seatunnel.run(sparkCommand);
}
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
new file mode 100644
index 00000000..5ddcb268
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/SparkStarter.java
@@ -0,0 +1,429 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.spark;
+
+import static java.nio.file.FileVisitOption.FOLLOW_LINKS;
+
+import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.starter.Starter;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.core.starter.config.EngineType;
+import org.apache.seatunnel.core.starter.config.PluginType;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.utils.CompressionUtils;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.spark.SparkSourcePluginDiscovery;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.seatunnel.shade.com.typesafe.config.ConfigResolveOptions;
+
+import com.beust.jcommander.JCommander;
+import com.beust.jcommander.UnixStyleUsageFormatter;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * A Starter to generate spark-submit command for SeaTunnel job on spark.
+ */
+public class SparkStarter implements Starter {
+
+ private static final int USAGE_EXIT_CODE = 234;
+
+ private static final int PLUGIN_LIB_DIR_DEPTH = 3;
+
+ /**
+ * original commandline args
+ */
+ protected String[] args;
+
+ /**
+ * args parsed from {@link #args}
+ */
+ protected SparkCommandArgs commandArgs;
+
+ /**
+ * the spark application name
+ */
+ protected String appName;
+
+ /**
+ * jars to include on the spark driver and executor classpaths
+ */
+ protected List<Path> jars = new ArrayList<>();
+
+ /**
+ * files to be placed in the working directory of each spark executor
+ */
+ protected List<Path> files = new ArrayList<>();
+
+ /**
+ * spark configuration properties
+ */
+ protected Map<String, String> sparkConf;
+
+ private SparkStarter(String[] args, SparkCommandArgs commandArgs) {
+ this.args = args;
+ this.commandArgs = commandArgs;
+ }
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ public static void main(String[] args) throws IOException {
+ SparkStarter starter = getInstance(args);
+ List<String> command = starter.buildCommands();
+ System.out.println(String.join(" ", command));
+ }
+
+ /**
+ * method to get SparkStarter instance, will return
+ * {@link ClusterModeSparkStarter} or
+ * {@link ClientModeSparkStarter} depending on deploy mode.
+ */
+ static SparkStarter getInstance(String[] args) {
+ SparkCommandArgs commandArgs = parseCommandArgs(args);
+ DeployMode deployMode = commandArgs.getDeployMode();
+ switch (deployMode) {
+ case CLUSTER:
+ return new ClusterModeSparkStarter(args, commandArgs);
+ case CLIENT:
+ return new ClientModeSparkStarter(args, commandArgs);
+ default:
+ throw new IllegalArgumentException("Deploy mode " + deployMode + " not supported");
+ }
+ }
+
+ /**
+ * parse commandline args
+ */
+ private static SparkCommandArgs parseCommandArgs(String[] args) {
+ SparkCommandArgs commandArgs = new SparkCommandArgs();
+ JCommander commander = JCommander.newBuilder()
+ .programName("start-seatunnel-spark.sh")
+ .addObject(commandArgs)
+ .args(args)
+ .build();
+ if (commandArgs.isHelp()) {
+ commander.setUsageFormatter(new UnixStyleUsageFormatter(commander));
+ commander.usage();
+ System.exit(USAGE_EXIT_CODE);
+ }
+ return commandArgs;
+ }
+
+ @Override
+ public List<String> buildCommands() throws IOException {
+ setSparkConf();
+ Common.setDeployMode(commandArgs.getDeployMode().getName());
+ this.jars.addAll(getPluginsJarDependencies());
+ this.jars.addAll(listJars(Common.appLibDir()));
+ this.jars.addAll(getConnectorJarDependencies());
+ this.appName = this.sparkConf.getOrDefault("spark.app.name", Constants.LOGO);
+ return buildFinal();
+ }
+
+ /**
+ * parse spark configurations from SeaTunnel config file
+ */
+ private void setSparkConf() throws FileNotFoundException {
+ commandArgs.getVariables()
+ .stream()
+ .filter(Objects::nonNull)
+ .map(variable -> variable.split("=", 2))
+ .filter(pair -> pair.length == 2)
+ .forEach(pair -> System.setProperty(pair[0], pair[1]));
+ this.sparkConf = getSparkConf(commandArgs.getConfigFile());
+ String driverJavaOpts = this.sparkConf.getOrDefault("spark.driver.extraJavaOptions", "");
+ String executorJavaOpts = this.sparkConf.getOrDefault("spark.executor.extraJavaOptions", "");
+ if (!commandArgs.getVariables().isEmpty()) {
+ String properties = commandArgs.getVariables()
+ .stream()
+ .map(v -> "-D" + v)
+ .collect(Collectors.joining(" "));
+ driverJavaOpts += " " + properties;
+ executorJavaOpts += " " + properties;
+ this.sparkConf.put("spark.driver.extraJavaOptions", driverJavaOpts.trim());
+ this.sparkConf.put("spark.executor.extraJavaOptions", executorJavaOpts.trim());
+ }
+ }
+
+ /**
+ * Get spark configurations from SeaTunnel job config file.
+ */
+ static Map<String, String> getSparkConf(String configFile) throws FileNotFoundException {
+ File file = new File(configFile);
+ if (!file.exists()) {
+ throw new FileNotFoundException("config file '" + file + "' does not exists!");
+ }
+ Config appConfig = ConfigFactory.parseFile(file)
+ .resolve(ConfigResolveOptions.defaults().setAllowUnresolved(true))
+ .resolveWith(ConfigFactory.systemProperties(), ConfigResolveOptions.defaults().setAllowUnresolved(true));
+
+ return appConfig.getConfig("env")
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().unwrapped().toString()));
+ }
+
+ /**
+ * return plugin's dependent jars, which located in 'plugins/${pluginName}/lib/*'.
+ */
+ private List<Path> getPluginsJarDependencies() throws IOException {
+ Path pluginRootDir = Common.pluginRootDir();
+ if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
+ return Collections.emptyList();
+ }
+ try (Stream<Path> stream = Files.walk(pluginRootDir, PLUGIN_LIB_DIR_DEPTH, FOLLOW_LINKS)) {
+ return stream
+ .filter(it -> pluginRootDir.relativize(it).getNameCount() == PLUGIN_LIB_DIR_DEPTH)
+ .filter(it -> it.getParent().endsWith("lib"))
+ .filter(it -> it.getFileName().endsWith("jar"))
+ .collect(Collectors.toList());
+ }
+ }
+
+ /**
+ * return connector's jars, which located in 'connectors/spark/*'.
+ */
+ private List<Path> getConnectorJarDependencies() {
+ Path pluginRootDir = Common.connectorJarDir("spark");
+ if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
+ return Collections.emptyList();
+ }
+ Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
+ List<URL> pluginJars = new ArrayList<>();
+ SparkSourcePluginDiscovery sparkSourcePluginDiscovery = new SparkSourcePluginDiscovery();
+ SparkSinkPluginDiscovery sparkSinkPluginDiscovery = new SparkSinkPluginDiscovery();
+ pluginJars.addAll(sparkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SOURCE)));
+ pluginJars.addAll(sparkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(config, PluginType.SINK)));
+ return pluginJars.stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
+ }
+
+ /**
+ * list jars in given directory
+ */
+ private List<Path> listJars(Path dir) throws IOException {
+ try (Stream<Path> stream = Files.list(dir)) {
+ return stream
+ .filter(it -> !Files.isDirectory(it))
+ .filter(it -> it.getFileName().endsWith("jar"))
+ .collect(Collectors.toList());
+ }
+ }
+
+ /**
+ * build final spark-submit commands
+ */
+ protected List<String> buildFinal() {
+ List<String> commands = new ArrayList<>();
+ commands.add("${SPARK_HOME}/bin/spark-submit");
+ appendOption(commands, "--class", SeatunnelSpark.class.getName());
+ appendOption(commands, "--name", this.appName);
+ appendOption(commands, "--master", this.commandArgs.getMaster());
+ appendOption(commands, "--deploy-mode", this.commandArgs.getDeployMode().getName());
+ appendJars(commands, this.jars);
+ appendFiles(commands, this.files);
+ appendSparkConf(commands, this.sparkConf);
+ appendAppJar(commands);
+ appendArgs(commands, args);
+ return commands;
+ }
+
+ /**
+ * append option to StringBuilder
+ */
+ protected void appendOption(List<String> commands, String option, String value) {
+ commands.add(option);
+ commands.add("\"" + value.replace("\"", "\\\"") + "\"");
+ }
+
+ /**
+ * append jars option to StringBuilder
+ */
+ protected void appendJars(List<String> commands, List<Path> paths) {
+ appendPaths(commands, "--jars", paths);
+ }
+
+ /**
+ * append files option to StringBuilder
+ */
+ protected void appendFiles(List<String> commands, List<Path> paths) {
+ appendPaths(commands, "--files", paths);
+ }
+
+    /**
+     * Appends {@code option} with the given paths joined by commas as its value.
+     * Nothing is appended when the path list is empty.
+     */
+    protected void appendPaths(List<String> commands, String option, List<Path> paths) {
+        if (paths.isEmpty()) {
+            return;
+        }
+        List<String> parts = new ArrayList<>();
+        for (Path path : paths) {
+            parts.add(path.toString());
+        }
+        appendOption(commands, option, String.join(",", parts));
+    }
+
+    /**
+     * Appends every Spark configuration entry as a {@code --conf key=value} option.
+     */
+    protected void appendSparkConf(List<String> commands, Map<String, String> sparkConf) {
+        sparkConf.forEach((key, value) -> appendOption(commands, "--conf", key + "=" + value));
+    }
+
+ /**
+  * append the original command-line args, unmodified and in order, to the command list
+  * so they are forwarded to the submitted application
+  */
+ protected void appendArgs(List<String> commands, String[] args) {
+ commands.addAll(Arrays.asList(args));
+ }
+
+ /**
+  * append the SeaTunnel application jar (the spark-submit primary resource) to the
+  * command list, resolved against the deployment's lib directory
+  */
+ protected void appendAppJar(List<String> commands) {
+ // NOTE(review): "seatunnel-core-spark.jar" is the old module's artifact name; confirm
+ // it matches the jar actually produced by the new seatunnel-spark-starter module.
+ commands.add(Common.appLibDir().resolve("seatunnel-core-spark.jar").toString());
+ }
+
+    /**
+     * Builds a {@link PluginIdentifier} for each plugin of the requested types declared
+     * in the job config, reading each plugin's "plugin_name" entry.
+     */
+    @SuppressWarnings("checkstyle:Indentation")
+    private List<PluginIdentifier> getPluginIdentifiers(Config config, PluginType... pluginTypes) {
+        return Arrays.stream(pluginTypes)
+                .flatMap(pluginType -> config.getConfigList(pluginType.getType()).stream()
+                        .map(pluginConfig -> PluginIdentifier.of(
+                                EngineType.SPARK.getEngine(),
+                                pluginType.getType(),
+                                pluginConfig.getString("plugin_name"))))
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * a Starter for building spark-submit commands with client mode options.
+     * In client mode, driver settings must be passed as dedicated CLI options
+     * (e.g. --driver-memory) instead of --conf entries, because the driver JVM
+     * is already starting when --conf values would be read.
+     */
+    private static class ClientModeSparkStarter extends SparkStarter {
+
+        /**
+         * client mode specified spark options, mapping each spark property to the
+         * dedicated spark-submit CLI option that must carry it in client mode
+         */
+        private enum ClientModeSparkConfigs {
+
+            /**
+             * Memory for driver in client mode
+             */
+            DriverMemory("--driver-memory", "spark.driver.memory"),
+
+            /**
+             * Extra Java options to pass to the driver in client mode
+             */
+            DriverJavaOptions("--driver-java-options", "spark.driver.extraJavaOptions"),
+
+            /**
+             * Extra library path entries to pass to the driver in client mode
+             */
+            // BUGFIX: the option name had a stray leading space (" --driver-library-path"),
+            // which would emit a malformed spark-submit argument.
+            DriverLibraryPath("--driver-library-path", "spark.driver.extraLibraryPath"),
+
+            /**
+             * Extra class path entries to pass to the driver in client mode
+             */
+            DriverClassPath("--driver-class-path", "spark.driver.extraClassPath");
+
+            /** spark-submit CLI option name */
+            private final String optionName;
+
+            /** corresponding spark property key */
+            private final String propertyName;
+
+            /** lookup from spark property key to the enum constant */
+            private static final Map<String, ClientModeSparkConfigs> PROPERTY_NAME_MAP = new HashMap<>();
+
+            static {
+                for (ClientModeSparkConfigs config : values()) {
+                    PROPERTY_NAME_MAP.put(config.propertyName, config);
+                }
+            }
+
+            ClientModeSparkConfigs(String optionName, String propertyName) {
+                this.optionName = optionName;
+                this.propertyName = propertyName;
+            }
+        }
+
+        private ClientModeSparkStarter(String[] args, SparkCommandArgs commandArgs) {
+            super(args, commandArgs);
+        }
+
+        @Override
+        protected void appendSparkConf(List<String> commands, Map<String, String> sparkConf) {
+            // Emit driver-related properties through their dedicated CLI options first.
+            for (ClientModeSparkConfigs config : ClientModeSparkConfigs.values()) {
+                String driverValue = this.sparkConf.get(config.propertyName);
+                if (StringUtils.isNotBlank(driverValue)) {
+                    appendOption(commands, config.optionName, driverValue);
+                }
+            }
+            // Everything else still goes through generic --conf entries; skip the keys
+            // already handled above to avoid passing them twice.
+            for (Map.Entry<String, String> entry : sparkConf.entrySet()) {
+                String key = entry.getKey();
+                String value = entry.getValue();
+                if (ClientModeSparkConfigs.PROPERTY_NAME_MAP.containsKey(key)) {
+                    continue;
+                }
+                appendOption(commands, "--conf", key + "=" + value);
+            }
+        }
+    }
+
+ /**
+  * a Starter for building spark-submit commands with cluster mode options.
+  * In cluster mode the driver runs remotely, so the plugin directory (as a tarball)
+  * and the job config file must be shipped via --files before the base command is built.
+  */
+ private static class ClusterModeSparkStarter extends SparkStarter {
+
+ private ClusterModeSparkStarter(String[] args, SparkCommandArgs commandArgs) {
+ super(args, commandArgs);
+ }
+
+ @Override
+ public List<String> buildCommands() throws IOException {
+ Common.setDeployMode(commandArgs.getDeployMode().getName());
+ // Package the plugin root directory once; reuse the tarball on later submissions.
+ Path pluginTarball = Common.pluginTarball();
+ if (Files.notExists(pluginTarball)) {
+ CompressionUtils.tarGzip(Common.pluginRootDir(), pluginTarball);
+ }
+ // Ship the plugins and the job config to the remote driver via --files.
+ this.files.add(pluginTarball);
+ this.files.add(Paths.get(commandArgs.getConfigFile()));
+ return super.buildCommands();
+ }
+ }
+}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
new file mode 100644
index 00000000..eb5ef929
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgs.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.spark.args;
+
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
+import org.apache.seatunnel.core.starter.command.DeployModeConverter;
+import org.apache.seatunnel.core.starter.config.EngineType;
+
+import com.beust.jcommander.Parameter;
+
+/**
+ * Command-line arguments specific to submitting a SeaTunnel job on Spark,
+ * parsed by JCommander on top of the shared {@link AbstractCommandArgs} options.
+ */
+public class SparkCommandArgs extends AbstractCommandArgs {
+
+    /** Spark deploy mode (e.g. client/cluster); required on the command line. */
+    @Parameter(names = {"-e", "--deploy-mode"},
+            description = "Spark deploy mode",
+            required = true,
+            converter = DeployModeConverter.class)
+    private DeployMode deployMode;
+
+    /** Spark master URL (e.g. yarn, local[*]); required on the command line. */
+    @Parameter(names = {"-m", "--master"},
+            description = "Spark master",
+            required = true)
+    private String master;
+
+    @Override
+    public EngineType getEngineType() {
+        return EngineType.SPARK;
+    }
+
+    @Override
+    public DeployMode getDeployMode() {
+        return deployMode;
+    }
+
+    public void setDeployMode(DeployMode deployMode) {
+        this.deployMode = deployMode;
+    }
+
+    public String getMaster() {
+        return master;
+    }
+
+    public void setMaster(String master) {
+        this.master = master;
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
similarity index 66%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
index 624ee545..7b9d6782 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiConfValidateCommand.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.command;
+package org.apache.seatunnel.core.starter.spark.command;
-import org.apache.seatunnel.core.base.command.Command;
-import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.exception.ConfigCheckException;
-import org.apache.seatunnel.core.base.utils.FileUtils;
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.config.SeaTunnelApiConfigChecker;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.spark.config.SparkApiConfigChecker;
+import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,13 +32,13 @@ import java.nio.file.Path;
/**
* Use to validate the configuration of the SeaTunnel API.
*/
-public class SeaTunnelApiConfValidateCommand implements Command<SparkCommandArgs> {
+public class SparkApiConfValidateCommand implements Command<SparkCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkApiConfValidateCommand.class);
private final SparkCommandArgs sparkCommandArgs;
- public SeaTunnelApiConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
+ public SparkApiConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
this.sparkCommandArgs = sparkCommandArgs;
}
@@ -47,7 +47,7 @@ public class SeaTunnelApiConfValidateCommand implements Command<SparkCommandArgs
Path configPath = FileUtils.getConfigPath(sparkCommandArgs);
// todo: validate the config by new api
ConfigBuilder configBuilder = new ConfigBuilder(configPath);
- new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
+ new SparkApiConfigChecker().checkConfig(configBuilder.getConfig());
LOGGER.info("config OK !");
}
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
similarity index 69%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
index a0e30a7b..ecdc8a92 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.command;
+package org.apache.seatunnel.core.starter.spark.command;
-import org.apache.seatunnel.core.base.command.Command;
-import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.exception.CommandExecuteException;
-import org.apache.seatunnel.core.base.utils.FileUtils;
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.execution.SeaTunnelTaskExecution;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.spark.execution.SparkTaskExecution;
+import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -35,13 +35,13 @@ import java.nio.file.Path;
 * todo: do we need to move these classes to a new module? since this may cause version conflict with the old flink version.
 * This command is used to execute the Spark job by SeaTunnel new API.
*/
-public class SeaTunnelApiTaskExecuteCommand implements Command<SparkCommandArgs> {
+public class SparkApiTaskExecuteCommand implements Command<SparkCommandArgs> {
- private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiTaskExecuteCommand.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkApiTaskExecuteCommand.class);
private final SparkCommandArgs sparkCommandArgs;
- public SeaTunnelApiTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
+ public SparkApiTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
this.sparkCommandArgs = sparkCommandArgs;
}
@@ -50,7 +50,7 @@ public class SeaTunnelApiTaskExecuteCommand implements Command<SparkCommandArgs>
Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
Config config = new ConfigBuilder(configFile).getConfig();
try {
- SeaTunnelTaskExecution seaTunnelTaskExecution = new SeaTunnelTaskExecution(config);
+ SparkTaskExecution seaTunnelTaskExecution = new SparkTaskExecution(config);
seaTunnelTaskExecution.execute();
} catch (Exception e) {
LOGGER.error("Run SeaTunnel on spark failed.", e);
diff --git a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
similarity index 65%
copy from seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
copy to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
index 24661acf..65d6e729 100644
--- a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
@@ -15,32 +15,33 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.flink.command;
+package org.apache.seatunnel.core.starter.spark.command;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.command.CommandBuilder;
-import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
+public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
@Override
- public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
- if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
+ public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
+ if (Boolean.FALSE.equals(Common.setDeployMode(commandArgs.getDeployMode().getName()))) {
throw new IllegalArgumentException(
String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
}
- return new FlinkApiCommandBuilder().buildCommand(commandArgs);
+ return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
}
/**
* Used to generate command for seaTunnel API.
*/
- private static class FlinkApiCommandBuilder extends FlinkCommandBuilder {
+ private static class SeaTunnelApiCommandBuilder extends SparkCommandBuilder {
@Override
- public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
- return commandArgs.isCheckConfig() ? new FlinkApiConfValidateCommand(commandArgs)
- : new FlinkApiTaskExecuteCommand(commandArgs);
+ public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
+ return commandArgs.isCheckConfig() ? new SparkApiConfValidateCommand(commandArgs)
+ : new SparkApiTaskExecuteCommand(commandArgs);
}
}
}
+
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
similarity index 78%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
index 3715c14b..b08a4a75 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.config;
+package org.apache.seatunnel.core.starter.spark.config;
-import org.apache.seatunnel.core.base.config.ConfigChecker;
-import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.config.ConfigChecker;
+import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-public class SeaTunnelApiConfigChecker implements ConfigChecker<SeaTunnelEnvironment> {
+public class SparkApiConfigChecker implements ConfigChecker<SparkEnvironment> {
@Override
public void checkConfig(Config config) throws ConfigCheckException {
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java
similarity index 84%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java
index 6f9e0135..f7d5b83d 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkEnvironment.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.config;
+package org.apache.seatunnel.core.starter.spark.config;
import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.config.CheckResult;
@@ -27,10 +27,10 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.net.URL;
import java.util.List;
-public class SeaTunnelEnvironment implements RuntimeEnv {
+public class SparkEnvironment implements RuntimeEnv {
@Override
- public SeaTunnelEnvironment setConfig(Config config) {
+ public SparkEnvironment setConfig(Config config) {
return null;
}
@@ -45,12 +45,12 @@ public class SeaTunnelEnvironment implements RuntimeEnv {
}
@Override
- public SeaTunnelEnvironment prepare() {
+ public SparkEnvironment prepare() {
return null;
}
@Override
- public SeaTunnelEnvironment setJobMode(JobMode mode) {
+ public SparkEnvironment setJobMode(JobMode mode) {
return null;
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/AbstractPluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
similarity index 97%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/AbstractPluginExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
index 2d222eb4..0dad6f68 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/AbstractPluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/AbstractPluginExecuteProcessor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
import static org.apache.seatunnel.apis.base.plugin.Plugin.RESULT_TABLE_NAME;
import static org.apache.seatunnel.apis.base.plugin.Plugin.SOURCE_TABLE_NAME;
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/PluginExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
similarity index 95%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/PluginExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
index 48d18488..51fc752c 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/PluginExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SinkExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
similarity index 98%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SinkExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 08807f7c..2a5a4597 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SinkExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.config.Common;
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SourceExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
similarity index 98%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SourceExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index c6e42a0a..dd7cec03 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SourceExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.common.utils.SerializationUtils;
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SeaTunnelTaskExecution.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
similarity index 91%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SeaTunnelTaskExecution.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
index d25d1764..1e2b5aa4 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/SeaTunnelTaskExecution.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
@@ -15,12 +15,12 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.common.constants.JobMode;
-import org.apache.seatunnel.core.base.config.EngineType;
-import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.core.starter.config.EngineType;
+import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -33,9 +33,9 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-public class SeaTunnelTaskExecution {
+public class SparkTaskExecution {
- private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelTaskExecution.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(SparkTaskExecution.class);
private final Config config;
private final SparkEnvironment sparkEnvironment;
@@ -43,7 +43,7 @@ public class SeaTunnelTaskExecution {
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
- public SeaTunnelTaskExecution(Config config) {
+ public SparkTaskExecution(Config config) {
this.config = config;
this.sparkEnvironment = getSparkEnvironment(config);
this.sourcePluginExecuteProcessor = new SourceExecuteProcessor(sparkEnvironment, config.getConfigList("source"));
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
similarity index 98%
rename from seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/TransformExecuteProcessor.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index aa6e56c8..deec43b9 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/execution/TransformExecuteProcessor.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark.execution;
+package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java
similarity index 56%
rename from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
rename to seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java
index 5468eee3..9e25030e 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtils.java
@@ -15,19 +15,24 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.config;
+package org.apache.seatunnel.core.starter.spark.utils;
-public enum ApiType {
- ENGINE_API("engine"),
- SEATUNNEL_API("seatunnel"),
- ;
- private final String apiType;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
- ApiType(String apiType) {
- this.apiType = apiType;
+import com.beust.jcommander.JCommander;
+
+public class CommandLineUtils {
+
+ private CommandLineUtils() {
+ throw new UnsupportedOperationException("CommandLineUtils is a utility class and cannot be instantiated");
}
- public String getApiType() {
- return apiType;
+ public static SparkCommandArgs parseSparkArgs(String[] args) {
+ SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
+ JCommander.newBuilder()
+ .addObject(sparkCommandArgs)
+ .build()
+ .parse(args);
+ return sparkCommandArgs;
}
}
diff --git a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
similarity index 96%
copy from seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
copy to seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
index 20d35b92..3b0bef3a 100644
--- a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/SparkStarterTest.java
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark;
+package org.apache.seatunnel.core.starter.spark;
import static org.junit.Assert.assertEquals;
diff --git a/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java
new file mode 100644
index 00000000..c36b1ce4
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/args/SparkCommandArgsTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.spark.args;
+
+import org.apache.seatunnel.common.config.DeployMode;
+
+import com.beust.jcommander.JCommander;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+
+/**
+ * Unit tests for {@link SparkCommandArgs} JCommander parsing.
+ */
+public class SparkCommandArgsTest {
+
+    @Test
+    public void testParseSparkArgs() {
+        String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=shijiazhuang", "-i", "name=Tom"};
+        SparkCommandArgs sparkArgs = new SparkCommandArgs();
+        JCommander.newBuilder()
+                .addObject(sparkArgs)
+                .build()
+                .parse(args);
+        Assert.assertEquals("app.conf", sparkArgs.getConfigFile());
+        Assert.assertEquals(DeployMode.CLIENT, sparkArgs.getDeployMode());
+        Assert.assertEquals("yarn", sparkArgs.getMaster());
+        Assert.assertEquals(Arrays.asList("city=shijiazhuang", "name=Tom"), sparkArgs.getVariables());
+    }
+
+    @Test
+    public void testHelp() {
+        String[] args = {"-h"};
+        SparkCommandArgs sparkArgs = new SparkCommandArgs();
+        JCommander commander = JCommander.newBuilder()
+                .addObject(sparkArgs)
+                .build();
+        commander.parse(args);
+        if (sparkArgs.isHelp()) {
+            commander.usage();
+        }
+    }
+
+    @Test
+    public void testDashDash() {
+        String[] args = {"-c", "app.conf", "-e", "client", "-m", "yarn", "-i", "city=guojizhuang", "--"};
+        SparkCommandArgs sparkArgs = new SparkCommandArgs();
+        JCommander.newBuilder()
+                .addObject(sparkArgs)
+                .build()
+                .parse(args);
+        // FIX: this test previously asserted nothing (only "does not throw");
+        // verify the values that were actually captured before the "--" terminator.
+        Assert.assertEquals("app.conf", sparkArgs.getConfigFile());
+        Assert.assertEquals(DeployMode.CLIENT, sparkArgs.getDeployMode());
+        Assert.assertEquals("yarn", sparkArgs.getMaster());
+        Assert.assertEquals(Arrays.asList("city=guojizhuang"), sparkArgs.getVariables());
+    }
+
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
similarity index 50%
rename from seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
rename to seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
index 20d35b92..61cafeeb 100644
--- a/seatunnel-core/seatunnel-core-spark/src/test/java/org/apache/seatunnel/core/spark/SparkStarterTest.java
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/java/org/apache/seatunnel/core/starter/spark/utils/CommandLineUtilsTest.java
@@ -15,26 +15,28 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.spark;
+package org.apache.seatunnel.core.starter.spark.utils;
-import static org.junit.Assert.assertEquals;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import com.beust.jcommander.ParameterException;
+import org.junit.Assert;
import org.junit.Test;
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.Map;
+public class CommandLineUtilsTest {
-public class SparkStarterTest {
+ @Test
+ public void testParseSparkArgs() {
+ String[] args = {"-c", "app.conf", "-e", "cluster", "-m", "local[*]"};
+ SparkCommandArgs commandLineArgs = CommandLineUtils.parseSparkArgs(args);
+
+ Assert.assertEquals("app.conf", commandLineArgs.getConfigFile());
+ Assert.assertEquals("cluster", commandLineArgs.getDeployMode().getName());
+ }
@Test
- public void testGetSparkConf() throws URISyntaxException, FileNotFoundException {
- URI uri = ClassLoader.getSystemResource("spark_application.conf").toURI();
- String file = new File(uri).toString();
- Map<String, String> sparkConf = SparkStarter.getSparkConf(file);
- assertEquals("SeaTunnel", sparkConf.get("spark.app.name"));
- assertEquals("1", sparkConf.get("spark.executor.cores"));
+ public void testParseSparkArgsException() {
+ String[] args = {"-c", "app.conf", "-e", "cluster2xxx", "-m", "local[*]"};
+ Assert.assertThrows(ParameterException.class, () -> CommandLineUtils.parseSparkArgs(args));
}
}
diff --git a/seatunnel-core/seatunnel-spark-starter/src/test/resources/spark_application.conf b/seatunnel-core/seatunnel-spark-starter/src/test/resources/spark_application.conf
new file mode 100644
index 00000000..bf853a4c
--- /dev/null
+++ b/seatunnel-core/seatunnel-spark-starter/src/test/resources/spark_application.conf
@@ -0,0 +1,66 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+######
+###### This config file demonstrates a batch-processing job configuration for SeaTunnel
+######
+
+env {
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ "spark.executor.cores" = 1
+ "spark.executor.memory" = "1g"
+ "spark.streaming.batchDuration" = 5
+}
+
+source {
+
+ fakeStream {
+ content = ["Hello World, InterestingLab"]
+ }
+
+}
+
+transform {
+
+ split {
+ fields = ["msg", "name"]
+ delimiter = ","
+ }
+
+ sql1 {
+ sql = "sql1"
+ }
+
+ sql2 {
+ sql = "sql2"
+ }
+
+ sql3 {
+ sql = "sql3"
+ }
+
+ json {
+ sql = "sql3"
+ }
+
+}
+
+sink {
+ Console {}
+ c.Console {}
+}
diff --git a/seatunnel-examples/seatunnel-spark-examples/pom.xml b/seatunnel-examples/seatunnel-spark-examples/pom.xml
index 7f586827..1c3b3dfd 100644
--- a/seatunnel-examples/seatunnel-spark-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-examples/pom.xml
@@ -34,7 +34,7 @@
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-core-spark</artifactId>
+ <artifactId>seatunnel-spark-starter</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
deleted file mode 100644
index aa121a90..00000000
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.example.spark;
-
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.base.Seatunnel;
-import org.apache.seatunnel.core.base.command.Command;
-import org.apache.seatunnel.core.base.exception.CommandException;
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
-
-import java.io.FileNotFoundException;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.file.Paths;
-
-public class LocalSparkExample {
-
- public static void main(String[] args) throws URISyntaxException, FileNotFoundException, CommandException {
- String configFile = getTestConfigFile("/examples/spark.batch.conf");
- SparkCommandArgs sparkArgs = new SparkCommandArgs();
- sparkArgs.setConfigFile(configFile);
- sparkArgs.setCheckConfig(false);
- sparkArgs.setVariables(null);
- sparkArgs.setDeployMode(DeployMode.CLIENT);
- Command<SparkCommandArgs> sparkCommand = new SparkCommandBuilder().buildCommand(sparkArgs);
- Seatunnel.run(sparkCommand);
- }
-
- public static String getTestConfigFile(String configFile) throws URISyntaxException, FileNotFoundException {
- URL resource = LocalSparkExample.class.getResource(configFile);
- if (resource == null) {
- throw new FileNotFoundException("Could not find config file: " + configFile);
- }
- return Paths.get(resource.toURI()).toString();
- }
-}
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java
index 1cdf40f8..970da462 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java
@@ -18,11 +18,11 @@
package org.apache.seatunnel.example.spark;
import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.base.Seatunnel;
-import org.apache.seatunnel.core.base.command.Command;
-import org.apache.seatunnel.core.base.exception.CommandException;
-import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.spark.command.SparkCommandBuilder;
import java.io.FileNotFoundException;
import java.net.URISyntaxException;
@@ -44,7 +44,7 @@ public class SeaTunnelApiExample {
}
public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
- URL resource = LocalSparkExample.class.getResource(configFile);
+ URL resource = SeaTunnelApiExample.class.getResource(configFile);
if (resource == null) {
throw new FileNotFoundException("Can't find config file: " + configFile);
}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
index cda5ab6b..cc77b49a 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/flink/FlinkTransformPluginDiscovery.java
@@ -31,6 +31,7 @@ public class FlinkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseF
super("flink");
}
+ @Override
public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
return new ArrayList<>();
}
diff --git a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
index 849fabbd..cc125e26 100644
--- a/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
+++ b/seatunnel-plugin-discovery/src/main/java/org/apache/seatunnel/plugin/discovery/spark/SparkTransformPluginDiscovery.java
@@ -34,6 +34,7 @@ public class SparkTransformPluginDiscovery extends AbstractPluginDiscovery<BaseS
super("spark");
}
+ @Override
public List<URL> getPluginJarPaths(List<PluginIdentifier> pluginIdentifiers) {
return Collections.emptyList();
}