You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/13 07:52:14 UTC

[incubator-seatunnel] branch api-draft updated: Add SeaTunnelAPIExample (#1867)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 96bd7031 Add SeaTunnelAPIExample (#1867)
96bd7031 is described below

commit 96bd7031fb59c65dabe31c4a3fe803f43512186c
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri May 13 15:52:09 2022 +0800

    Add SeaTunnelAPIExample (#1867)
    
    * Add command implementation
    * Add SeaTunnelAPIExample
---
 .../seatunnel/console/sink/ConsoleSink.java        |  29 -----
 .../org.apache.seatunnel.api.sink.SeaTunnelSink    |  17 +++
 .../seatunnel/fake/source/FakeSource.java          |   5 +-
 .../seatunnel/fake/source/FakeSourceReader.java    |  22 +++-
 ...org.apache.seatunnel.api.source.SeaTunnelSource |  17 +++
 .../org/apache/seatunnel/core/base/Seatunnel.java  |   3 +-
 .../core/base/command/AbstractCommandArgs.java     |   1 +
 .../seatunnel/core/base/command/Command.java       |   3 +-
 .../{command/Command.java => config/ApiType.java}  |  26 ++---
 .../seatunnel/core/base/config/ConfigBuilder.java  |  19 +---
 .../Command.java => config/ConfigChecker.java}     |  20 ++--
 .../core/base/config/EnvironmentFactory.java       |   1 +
 .../CommandException.java}                         |  23 ++--
 .../CommandExecuteException.java}                  |  23 ++--
 .../ConfigCheckException.java}                     |  21 ++--
 seatunnel-core/seatunnel-core-flink/pom.xml        |  26 +++++
 .../seatunnel/core/flink/SeatunnelFlink.java       |   3 +-
 .../core/flink/args/FlinkCommandArgs.java          |  36 ++++++
 ...mmand.java => FlinkApiConfValidateCommand.java} |  14 ++-
 ...ommand.java => FlinkApiTaskExecuteCommand.java} |  13 ++-
 .../core/flink/command/FlinkCommandBuilder.java    |  32 +++++-
 ...d.java => SeaTunnelApiConfValidateCommand.java} |  27 ++---
 .../command/SeaTunnelApiTaskExecuteCommand.java    | 121 +++++++++++++++++++++
 .../core/flink/config/FlinkApiConfigChecker.java   |  46 ++++++++
 .../flink/config/SeaTunnelApiConfigChecker.java}   |  22 ++--
 .../core/flink/config/SeaTunnelApiEnvironment.java |  48 ++++----
 .../seatunnel/core/spark/SeatunnelSpark.java       |   3 +-
 .../apache/seatunnel/core/spark/SparkStarter.java  |   2 +-
 .../spark/command/SparkConfValidateCommand.java    |   8 +-
 .../spark/command/SparkTaskExecuteCommand.java     |   7 +-
 .../spark/config/SeaTunnelApiConfigChecker.java}   |  22 ++--
 .../core/spark/config/SeaTunnelEnvironment.java    |  66 +++++++++++
 .../core/spark/config/SparkApiConfigChecker.java   |  46 ++++++++
 .../seatunnel-flink-examples/pom.xml               |  10 ++
 .../seatunnel/example/flink/LocalFlinkExample.java |   3 +-
 ...lFlinkExample.java => SeaTunnelApiExample.java} |   7 +-
 .../seatunnel/example/spark/LocalSparkExample.java |   3 +-
 .../translation/flink/sink/FlinkSinkWriter.java    |   3 +
 38 files changed, 589 insertions(+), 209 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 947b3125..d7eab44b 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -17,16 +17,12 @@
 
 package org.apache.seatunnel.connectors.seatunnel.console.sink;
 
-import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
 
 import java.util.List;
-import java.util.Optional;
 
 public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
 
@@ -40,29 +36,4 @@ public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
         SinkWriter.Context context, List<ConsoleState> states) {
         return restoreWriter(context, states);
     }
-
-    @Override
-    public Optional<Serializer<ConsoleState>> getWriterStateSerializer() {
-        return getWriterStateSerializer();
-    }
-
-    @Override
-    public Optional<SinkCommitter<ConsoleCommitInfo>> createCommitter() {
-        return createCommitter();
-    }
-
-    @Override
-    public Optional<Serializer<ConsoleCommitInfo>> getCommitInfoSerializer() {
-        return getCommitInfoSerializer();
-    }
-
-    @Override
-    public Optional<SinkAggregatedCommitter<ConsoleCommitInfo, ConsoleAggregatedCommitInfo>> createAggregatedCommitter() {
-        return createAggregatedCommitter();
-    }
-
-    @Override
-    public Optional<Serializer<ConsoleAggregatedCommitInfo>> getAggregatedCommitInfoSerializer() {
-        return getAggregatedCommitInfoSerializer();
-    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
new file mode 100644
index 00000000..12b49983
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink
\ No newline at end of file
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 0c3fb31b..9f18668b 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -22,9 +22,10 @@ import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
 
-public class FakeSource implements SeaTunnelSource<FakeSourceEvent, FakeSourceSplit, FakeState> {
+public class FakeSource implements SeaTunnelSource<SeaTunnelRow, FakeSourceSplit, FakeState> {
 
     @Override
     public Boundedness getBoundedness() {
@@ -32,7 +33,7 @@ public class FakeSource implements SeaTunnelSource<FakeSourceEvent, FakeSourceSp
     }
 
     @Override
-    public SourceReader<FakeSourceEvent, FakeSourceSplit> createReader(SourceReader.Context readerContext) {
+    public SourceReader<SeaTunnelRow, FakeSourceSplit> createReader(SourceReader.Context readerContext) {
         return new FakeSourceReader(readerContext);
     }
 
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index fa1c28c4..bf2cb2f6 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -19,18 +19,27 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source;
 
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
 
-public class FakeSourceReader implements SourceReader<FakeSourceEvent, FakeSourceSplit> {
+public class FakeSourceReader implements SourceReader<SeaTunnelRow, FakeSourceSplit> {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
 
     private final SourceReader.Context context;
 
+    private final String[] names = {"Wenjun", "Fanjia", "Zongwen", "CalvinKirs"};
+    private final int[] ages = {1024, 2048, 4096, 8192};
+    private final Random random = ThreadLocalRandom.current();
+
     public FakeSourceReader(SourceReader.Context context) {
         this.context = context;
     }
@@ -47,8 +56,15 @@ public class FakeSourceReader implements SourceReader<FakeSourceEvent, FakeSourc
 
     @Override
     @SuppressWarnings("magicnumber")
-    public void pollNext(Collector<FakeSourceEvent> output) {
-        output.collect(new FakeSourceEvent("Tom", 19, System.currentTimeMillis()));
+    public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
+        Thread.sleep(1000L);
+        int i = random.nextInt(names.length);
+        Map<String, Object> fieldMap = new HashMap<>(4);
+        fieldMap.put("name", names[i]);
+        fieldMap.put("age", ages[i]);
+        fieldMap.put("timestamp", System.currentTimeMillis());
+        SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[i], ages[i], System.currentTimeMillis()}, fieldMap);
+        output.collect(seaTunnelRow);
     }
 
     @Override
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
new file mode 100644
index 00000000..b21a0581
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource
\ No newline at end of file
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
index 644df716..e4587284 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.base;
 import org.apache.seatunnel.apis.base.command.CommandArgs;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
 import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.slf4j.Logger;
@@ -34,7 +35,7 @@ public class Seatunnel {
      * @param command commandArgs
      * @param <T>         commandType
      */
-    public static <T extends CommandArgs> void run(Command<T> command) {
+    public static <T extends CommandArgs> void run(Command<T> command) throws CommandException {
         try {
             command.execute();
         } catch (ConfigRuntimeException e) {
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
index 7f46fb03..a19fe285 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
@@ -37,6 +37,7 @@ public abstract class AbstractCommandArgs implements CommandArgs {
         description = "variable substitution, such as -i city=beijing, or -i date=20190318")
     private List<String> variables = Collections.emptyList();
 
+    // todo: use command type enum
     @Parameter(names = {"-t", "--check"},
             description = "check config")
     private boolean checkConfig = false;
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
index 5cd80fc5..cca16c30 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.core.base.command;
 
 import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.core.base.exception.CommandException;
 
 /**
  * Command interface.
@@ -30,6 +31,6 @@ public interface Command<T extends CommandArgs> {
     /**
      * Execute command
      */
-    void execute();
+    void execute() throws CommandException;
 
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
similarity index 73%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
index 5cd80fc5..5468eee3 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
@@ -15,21 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.config;
 
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+public enum ApiType {
+    ENGINE_API("engine"),
+    SEATUNNEL_API("seatunnel"),
+    ;
+    private final String apiType;
 
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
-
-    /**
-     * Execute command
-     */
-    void execute();
+    ApiType(String apiType) {
+        this.apiType = apiType;
+    }
 
+    public String getApiType() {
+        return apiType;
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 605613b0..758ec8c8 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.core.base.config;
 
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -35,18 +34,16 @@ import java.nio.file.Path;
  *
  * @param <ENVIRONMENT> environment type.
  */
-public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
+public class ConfigBuilder {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(ConfigBuilder.class);
 
     private static final String PLUGIN_NAME_KEY = "plugin_name";
     private final Path configFile;
-    private final EngineType engine;
     private final Config config;
 
-    public ConfigBuilder(Path configFile, EngineType engine) {
+    public ConfigBuilder(Path configFile) {
         this.configFile = configFile;
-        this.engine = engine;
         this.config = load();
     }
 
@@ -75,16 +72,4 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
         return config;
     }
 
-    /**
-     * check if config is valid.
-     **/
-    public void checkConfig() {
-        // check environment
-        ENVIRONMENT environment = new EnvironmentFactory<ENVIRONMENT>(config, engine).getEnvironment();
-        // check plugins
-        PluginFactory<ENVIRONMENT> pluginFactory = new PluginFactory<>(config, engine);
-        pluginFactory.createPlugins(PluginType.SOURCE);
-        pluginFactory.createPlugins(PluginType.TRANSFORM);
-        pluginFactory.createPlugins(PluginType.SINK);
-    }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigChecker.java
similarity index 60%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigChecker.java
index 5cd80fc5..30e4243e 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigChecker.java
@@ -15,21 +15,25 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.config;
 
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 /**
- * Command interface.
+ * Check the config is valid.
  *
- * @param <T> args type
+ * @param <ENVIRONMENT> the environment type.
  */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+public interface ConfigChecker<ENVIRONMENT extends RuntimeEnv> {
 
     /**
-     * Execute command
+     * Check if the config is validated, if check fails, throw exception.
+     *
+     * @param config given config.
      */
-    void execute();
+    void checkConfig(Config config) throws ConfigCheckException;
 
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
index ff450836..9daa23b7 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
@@ -43,6 +43,7 @@ public class EnvironmentFactory<ENVIRONMENT extends RuntimeEnv> {
         this.engine = engine;
     }
 
+    // todo:put this method into submodule to avoid dependency on the engine
     public synchronized ENVIRONMENT getEnvironment() {
         Config envConfig = config.getConfig("env");
         boolean enableHive = checkIsContainHive();
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandException.java
similarity index 73%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandException.java
index 5cd80fc5..33f2013b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandException.java
@@ -15,21 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.exception;
 
-import org.apache.seatunnel.apis.base.command.CommandArgs;
-
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
-
-    /**
-     * Execute command
-     */
-    void execute();
+public class CommandException extends Exception {
+    public CommandException(String message) {
+        super(message);
+    }
 
+    public CommandException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandExecuteException.java
similarity index 72%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandExecuteException.java
index 5cd80fc5..a3a8ee42 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandExecuteException.java
@@ -15,21 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.exception;
 
-import org.apache.seatunnel.apis.base.command.CommandArgs;
-
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
-
-    /**
-     * Execute command
-     */
-    void execute();
+public class CommandExecuteException extends CommandException {
+    public CommandExecuteException(String message) {
+        super(message);
+    }
 
+    public CommandExecuteException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/ConfigCheckException.java
similarity index 73%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/ConfigCheckException.java
index 5cd80fc5..22d193bf 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/ConfigCheckException.java
@@ -15,21 +15,16 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.exception;
 
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+public class ConfigCheckException extends CommandException {
 
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+    public ConfigCheckException(String message) {
+        super(message);
+    }
 
-    /**
-     * Execute command
-     */
-    void execute();
+    public ConfigCheckException(String message, Throwable cause) {
+        super(message, cause);
+    }
 
 }
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml b/seatunnel-core/seatunnel-core-flink/pom.xml
index d985628a..d5101a41 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -42,6 +42,32 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-api</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-translation-flink</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- todo: use another module to execute new API?        -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-api-flink</artifactId>
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
index 45792d00..f1db4a15 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.flink;
 
 import org.apache.seatunnel.core.base.Seatunnel;
 import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
 import org.apache.seatunnel.core.flink.config.FlinkJobType;
@@ -26,7 +27,7 @@ import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
 
 public class SeatunnelFlink {
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws CommandException {
         FlinkCommandArgs flinkCommandArgs = CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
         Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
             .buildCommand(flinkCommandArgs);
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
index fb892f9c..5b27fcfd 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.flink.args;
 
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
+import org.apache.seatunnel.core.base.config.ApiType;
 import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.flink.config.FlinkRunMode;
 
@@ -34,6 +35,11 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         description = "job run mode, run or run-application")
     private FlinkRunMode runMode = FlinkRunMode.RUN;
 
+    @Parameter(names = {"-api", "--api-type"},
+        converter = ApiTypeConverter.class,
+        description = "Api type, engine or seatunnel")
+    private ApiType apiType = ApiType.ENGINE_API;
+
     /**
      * Undefined parameters parsed will be stored here as flink command parameters.
      */
@@ -65,6 +71,14 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         this.flinkParams = flinkParams;
     }
 
+    public ApiType getApiType() {
+        return apiType;
+    }
+
+    public void setApiType(ApiType apiType) {
+        this.apiType = apiType;
+    }
+
     /**
      * Used to convert the run mode string to the enum value.
      */
@@ -86,4 +100,26 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
         }
     }
 
+    /**
+     * Used to convert the api type string to the enum value.
+     */
+    private static class ApiTypeConverter implements IStringConverter<ApiType> {
+
+        /**
+         * If the '-api' is not set, then will not go into this convert method.
+         *
+         * @param value input value set by '-api' or '--api-type'
+         * @return api type enum value
+         */
+        @Override
+        public ApiType convert(String value) {
+            for (ApiType apiType : ApiType.values()) {
+                if (apiType.getApiType().equalsIgnoreCase(value)) {
+                    return apiType;
+                }
+            }
+            throw new IllegalArgumentException(String.format("API type %s not supported", value));
+        }
+    }
+
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
similarity index 74%
rename from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
index 5868e252..f03522a0 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
@@ -19,9 +19,10 @@ package org.apache.seatunnel.core.flink.command;
 
 import org.apache.seatunnel.core.base.command.Command;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.core.flink.config.FlinkApiConfigChecker;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,20 +32,21 @@ import java.nio.file.Path;
 /**
  * Used to check the Flink conf is validated.
  */
-public class FlinkConfValidateCommand implements Command<FlinkCommandArgs> {
+public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkConfValidateCommand.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
 
     private final FlinkCommandArgs flinkCommandArgs;
 
-    public FlinkConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
+    public FlinkApiConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
         this.flinkCommandArgs = flinkCommandArgs;
     }
 
     @Override
-    public void execute() {
+    public void execute() throws ConfigCheckException {
         Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
-        new ConfigBuilder<FlinkEnvironment>(configPath, flinkCommandArgs.getEngineType()).checkConfig();
+        ConfigBuilder configBuilder = new ConfigBuilder(configPath);
+        new FlinkApiConfigChecker().checkConfig(configBuilder.getConfig());
         LOGGER.info("config OK !");
     }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
similarity index 84%
rename from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
index 32d13440..345f8414 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.base.config.ExecutionContext;
 import org.apache.seatunnel.core.base.config.ExecutionFactory;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -36,22 +37,22 @@ import java.nio.file.Path;
 import java.util.List;
 
 /**
- * Used to execute Flink Job.
+ * Used to execute Flink Job by Flink API.
  */
-public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
+public class FlinkApiTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
 
     private final FlinkCommandArgs flinkCommandArgs;
 
-    public FlinkTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+    public FlinkApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
         this.flinkCommandArgs = flinkCommandArgs;
     }
 
     @Override
-    public void execute() {
+    public void execute() throws CommandExecuteException {
         EngineType engine = flinkCommandArgs.getEngineType();
         Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
 
-        Config config = new ConfigBuilder<>(configFile, engine).getConfig();
+        Config config = new ConfigBuilder(configFile).getConfig();
         ExecutionContext<FlinkEnvironment> executionContext = new ExecutionContext<>(config, engine);
         List<BaseSource<FlinkEnvironment>> sources = executionContext.getSources();
         List<BaseTransform<FlinkEnvironment>> transforms = executionContext.getTransforms();
@@ -68,7 +69,7 @@ public class FlinkTaskExecuteCommand extends BaseTaskExecuteCommand<FlinkCommand
             execution.start(sources, transforms, sinks);
             close(sources, transforms, sinks);
         } catch (Exception e) {
-            throw new RuntimeException("Execute Flink task error", e);
+            throw new CommandExecuteException("Execute Flink task error", e);
         }
     }
 
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
index 595f01a5..590e7d09 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
@@ -30,7 +30,35 @@ public class FlinkCommandBuilder implements CommandBuilder<FlinkCommandArgs> {
             throw new IllegalArgumentException(
                 String.format("Deploy mode: %s is Illegal", commandArgs.getDeployMode()));
         }
-        return commandArgs.isCheckConfig() ? new FlinkConfValidateCommand(commandArgs)
-            : new FlinkTaskExecuteCommand(commandArgs);
+        switch (commandArgs.getApiType()) {
+            case ENGINE_API:
+                return new FlinkApiCommandBuilder().buildCommand(commandArgs);
+            case SEATUNNEL_API:
+                return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
+            default:
+                throw new IllegalArgumentException("Unsupported API type: " + commandArgs.getApiType());
+        }
+    }
+
+    /**
+     * Used to generate command for engine API.
+     */
+    private static class FlinkApiCommandBuilder extends FlinkCommandBuilder {
+        @Override
+        public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
+            return commandArgs.isCheckConfig() ? new FlinkApiConfValidateCommand(commandArgs)
+                : new FlinkApiTaskExecuteCommand(commandArgs);
+        }
+    }
+
+    /**
+     * Used to generate command for seaTunnel API.
+     */
+    private static class SeaTunnelApiCommandBuilder extends FlinkCommandBuilder {
+        @Override
+        public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs commandArgs) {
+            return commandArgs.isCheckConfig() ? new SeaTunnelApiConfValidateCommand(commandArgs)
+                : new SeaTunnelApiTaskExecuteCommand(commandArgs);
+        }
     }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiConfValidateCommand.java
similarity index 64%
rename from seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
rename to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiConfValidateCommand.java
index c58884f5..e9c93411 100644
--- a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiConfValidateCommand.java
@@ -19,11 +19,10 @@ package org.apache.seatunnel.core.flink.command;
 
 import org.apache.seatunnel.core.base.command.Command;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.core.flink.config.SeaTunnelApiConfigChecker;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,26 +30,24 @@ import org.slf4j.LoggerFactory;
 import java.nio.file.Path;
 
 /**
- * Used to execute a SeaTunnelTask. This command is used to execute the Flink job by new API.
+ * Use to validate the configuration of the SeaTunnel API.
  */
-public class SeaTunnelTaskExecuteCommand implements Command<FlinkCommandArgs> {
+public class SeaTunnelApiConfValidateCommand implements Command<FlinkCommandArgs> {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkConfValidateCommand.class);
+    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
 
     private final FlinkCommandArgs flinkCommandArgs;
 
-    public SeaTunnelTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+    public SeaTunnelApiConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
         this.flinkCommandArgs = flinkCommandArgs;
     }
 
     @Override
-    public void execute() {
-        EngineType engine = flinkCommandArgs.getEngineType();
-        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
-
-        Config config = new ConfigBuilder<>(configFile, engine).getConfig();
-        // initialize the new plugin.
-        // translate plugin to flink source/sink
-        // execute the flink job
+    public void execute() throws ConfigCheckException {
+        Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
+        // todo: validate the config by new api
+        ConfigBuilder configBuilder = new ConfigBuilder(configPath);
+        new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
+        LOGGER.info("config OK !");
     }
 }
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
new file mode 100644
index 00000000..e767df5f
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.command;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
+import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+/**
+ * This command is used to execute the Flink job by SeaTunnel new API.
+ */
+public class SeaTunnelApiTaskExecuteCommand implements Command<FlinkCommandArgs> {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
+
+    private final FlinkCommandArgs flinkCommandArgs;
+
+    public SeaTunnelApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+        this.flinkCommandArgs = flinkCommandArgs;
+    }
+
+    @Override
+    public void execute() throws CommandExecuteException {
+        EngineType engine = flinkCommandArgs.getEngineType();
+        Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
+
+        Config config = new ConfigBuilder(configFile).getConfig();
+        // todo: add basic type
+        SeaTunnelParallelSource source = getSource();
+        Sink<WrappedRow, Object, Object, Object> flinkSink = getSink();
+        // execute the flink job
+        FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
+        StreamExecutionEnvironment streamExecutionEnvironment = flinkEnvironment.getStreamExecutionEnvironment();
+        DataStreamSource<WrappedRow> dataStream = streamExecutionEnvironment.addSource(source);
+        dataStream.sinkTo(flinkSink);
+        try {
+            streamExecutionEnvironment.execute("SeaTunnelAPITaskExecuteCommand");
+        } catch (Exception e) {
+            throw new CommandExecuteException("SeaTunnelAPITaskExecuteCommand execute failed", e);
+        }
+    }
+
+    private SeaTunnelParallelSource getSource() {
+        return new SeaTunnelParallelSource(loadSourcePlugin());
+    }
+
+    private Sink<WrappedRow, Object, Object, Object> getSink() {
+        SeaTunnelSink<SeaTunnelRow, Object, Object, Object> sink = loadSinkPlugin();
+        FlinkSinkConverter<SeaTunnelRow, WrappedRow, Object, Object, Object> flinkSinkConverter = new FlinkSinkConverter<>();
+        return flinkSinkConverter.convert(sink, Collections.emptyMap());
+    }
+
+    private <T, SplitT extends SourceSplit, StateT> SeaTunnelSource<T, SplitT, StateT> loadSourcePlugin() {
+        // todo: use FactoryUtils to load the plugin
+        ServiceLoader<SeaTunnelSource> serviceLoader = ServiceLoader.load(SeaTunnelSource.class);
+        Iterator<SeaTunnelSource> iterator = serviceLoader.iterator();
+        if (iterator.hasNext()) {
+            return iterator.next();
+        }
+        throw new IllegalArgumentException("Cannot find the plugin.");
+    }
+
+    private <IN, StateT, CommitInfoT, AggregatedCommitInfoT> SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT> loadSinkPlugin() {
+        // todo: use FactoryUtils to load the plugin
+        ServiceLoader<SeaTunnelSink> serviceLoader = ServiceLoader.load(SeaTunnelSink.class);
+        Iterator<SeaTunnelSink> iterator = serviceLoader.iterator();
+        if (iterator.hasNext()) {
+            return iterator.next();
+        }
+        throw new IllegalArgumentException("Cannot find the plugin.");
+    }
+
+    private FlinkEnvironment getFlinkEnvironment(Config config) {
+        FlinkEnvironment flinkEnvironment = new FlinkEnvironment();
+        flinkEnvironment.setJobMode(JobMode.STREAMING);
+        flinkEnvironment.setConfig(config);
+        flinkEnvironment.prepare();
+
+        return flinkEnvironment;
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
new file mode 100644
index 00000000..7dca74ec
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.config;
+
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.core.base.config.PluginFactory;
+import org.apache.seatunnel.core.base.config.PluginType;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+public class FlinkApiConfigChecker implements ConfigChecker<FlinkEnvironment> {
+
+    @Override
+    public void checkConfig(Config config) throws ConfigCheckException {
+        try {
+            // check environment
+            FlinkEnvironment environment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
+            // check plugins
+            PluginFactory<FlinkEnvironment> pluginFactory = new PluginFactory<>(config, EngineType.FLINK);
+            pluginFactory.createPlugins(PluginType.SOURCE);
+            pluginFactory.createPlugins(PluginType.TRANSFORM);
+            pluginFactory.createPlugins(PluginType.SINK);
+        } catch (Exception ex) {
+            throw new ConfigCheckException("Config check fail", ex);
+        }
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiConfigChecker.java
similarity index 64%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiConfigChecker.java
index 5cd80fc5..7f0a722f 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiConfigChecker.java
@@ -15,21 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.flink.config;
 
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
 
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-    /**
-     * Execute command
-     */
-    void execute();
+public class SeaTunnelApiConfigChecker implements ConfigChecker<SeaTunnelApiEnvironment> {
 
+    @Override
+    public void checkConfig(Config config) throws ConfigCheckException {
+        // todo: implement
+    }
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiEnvironment.java
similarity index 55%
copy from seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
copy to seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiEnvironment.java
index fa1c28c4..7d0d5956 100644
--- a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiEnvironment.java
@@ -15,59 +15,61 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
+package org.apache.seatunnel.core.flink.config;
 
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URL;
 import java.util.List;
 
-public class FakeSourceReader implements SourceReader<FakeSourceEvent, FakeSourceSplit> {
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
+public class SeaTunnelApiEnvironment implements RuntimeEnv {
 
-    private final SourceReader.Context context;
+    private static final Logger LOGGER = LoggerFactory.getLogger(SeaTunnelApiEnvironment.class);
 
-    public FakeSourceReader(SourceReader.Context context) {
-        this.context = context;
-    }
+    private Config config;
 
     @Override
-    public void open() {
-
+    public SeaTunnelApiEnvironment setConfig(Config config) {
+        this.config = config;
+        return this;
     }
 
     @Override
-    public void close() {
-
+    public Config getConfig() {
+        return config;
     }
 
     @Override
-    @SuppressWarnings("magicnumber")
-    public void pollNext(Collector<FakeSourceEvent> output) {
-        output.collect(new FakeSourceEvent("Tom", 19, System.currentTimeMillis()));
+    public CheckResult checkConfig() {
+        // todo
+        return null;
     }
 
     @Override
-    public List<FakeSourceSplit> snapshotState(long checkpointId) {
+    public SeaTunnelApiEnvironment prepare() {
+        // todo
         return null;
     }
 
     @Override
-    public void addSplits(List<FakeSourceSplit> splits) {
-
+    public SeaTunnelApiEnvironment setJobMode(JobMode mode) {
+        return null;
     }
 
     @Override
-    public void handleNoMoreSplits() {
-
+    public JobMode getJobMode() {
+        return null;
     }
 
     @Override
-    public void notifyCheckpointComplete(long checkpointId) {
+    public void registerPlugin(List<URL> pluginPaths) {
 
     }
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
index d88cc7ec..05a1a1d1 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
@@ -19,13 +19,14 @@ package org.apache.seatunnel.core.spark;
 
 import org.apache.seatunnel.core.base.Seatunnel;
 import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
 import org.apache.seatunnel.core.spark.utils.CommandLineUtils;
 
 public class SeatunnelSpark {
 
-    public static void main(String[] args) {
+    public static void main(String[] args) throws CommandException {
         SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
         Command<SparkCommandArgs> sparkCommand =
             new SparkCommandBuilder().buildCommand(sparkArgs);
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 81f44ba9..41b53080 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -220,7 +220,7 @@ public class SparkStarter implements Starter {
         if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir)) {
             return Collections.emptyList();
         }
-        Config config = new ConfigBuilder<>(Paths.get(commandArgs.getConfigFile()), EngineType.SPARK).getConfig();
+        Config config = new ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
         PluginFactory<RuntimeEnv> pluginFactory = new PluginFactory<>(config, EngineType.SPARK);
         return pluginFactory.getPluginJarPaths().stream().map(url -> new File(url.getPath()).toPath()).collect(Collectors.toList());
     }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
index 5629e51d..0f4a9307 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
@@ -19,9 +19,10 @@ package org.apache.seatunnel.core.spark.command;
 
 import org.apache.seatunnel.core.base.command.Command;
 import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.core.spark.config.SparkApiConfigChecker;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,9 +43,10 @@ public class SparkConfValidateCommand implements Command<SparkCommandArgs> {
     }
 
     @Override
-    public void execute() {
+    public void execute() throws ConfigCheckException {
         Path confPath = FileUtils.getConfigPath(sparkCommandArgs);
-        new ConfigBuilder<SparkEnvironment>(confPath, sparkCommandArgs.getEngineType()).checkConfig();
+        ConfigBuilder configBuilder = new ConfigBuilder(confPath);
+        new SparkApiConfigChecker().checkConfig(configBuilder.getConfig());
         LOGGER.info("config OK !");
     }
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
index c8aebd4b..9a051416 100644
--- a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.core.base.config.ConfigBuilder;
 import org.apache.seatunnel.core.base.config.EngineType;
 import org.apache.seatunnel.core.base.config.ExecutionContext;
 import org.apache.seatunnel.core.base.config.ExecutionFactory;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
 import org.apache.seatunnel.core.base.utils.FileUtils;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.spark.SparkEnvironment;
@@ -44,11 +45,11 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
     }
 
     @Override
-    public void execute() {
+    public void execute() throws CommandExecuteException {
         EngineType engine = sparkCommandArgs.getEngineType();
         Path confFile = FileUtils.getConfigPath(sparkCommandArgs);
 
-        Config config = new ConfigBuilder<>(confFile, engine).getConfig();
+        Config config = new ConfigBuilder(confFile).getConfig();
         ExecutionContext<SparkEnvironment> executionContext = new ExecutionContext<>(config, engine);
 
         List<BaseSource<SparkEnvironment>> sources = executionContext.getSources();
@@ -66,7 +67,7 @@ public class SparkTaskExecuteCommand extends BaseTaskExecuteCommand<SparkCommand
             execution.start(sources, transforms, sinks);
             close(sources, transforms, sinks);
         } catch (Exception e) {
-            throw new RuntimeException("Execute Spark task error", e);
+            throw new CommandExecuteException("Execute Spark task error", e);
         }
     }
 
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
similarity index 64%
copy from seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
index 5cd80fc5..3715c14b 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
@@ -15,21 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.spark.config;
 
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
 
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-    /**
-     * Execute command
-     */
-    void execute();
+public class SeaTunnelApiConfigChecker implements ConfigChecker<SeaTunnelEnvironment> {
 
+    @Override
+    public void checkConfig(Config config) throws ConfigCheckException {
+        // todo: implement
+    }
 }
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
new file mode 100644
index 00000000..6f9e0135
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
@@ -0,0 +1,66 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.config;
+
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.net.URL;
+import java.util.List;
+
+public class SeaTunnelEnvironment implements RuntimeEnv {
+
+    @Override
+    public SeaTunnelEnvironment setConfig(Config config) {
+        return null;
+    }
+
+    @Override
+    public Config getConfig() {
+        return null;
+    }
+
+    @Override
+    public CheckResult checkConfig() {
+        return null;
+    }
+
+    @Override
+    public SeaTunnelEnvironment prepare() {
+        return null;
+    }
+
+    @Override
+    public SeaTunnelEnvironment setJobMode(JobMode mode) {
+        return null;
+    }
+
+    @Override
+    public JobMode getJobMode() {
+        return null;
+    }
+
+    @Override
+    public void registerPlugin(List<URL> pluginPaths) {
+
+    }
+}
diff --git a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
new file mode 100644
index 00000000..26de5794
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.config;
+
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.core.base.config.PluginFactory;
+import org.apache.seatunnel.core.base.config.PluginType;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+public class SparkApiConfigChecker implements ConfigChecker<FlinkEnvironment> {
+
+    @Override
+    public void checkConfig(Config config) throws ConfigCheckException {
+        try {
+            // check environment
+            FlinkEnvironment environment = new EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
+            // check plugins
+            PluginFactory<FlinkEnvironment> pluginFactory = new PluginFactory<>(config, EngineType.FLINK);
+            pluginFactory.createPlugins(PluginType.SOURCE);
+            pluginFactory.createPlugins(PluginType.TRANSFORM);
+            pluginFactory.createPlugins(PluginType.SINK);
+        } catch (Exception ex) {
+            throw new ConfigCheckException("Config check fail", ex);
+        }
+    }
+}
diff --git a/seatunnel-examples/seatunnel-flink-examples/pom.xml b/seatunnel-examples/seatunnel-flink-examples/pom.xml
index 79fbe8f1..bd5caf5e 100644
--- a/seatunnel-examples/seatunnel-flink-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-examples/pom.xml
@@ -56,6 +56,16 @@
             <artifactId>seatunnel-connector-flink-console</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connectors-seatunnel-fake</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-connectors-seatunnel-console</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--   seatunnel connectors   -->
 
         <!--flink-->
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
index a740b8ee..b41236e5 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.example.flink;
 
 import org.apache.seatunnel.core.base.Seatunnel;
 import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
 
@@ -29,7 +30,7 @@ import java.nio.file.Paths;
 
 public class LocalFlinkExample {
 
-    public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
+    public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
         String configFile = getTestConfigFile("/examples/fake_to_console.conf");
         FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
         flinkCommandArgs.setConfigFile(configFile);
diff --git a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
similarity index 88%
copy from seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
copy to seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
index a740b8ee..3f13e4b5 100644
--- a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
+++ b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.example.flink;
 
 import org.apache.seatunnel.core.base.Seatunnel;
 import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ApiType;
+import org.apache.seatunnel.core.base.exception.CommandException;
 import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
 import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
 
@@ -27,14 +29,15 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Paths;
 
-public class LocalFlinkExample {
+public class SeaTunnelApiExample {
 
-    public static void main(String[] args) throws FileNotFoundException, URISyntaxException {
+    public static void main(String[] args) throws FileNotFoundException, URISyntaxException, CommandException {
         String configFile = getTestConfigFile("/examples/fake_to_console.conf");
         FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
         flinkCommandArgs.setConfigFile(configFile);
         flinkCommandArgs.setCheckConfig(false);
         flinkCommandArgs.setVariables(null);
+        flinkCommandArgs.setApiType(ApiType.SEATUNNEL_API);
         Command<FlinkCommandArgs> flinkCommand =
             new FlinkCommandBuilder().buildCommand(flinkCommandArgs);
         Seatunnel.run(flinkCommand);
diff --git a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
index 5469ec73..aa121a90 100644
--- a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++ b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.example.spark;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.core.base.Seatunnel;
 import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
 import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
 import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
 
@@ -30,7 +31,7 @@ import java.nio.file.Paths;
 
 public class LocalSparkExample {
 
-    public static void main(String[] args) throws URISyntaxException, FileNotFoundException {
+    public static void main(String[] args) throws URISyntaxException, FileNotFoundException, CommandException {
         String configFile = getTestConfigFile("/examples/spark.batch.conf");
         SparkCommandArgs sparkArgs = new SparkCommandArgs();
         sparkArgs.setConfigFile(configFile);
diff --git a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 354de014..bfdb2db4 100644
--- a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++ b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.flink.sink;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
+import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
 
 import org.apache.flink.api.connector.sink.SinkWriter;
 import org.apache.flink.types.Row;
@@ -41,6 +42,8 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT> implements SinkWriter<
     public void write(InputT element, org.apache.flink.api.connector.sink.SinkWriter.Context context) throws IOException {
         if (element instanceof Row) {
             sinkWriter.write(rowSerialization.deserialize((Row) element));
+        } else if (element instanceof WrappedRow) {
+            sinkWriter.write(rowSerialization.deserialize(((WrappedRow) element).getRow()));
         } else {
             throw new InvalidClassException("only support Flink Row at now, the element Class is " + element.getClass());
         }