You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/19 03:23:02 UTC
[incubator-seatunnel] branch api-draft updated: [Api-draft] Add spark example (#1913)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new b3ca59a0 [Api-draft] Add spark example (#1913)
b3ca59a0 is described below
commit b3ca59a09dc02b56a2f7bbc5da6269a8b9885401
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Thu May 19 11:22:56 2022 +0800
[Api-draft] Add spark example (#1913)
* add spark example
* add dependencies
---
.../seatunnel/fake/source/FakeSourceReader.java | 3 +-
.../core/base/command/AbstractCommandArgs.java | 37 +++++++
.../core/flink/args/FlinkCommandArgs.java | 40 +-------
seatunnel-core/seatunnel-core-spark/pom.xml | 26 +++++
.../command/SeaTunnelApiConfValidateCommand.java | 53 ++++++++++
.../command/SeaTunnelApiTaskExecuteCommand.java | 111 +++++++++++++++++++++
.../core/spark/command/SparkCommandBuilder.java | 38 ++++++-
.../seatunnel-spark-examples/pom.xml | 11 ++
.../example/spark/SeaTunnelApiExample.java | 55 ++++++++++
.../translation/spark/sink/SparkSink.java | 2 +-
.../spark/source/SeaTunnelSourceSupport.java | 2 +-
.../micro/MicroBatchParallelSourceReader.java | 2 +-
.../spark/utils/TypeConverterUtils.java | 16 ++-
...org.apache.spark.sql.sources.DataSourceRegister | 19 +++-
tools/dependencies/known-dependencies.txt | 1 +
15 files changed, 369 insertions(+), 47 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index be8b04f1..5352235e 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -59,7 +59,8 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
@SuppressWarnings("magicnumber")
public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
// Generate a random number of rows to emit.
- for (int i = 0; i < random.nextInt(10); i++) {
+ int size = random.nextInt(10);
+ for (int i = 0; i < size; i++) {
int randomIndex = random.nextInt(names.length);
Map<String, Object> fieldMap = new HashMap<>(4);
fieldMap.put("name", names[randomIndex]);
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
index a19fe285..e049103b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
@@ -19,8 +19,10 @@ package org.apache.seatunnel.core.base.command;
import org.apache.seatunnel.apis.base.command.CommandArgs;
import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.base.config.ApiType;
import org.apache.seatunnel.core.base.config.EngineType;
+import com.beust.jcommander.IStringConverter;
import com.beust.jcommander.Parameter;
import java.util.Collections;
@@ -42,6 +44,11 @@ public abstract class AbstractCommandArgs implements CommandArgs {
description = "check config")
private boolean checkConfig = false;
+ @Parameter(names = {"-api", "--api-type"},
+ converter = ApiTypeConverter.class,
+ description = "Api type, engine or seatunnel")
+ private ApiType apiType = ApiType.ENGINE_API;
+
@Parameter(names = {"-h", "--help"},
help = true,
description = "Show the usage message")
@@ -63,6 +70,14 @@ public abstract class AbstractCommandArgs implements CommandArgs {
this.variables = variables;
}
+ public ApiType getApiType() {
+ return apiType;
+ }
+
+ public void setApiType(ApiType apiType) {
+ this.apiType = apiType;
+ }
+
public boolean isCheckConfig() {
return checkConfig;
}
@@ -87,4 +102,26 @@ public abstract class AbstractCommandArgs implements CommandArgs {
throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
}
+ /**
+ * Used to convert the api type string to the enum value.
+ */
+ private static class ApiTypeConverter implements IStringConverter<ApiType> {
+
+ /**
+ * If the '-api' is not set, then will not go into this convert method.
+ *
+ * @param value input value set by '-api' or '--api-type'
+ * @return api type enum value
+ */
+ @Override
+ public ApiType convert(String value) {
+ for (ApiType apiType : ApiType.values()) {
+ if (apiType.getApiType().equalsIgnoreCase(value)) {
+ return apiType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("API type %s not supported", value));
+ }
+ }
+
}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
index 5b27fcfd..f6221c80 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.core.flink.args;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
-import org.apache.seatunnel.core.base.config.ApiType;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.flink.config.FlinkRunMode;
@@ -31,15 +30,10 @@ import java.util.List;
public class FlinkCommandArgs extends AbstractCommandArgs {
@Parameter(names = {"-r", "--run-mode"},
- converter = RunModeConverter.class,
- description = "job run mode, run or run-application")
+ converter = RunModeConverter.class,
+ description = "job run mode, run or run-application")
private FlinkRunMode runMode = FlinkRunMode.RUN;
- @Parameter(names = {"-api", "--api-type"},
- converter = ApiTypeConverter.class,
- description = "Api type, engine or seatunnel")
- private ApiType apiType = ApiType.ENGINE_API;
-
/**
* Undefined parameters parsed will be stored here as flink command parameters.
*/
@@ -71,14 +65,6 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
this.flinkParams = flinkParams;
}
- public ApiType getApiType() {
- return apiType;
- }
-
- public void setApiType(ApiType apiType) {
- this.apiType = apiType;
- }
-
/**
* Used to convert the run mode string to the enum value.
*/
@@ -100,26 +86,4 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
}
}
- /**
- * Used to convert the api type string to the enum value.
- */
- private static class ApiTypeConverter implements IStringConverter<ApiType> {
-
- /**
- * If the '-api' is not set, then will not go into this convert method.
- *
- * @param value input value set by '-api' or '--api-type'
- * @return api type enum value
- */
- @Override
- public ApiType convert(String value) {
- for (ApiType apiType : ApiType.values()) {
- if (apiType.getApiType().equalsIgnoreCase(value)) {
- return apiType;
- }
- }
- throw new IllegalArgumentException(String.format("API type %s not supported", value));
- }
- }
-
}
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index 86410e07..13968336 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -34,6 +34,13 @@
</properties>
<dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-core-base</artifactId>
@@ -46,6 +53,25 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_${scala.binary.version}</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-translation-spark</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java
new file mode 100644
index 00000000..624ee545
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.command;
+
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.config.SeaTunnelApiConfigChecker;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+
+/**
+ * Used to validate the configuration of the SeaTunnel API.
+ */
+public class SeaTunnelApiConfValidateCommand implements Command<SparkCommandArgs> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
+
+ private final SparkCommandArgs sparkCommandArgs;
+
+ public SeaTunnelApiConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
+ this.sparkCommandArgs = sparkCommandArgs;
+ }
+
+ @Override
+ public void execute() throws ConfigCheckException {
+ Path configPath = FileUtils.getConfigPath(sparkCommandArgs);
+ // todo: validate the config by new api
+ ConfigBuilder configBuilder = new ConfigBuilder(configPath);
+ new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
+ LOGGER.info("config OK !");
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
new file mode 100644
index 00000000..55f731d9
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.command;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.utils.SerializationUtils;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
+import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+
+/**
+ * todo: do we need to move these class to a new module? since this may cause version conflict with the old flink version.
+ * This command is used to execute the Spark job via the new SeaTunnel API.
+ */
+public class SeaTunnelApiTaskExecuteCommand implements Command<SparkCommandArgs> {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiTaskExecuteCommand.class);
+
+ private final SparkCommandArgs sparkCommandArgs;
+
+ public SeaTunnelApiTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
+ this.sparkCommandArgs = sparkCommandArgs;
+ }
+
+ @Override
+ public void execute() throws CommandExecuteException {
+ Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
+
+ Config config = new ConfigBuilder(configFile).getConfig();
+ SparkEnvironment sparkEnvironment = getSparkEnvironment(config);
+ SeaTunnelSource<?, ?, ?> source = getSource(config);
+ Dataset<Row> dataset = sparkEnvironment.getSparkSession().read().format("SeaTunnelSource")
+ .option("source.serialization", SerializationUtils.objectToString(source))
+ .schema(TypeConverterUtils.convertRow(source.getRowTypeInfo())).load();
+ SeaTunnelSink<?, ?, ?, ?> sink = getSink(config);
+ try {
+ SparkSinkInjector.inject(dataset.write(), sink, new HashMap<>(Common.COLLECTION_SIZE)).option(
+ "checkpointLocation", "/tmp").save();
+ } catch (Exception e) {
+ LOGGER.error("run seatunnel on spark failed.", e);
+ }
+ }
+
+ private SeaTunnelSource<?, ?, ?> getSource(Config config) {
+ PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
+ // todo: use FactoryUtils to load the plugin
+ SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+ return sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+ }
+
+ private SeaTunnelSink<?, ?, ?, ?> getSink(Config config) {
+ PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
+ SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+ return sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+ }
+
+ private PluginIdentifier getSourcePluginIdentifier() {
+ return PluginIdentifier.of("seatunnel", "source", "FakeSource");
+ }
+
+ private PluginIdentifier getSinkPluginIdentifier() {
+ return PluginIdentifier.of("seatunnel", "sink", "Console");
+ }
+
+ private SparkEnvironment getSparkEnvironment(Config config) {
+ SparkEnvironment sparkEnvironment = (SparkEnvironment) new EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
+ sparkEnvironment.setJobMode(JobMode.STREAMING);
+ sparkEnvironment.setConfig(config);
+ sparkEnvironment.prepare();
+
+ return sparkEnvironment;
+ }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
index 4ea8ed1e..39512391 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
@@ -28,10 +28,42 @@ public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
throw new IllegalArgumentException(
- String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
+ String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
+ }
+ switch (commandArgs.getApiType()) {
+ case ENGINE_API:
+ return new SparkApiCommandBuilder().buildCommand(commandArgs);
+ case SEATUNNEL_API:
+ return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
+ default:
+ throw new IllegalArgumentException("Unsupported API type: " + commandArgs.getApiType());
+ }
+ }
+
+ /**
+ * Used to generate command for engine API.
+ */
+ private static class SparkApiCommandBuilder extends SparkCommandBuilder {
+ @Override
+ public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
+ if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
+ throw new IllegalArgumentException(
+ String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
+ }
+ return commandArgs.isCheckConfig() ? new SparkConfValidateCommand(commandArgs)
+ : new SparkTaskExecuteCommand(commandArgs);
+ }
+ }
+
+ /**
+ * Used to generate command for seaTunnel API.
+ */
+ private static class SeaTunnelApiCommandBuilder extends SparkCommandBuilder {
+ @Override
+ public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
+ return commandArgs.isCheckConfig() ? new SeaTunnelApiConfValidateCommand(commandArgs)
+ : new SeaTunnelApiTaskExecuteCommand(commandArgs);
}
- return commandArgs.isCheckConfig() ? new SparkConfValidateCommand(commandArgs)
- : new SparkTaskExecuteCommand(commandArgs);
}
}
diff --git a/seatunnel-examples/seatunnel-spark-examples/pom.xml b/seatunnel-examples/seatunnel-spark-examples/pom.xml
index 6b9d2c79..7f586827 100644
--- a/seatunnel-examples/seatunnel-spark-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-examples/pom.xml
@@ -49,6 +49,17 @@
<artifactId>seatunnel-connector-spark-console</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-seatunnel-fake</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-seatunnel-console</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- seatunnel connectors -->
<!--spark-->
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java
new file mode 100644
index 00000000..f1d9b5b2
--- /dev/null
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.spark;
+
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.base.Seatunnel;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ApiType;
+import org.apache.seatunnel.core.base.exception.CommandException;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class SeaTunnelApiExample {
+
+ public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
+ String configFile = getTestConfigFile("/examples/spark.batch.conf");
+ SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
+ sparkCommandArgs.setConfigFile(configFile);
+ sparkCommandArgs.setCheckConfig(false);
+ sparkCommandArgs.setVariables(null);
+ sparkCommandArgs.setApiType(ApiType.SEATUNNEL_API);
+ sparkCommandArgs.setDeployMode(DeployMode.CLIENT);
+ Command<SparkCommandArgs> flinkCommand =
+ new SparkCommandBuilder().buildCommand(sparkCommandArgs);
+ Seatunnel.run(flinkCommand);
+ }
+
+ public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+ URL resource = LocalSparkExample.class.getResource(configFile);
+ if (resource == null) {
+ throw new FileNotFoundException("Can't find config file: " + configFile);
+ }
+ return Paths.get(resource.toURI()).toString();
+ }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
index 5a07efb2..e2250fd9 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -38,7 +38,7 @@ import java.util.Optional;
public class SparkSink<InputT, StateT, CommitInfoT, AggregatedCommitInfoT> implements WriteSupport,
StreamWriteSupport, DataSourceV2 {
- private SeaTunnelSink<InputT, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
+ private volatile SeaTunnelSink<InputT, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
private Map<String, String> configuration;
private void init(DataSourceOptions options) {
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index deab83d9..bba5b661 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -86,7 +86,7 @@ public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroB
}
private static StructType checkRowType(Optional<StructType> rowTypeOptional) {
- return rowTypeOptional.orElseThrow(SeaTunnelSourceSupport::createUnspecifiedRowTypeException);
+ return rowTypeOptional.orElse(null);
}
private static RuntimeException createUnspecifiedRowTypeException() {
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
index 3fb269bb..23b29c7f 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
@@ -83,7 +83,7 @@ public class MicroBatchParallelSourceReader implements MicroBatchReader {
@Override
public StructType readSchema() {
- return rowType;
+ return this.rowType;
}
@Override
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
index 932cd22a..bac7cbb4 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.PojoType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
import org.apache.seatunnel.api.table.type.TimestampType;
import org.apache.seatunnel.translation.spark.types.ArrayTypeConverter;
import org.apache.seatunnel.translation.spark.types.BasicTypeConverter;
@@ -28,7 +29,10 @@ import org.apache.seatunnel.translation.spark.types.PojoTypeConverter;
import org.apache.seatunnel.translation.spark.types.TimestampTypeConverter;
import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.ObjectType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
import java.math.BigDecimal;
import java.math.BigInteger;
@@ -42,7 +46,7 @@ public class TypeConverterUtils {
public static <T> DataType convert(SeaTunnelDataType<T> seaTunnelDataType) {
if (seaTunnelDataType instanceof BasicType) {
- convertBasicType((BasicType<T>) seaTunnelDataType);
+ return convertBasicType((BasicType<T>) seaTunnelDataType);
}
if (seaTunnelDataType instanceof TimestampType) {
return TimestampTypeConverter.INSTANCE.convert((TimestampType) seaTunnelDataType);
@@ -124,4 +128,14 @@ public class TypeConverterUtils {
PojoTypeConverter<T> pojoTypeConverter = new PojoTypeConverter<>();
return pojoTypeConverter.convert(pojoType);
}
+
+ public static StructType convertRow(SeaTunnelRowTypeInfo typeInfo) {
+ StructField[] fields = new StructField[typeInfo.getFieldNames().length];
+ for (int i = 0; i < typeInfo.getFieldNames().length; i++) {
+ fields[i] = new StructField(typeInfo.getFieldNames()[i],
+ convert(typeInfo.getSeaTunnelDataTypes()[i]), true, Metadata.empty());
+ }
+ return new StructType(fields);
+ }
+
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/seatunnel-translation/seatunnel-translation-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 2a5363d0..a432e9b9 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1,18 @@
-org.apache.seatunnel.translation.spark.source.SeaTunnelSourceSupport
\ No newline at end of file
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.translation.spark.source.SeaTunnelSourceSupport
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 0b2f9822..05279dfa 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -570,6 +570,7 @@ okio-1.4.0.jar
okio-2.8.0.jar
opencsv-4.6.jar
orc-core-1.5.6.jar
+orc-shims-1.5.2.jar
orc-shims-1.5.6.jar
osgi-resource-locator-1.0.1.jar
paranamer-2.3.jar