You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/19 03:23:02 UTC

[incubator-seatunnel] branch api-draft updated: [Api-draft] Add spark example (#1913)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new b3ca59a0 [Api-draft] Add spark example (#1913)
b3ca59a0 is described below

commit b3ca59a09dc02b56a2f7bbc5da6269a8b9885401
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Thu May 19 11:22:56 2022 +0800

    [Api-draft] Add spark example (#1913)
    
    * add spark example
    * add dependencies
---
 .../seatunnel/fake/source/FakeSourceReader.java    |   3 +-
 .../core/base/command/AbstractCommandArgs.java     |  37 +++++++
 .../core/flink/args/FlinkCommandArgs.java          |  40 +-------
 seatunnel-core/seatunnel-core-spark/pom.xml        |  26 +++++
 .../command/SeaTunnelApiConfValidateCommand.java   |  53 ++++++++++
 .../command/SeaTunnelApiTaskExecuteCommand.java    | 111 +++++++++++++++++++++
 .../core/spark/command/SparkCommandBuilder.java    |  38 ++++++-
 .../seatunnel-spark-examples/pom.xml               |  11 ++
 .../example/spark/SeaTunnelApiExample.java         |  55 ++++++++++
 .../translation/spark/sink/SparkSink.java          |   2 +-
 .../spark/source/SeaTunnelSourceSupport.java       |   2 +-
 .../micro/MicroBatchParallelSourceReader.java      |   2 +-
 .../spark/utils/TypeConverterUtils.java            |  16 ++-
 ...org.apache.spark.sql.sources.DataSourceRegister |  19 +++-
 tools/dependencies/known-dependencies.txt          |   1 +
 15 files changed, 369 insertions(+), 47 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index be8b04f1..5352235e 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -59,7 +59,8 @@ public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSp
     @SuppressWarnings("magicnumber")
     public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
         // Generate a random number of rows to emit.
-        for (int i = 0; i < random.nextInt(10); i++) {
+        int size = random.nextInt(10);
+        for (int i = 0; i < size; i++) {
             int randomIndex = random.nextInt(names.length);
             Map<String, Object> fieldMap = new HashMap<>(4);
             fieldMap.put("name", names[randomIndex]);
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
index a19fe285..e049103b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
@@ -19,8 +19,10 @@ package org.apache.seatunnel.core.base.command;
 
 import org.apache.seatunnel.apis.base.command.CommandArgs;
 import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.base.config.ApiType;
 import org.apache.seatunnel.core.base.config.EngineType;
 
+import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.Parameter;
 
 import java.util.Collections;
@@ -42,6 +44,11 @@ public abstract class AbstractCommandArgs implements CommandArgs {
             description = "check config")
     private boolean checkConfig = false;
 
+    @Parameter(names = {"-api", "--api-type"},
+            converter = ApiTypeConverter.class,
+            description = "Api type, engine or seatunnel")
+    private ApiType apiType = ApiType.ENGINE_API;
+
     @Parameter(names = {"-h", "--help"},
             help = true,
             description = "Show the usage message")
@@ -63,6 +70,14 @@ public abstract class AbstractCommandArgs implements CommandArgs {
         this.variables = variables;
     }
 
+    public ApiType getApiType() {
+        return this.apiType;
+    }
+
+    public void setApiType(ApiType apiType) {
+        this.apiType = apiType;
+    }
+
     public boolean isCheckConfig() {
         return checkConfig;
     }
@@ -87,4 +102,26 @@ public abstract class AbstractCommandArgs implements CommandArgs {
         throw new UnsupportedOperationException("abstract class CommandArgs not support this method");
     }
 
+    /**
+     * Maps the value given to '-api' / '--api-type' onto the matching {@link ApiType} constant.
+     */
+    private static class ApiTypeConverter implements IStringConverter<ApiType> {
+
+        /**
+         * Only called when '-api' is present; otherwise the field default {@code ENGINE_API} is kept.
+         * @param value raw argument, compared case-insensitively with {@link ApiType#getApiType()}
+         * @return the api type whose name matches {@code value}
+         * @throws IllegalArgumentException if no api type matches {@code value}
+         */
+        @Override
+        public ApiType convert(String value) {
+            for (ApiType candidate : ApiType.values()) {
+                if (candidate.getApiType().equalsIgnoreCase(value)) {
+                    return candidate;
+                }
+            }
+            throw new IllegalArgumentException(String.format("API type %s not supported", value));
+        }
+    }
+
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
index 5b27fcfd..f6221c80 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
@@ -19,7 +19,6 @@ package org.apache.seatunnel.core.flink.args;
 
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
-import org.apache.seatunnel.core.base.config.ApiType;
 import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.flink.config.FlinkRunMode;
 
@@ -31,15 +30,10 @@ import java.util.List;
 public class FlinkCommandArgs extends AbstractCommandArgs {
 
     @Parameter(names = {"-r", "--run-mode"},
-        converter = RunModeConverter.class,
-        description = "job run mode, run or run-application")
+            converter = RunModeConverter.class,
+            description = "job run mode, run or run-application")
     private FlinkRunMode runMode = FlinkRunMode.RUN;
 
-    @Parameter(names = {"-api", "--api-type"},
-        converter = ApiTypeConverter.class,
-        description = "Api type, engine or seatunnel")
-    private ApiType apiType = ApiType.ENGINE_API;
-
     /**
      * Undefined parameters parsed will be stored here as flink command parameters.
      */
@@ -71,14 +65,6 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         this.flinkParams = flinkParams;
     }
 
-    public ApiType getApiType() {
-        return apiType;
-    }
-
-    public void setApiType(ApiType apiType) {
-        this.apiType = apiType;
-    }
-
     /**
      * Used to convert the run mode string to the enum value.
      */
@@ -100,26 +86,4 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         }
     }
 
-    /**
-     * Used to convert the api type string to the enum value.
-     */
-    private static class ApiTypeConverter implements IStringConverter<ApiType> {
-
-        /**
-         * If the '-api' is not set, then will not go into this convert method.
-         *
-         * @param value input value set by '-api' or '--api-type'
-         * @return api type enum value
-         */
-        @Override
-        public ApiType convert(String value) {
-            for (ApiType apiType : ApiType.values()) {
-                if (apiType.getApiType().equalsIgnoreCase(value)) {
-                    return apiType;
-                }
-            }
-            throw new IllegalArgumentException(String.format("API type %s not supported", value));
-        }
-    }
-
 }
diff --git a/seatunnel-core/seatunnel-core-spark/pom.xml b/seatunnel-core/seatunnel-core-spark/pom.xml
index 86410e07..13968336 100644
--- a/seatunnel-core/seatunnel-core-spark/pom.xml
+++ b/seatunnel-core/seatunnel-core-spark/pom.xml
@@ -34,6 +34,13 @@
     </properties>
 
     <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-core-base</artifactId>
@@ -46,6 +53,25 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_${scala.binary.version}</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-translation-spark</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java
new file mode 100644
index 00000000..624ee545
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiConfValidateCommand.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.command;
+
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.config.SeaTunnelApiConfigChecker;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+
+/**
+ * Validates, without executing it, a job configuration written for the SeaTunnel API on Spark.
+ */
+public class SeaTunnelApiConfValidateCommand implements Command<SparkCommandArgs> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
+
+    private final SparkCommandArgs args;
+
+    public SeaTunnelApiConfValidateCommand(SparkCommandArgs sparkCommandArgs) {
+        args = sparkCommandArgs;
+    }
+
+    @Override
+    public void execute() throws ConfigCheckException {
+        // todo: validate the config by new api
+        Path configPath = FileUtils.getConfigPath(args);
+        ConfigBuilder builder = new ConfigBuilder(configPath);
+        new SeaTunnelApiConfigChecker().checkConfig(builder.getConfig());
+        LOGGER.info("config OK !");
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
new file mode 100644
index 00000000..55f731d9
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SeaTunnelApiTaskExecuteCommand.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.command;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.common.utils.SerializationUtils;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
+import org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSourcePluginDiscovery;
+import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.translation.spark.sink.SparkSinkInjector;
+import org.apache.seatunnel.translation.spark.utils.TypeConverterUtils;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+
+/**
+ * Executes a Spark job through the SeaTunnel (new) API, reading via the "SeaTunnelSource" data source format.
+ * todo: should these classes move to a new module? They may conflict with the old Spark API version.
+ */
+public class SeaTunnelApiTaskExecuteCommand implements Command<SparkCommandArgs> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiTaskExecuteCommand.class);
+
+    private final SparkCommandArgs sparkCommandArgs;
+
+    public SeaTunnelApiTaskExecuteCommand(SparkCommandArgs sparkCommandArgs) {
+        this.sparkCommandArgs = sparkCommandArgs;
+    }
+
+    @Override
+    public void execute() throws CommandExecuteException {
+        Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
+        Config config = new ConfigBuilder(configFile).getConfig();
+        SparkEnvironment sparkEnvironment = getSparkEnvironment(config);
+        SeaTunnelSource<?, ?, ?> source = getSource(config);
+        Dataset<Row> dataset = sparkEnvironment.getSparkSession().read().format("SeaTunnelSource")
+                .option("source.serialization", SerializationUtils.objectToString(source))
+                .schema(TypeConverterUtils.convertRow(source.getRowTypeInfo())).load();
+        SeaTunnelSink<?, ?, ?, ?> sink = getSink(config);
+        try {
+            SparkSinkInjector.inject(dataset.write(), sink, new HashMap<>(Common.COLLECTION_SIZE)).option(
+                    "checkpointLocation", "/tmp").save();
+        } catch (Exception e) {
+            LOGGER.error("run seatunnel on spark failed.", e);
+            throw new CommandExecuteException("run seatunnel on spark failed.", e);
+        }
+    }
+
+    private SeaTunnelSource<?, ?, ?> getSource(Config config) {
+        PluginIdentifier pluginIdentifier = getSourcePluginIdentifier();
+        // todo: use FactoryUtils to load the plugin
+        SeaTunnelSourcePluginDiscovery sourcePluginDiscovery = new SeaTunnelSourcePluginDiscovery();
+        return sourcePluginDiscovery.getPluginInstance(pluginIdentifier);
+    }
+
+    private SeaTunnelSink<?, ?, ?, ?> getSink(Config config) {
+        PluginIdentifier pluginIdentifier = getSinkPluginIdentifier();
+        SeaTunnelSinkPluginDiscovery sinkPluginDiscovery = new SeaTunnelSinkPluginDiscovery();
+        return sinkPluginDiscovery.getPluginInstance(pluginIdentifier);
+    }
+
+    private PluginIdentifier getSourcePluginIdentifier() {
+        return PluginIdentifier.of("seatunnel", "source", "FakeSource");
+    }
+
+    private PluginIdentifier getSinkPluginIdentifier() {
+        return PluginIdentifier.of("seatunnel", "sink", "Console");
+    }
+
+    private SparkEnvironment getSparkEnvironment(Config config) {
+        SparkEnvironment sparkEnvironment = (SparkEnvironment) new EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
+        // NOTE(review): STREAMING job mode, yet execute() uses the batch read()/write() API - confirm intent.
+        sparkEnvironment.setJobMode(JobMode.STREAMING);
+        sparkEnvironment.setConfig(config);
+        sparkEnvironment.prepare();
+        return sparkEnvironment;
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
index 4ea8ed1e..39512391 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
@@ -28,10 +28,42 @@ public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
     public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
         if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
             throw new IllegalArgumentException(
-                String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
+                    String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
+        }
+        switch (commandArgs.getApiType()) {
+            case ENGINE_API:
+                return new SparkApiCommandBuilder().buildCommand(commandArgs);
+            case SEATUNNEL_API:
+                return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
+            default:
+                throw new IllegalArgumentException("Unsupported API type: " + commandArgs.getApiType());
+        }
+    }
+
+    /**
+     * Builds the command for jobs written against the Spark engine API.
+     *
+     * <p>The deploy mode has already been validated and applied by
+     * {@link SparkCommandBuilder#buildCommand(SparkCommandArgs)}, the only caller of this private
+     * builder, so it is not checked a second time here.
+     */
+    private static class SparkApiCommandBuilder extends SparkCommandBuilder {
+        @Override
+        public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
+            return commandArgs.isCheckConfig() ? new SparkConfValidateCommand(commandArgs)
+                    : new SparkTaskExecuteCommand(commandArgs);
+        }
+    }
+
+    /**
+     * Used to generate command for seaTunnel API.
+     */
+    private static class SeaTunnelApiCommandBuilder extends SparkCommandBuilder {
+        @Override
+        public Command<SparkCommandArgs> buildCommand(SparkCommandArgs commandArgs) {
+            return commandArgs.isCheckConfig() ? new SeaTunnelApiConfValidateCommand(commandArgs)
+                    : new SeaTunnelApiTaskExecuteCommand(commandArgs);
         }
-        return commandArgs.isCheckConfig() ? new SparkConfValidateCommand(commandArgs)
-            : new SparkTaskExecuteCommand(commandArgs);
     }
 
 }
diff --git a/seatunnel-examples/seatunnel-spark-examples/pom.xml b/seatunnel-examples/seatunnel-spark-examples/pom.xml
index 6b9d2c79..7f586827 100644
--- a/seatunnel-examples/seatunnel-spark-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-spark-examples/pom.xml
@@ -49,6 +49,17 @@
             <artifactId>seatunnel-connector-spark-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connectors-seatunnel-fake</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connectors-seatunnel-console</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
 
         <!--spark-->
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java
new file mode 100644
index 00000000..f1d9b5b2
--- /dev/null
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/SeaTunnelApiExample.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.spark;
+
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.core.base.Seatunnel;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ApiType;
+import org.apache.seatunnel.core.base.exception.CommandException;
+import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+/** Submits {@code /examples/spark.batch.conf} through the SeaTunnel (new) API in client deploy mode. */
+public class SeaTunnelApiExample {
+
+    public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
+        String configFile = getTestConfigFile("/examples/spark.batch.conf");
+        SparkCommandArgs sparkCommandArgs = new SparkCommandArgs();
+        sparkCommandArgs.setConfigFile(configFile);
+        sparkCommandArgs.setCheckConfig(false);
+        sparkCommandArgs.setVariables(null);
+        sparkCommandArgs.setApiType(ApiType.SEATUNNEL_API);
+        sparkCommandArgs.setDeployMode(DeployMode.CLIENT);
+        Command<SparkCommandArgs> sparkCommand = new SparkCommandBuilder().buildCommand(sparkCommandArgs);
+        Seatunnel.run(sparkCommand);
+    }
+
+    public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
+        URL resource = SeaTunnelApiExample.class.getResource(configFile);
+        if (resource == null) {
+            throw new FileNotFoundException("Can't find config file: " + configFile);
+        }
+        return Paths.get(resource.toURI()).toString();
+    }
+}
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
index 5a07efb2..e2250fd9 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/sink/SparkSink.java
@@ -38,7 +38,7 @@ import java.util.Optional;
 public class SparkSink<InputT, StateT, CommitInfoT, AggregatedCommitInfoT> implements WriteSupport,
         StreamWriteSupport, DataSourceV2 {
 
-    private SeaTunnelSink<InputT, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
+    private volatile SeaTunnelSink<InputT, StateT, CommitInfoT, AggregatedCommitInfoT> sink;
     private Map<String, String> configuration;
 
     private void init(DataSourceOptions options) {
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index deab83d9..bba5b661 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -86,7 +86,7 @@ public class SeaTunnelSourceSupport implements DataSourceV2, ReadSupport, MicroB
     }
 
     private static StructType checkRowType(Optional<StructType> rowTypeOptional) {
-        return rowTypeOptional.orElseThrow(SeaTunnelSourceSupport::createUnspecifiedRowTypeException);
+        return rowTypeOptional.orElse(null);
     }
 
     private static RuntimeException createUnspecifiedRowTypeException() {
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
index 3fb269bb..23b29c7f 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/source/micro/MicroBatchParallelSourceReader.java
@@ -83,7 +83,7 @@ public class MicroBatchParallelSourceReader implements MicroBatchReader {
 
     @Override
     public StructType readSchema() {
-        return rowType;
+        return this.rowType;
     }
 
     @Override
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
index 932cd22a..bac7cbb4 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/java/org/apache/seatunnel/translation/spark/utils/TypeConverterUtils.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.PojoType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
 import org.apache.seatunnel.api.table.type.TimestampType;
 import org.apache.seatunnel.translation.spark.types.ArrayTypeConverter;
 import org.apache.seatunnel.translation.spark.types.BasicTypeConverter;
@@ -28,7 +29,10 @@ import org.apache.seatunnel.translation.spark.types.PojoTypeConverter;
 import org.apache.seatunnel.translation.spark.types.TimestampTypeConverter;
 
 import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Metadata;
 import org.apache.spark.sql.types.ObjectType;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
 
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -42,7 +46,7 @@ public class TypeConverterUtils {
 
     public static <T> DataType convert(SeaTunnelDataType<T> seaTunnelDataType) {
         if (seaTunnelDataType instanceof BasicType) {
-            convertBasicType((BasicType<T>) seaTunnelDataType);
+            return convertBasicType((BasicType<T>) seaTunnelDataType);
         }
         if (seaTunnelDataType instanceof TimestampType) {
             return TimestampTypeConverter.INSTANCE.convert((TimestampType) seaTunnelDataType);
@@ -124,4 +128,14 @@ public class TypeConverterUtils {
         PojoTypeConverter<T> pojoTypeConverter = new PojoTypeConverter<>();
         return pojoTypeConverter.convert(pojoType);
     }
+
+    public static StructType convertRow(SeaTunnelRowTypeInfo typeInfo) {
+        String[] fieldNames = typeInfo.getFieldNames();
+        StructField[] structFields = new StructField[fieldNames.length];
+        for (int index = 0; index < fieldNames.length; index++) {
+            DataType sparkType = convert(typeInfo.getSeaTunnelDataTypes()[index]);
+            structFields[index] = new StructField(fieldNames[index], sparkType, true, Metadata.empty());
+        }
+        return new StructType(structFields);
+    }
 }
diff --git a/seatunnel-translation/seatunnel-translation-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister b/seatunnel-translation/seatunnel-translation-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 2a5363d0..a432e9b9 100644
--- a/seatunnel-translation/seatunnel-translation-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ b/seatunnel-translation/seatunnel-translation-spark/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -1 +1,18 @@
-org.apache.seatunnel.translation.spark.source.SeaTunnelSourceSupport
\ No newline at end of file
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.seatunnel.translation.spark.source.SeaTunnelSourceSupport
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 0b2f9822..05279dfa 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -570,6 +570,7 @@ okio-1.4.0.jar
 okio-2.8.0.jar
 opencsv-4.6.jar
 orc-core-1.5.6.jar
+orc-shims-1.5.2.jar
 orc-shims-1.5.6.jar
 osgi-resource-locator-1.0.1.jar
 paranamer-2.3.jar