You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/03/28 02:08:35 UTC

[incubator-seatunnel] branch dev updated: [Feature][Core] implement StructuredStreamingExecution (#1400)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5e7fc37  [Feature][Core] implement StructuredStreamingExecution (#1400)
5e7fc37 is described below

commit 5e7fc371d90f50e882d605e56d648872ce68ff62
Author: kid-xiong <41...@users.noreply.github.com>
AuthorDate: Mon Mar 28 10:08:29 2022 +0800

    [Feature][Core] implement StructuredStreamingExecution (#1400)
    
    * implement StructuredStreamingExecution
    
    * fix bug: the ExecutionEnvironment should be created before the BatchTableEnvironment
    
    Co-authored-by: kid-xiong <ri...@apache.com>
---
 .../java/org/apache/seatunnel/env/Execution.java   |  1 +
 .../java/org/apache/seatunnel/env/RuntimeEnv.java  |  3 +
 .../java/org/apache/seatunnel/plugin/Plugin.java   |  1 +
 .../apache/seatunnel/flink/FlinkEnvironment.java   | 32 +++++++----
 .../seatunnel/flink/batch/FlinkBatchExecution.java |  1 +
 .../apache/seatunnel/spark/SparkEnvironment.java   | 67 ++++++++++++++++++++++
 .../seatunnel/spark/batch/SparkBatchExecution.java | 61 +++-----------------
 .../StructuredStreamingExecution.java              | 22 ++++++-
 .../spark/stream/SparkStreamingExecution.scala     | 11 ++--
 .../apache/seatunnel/common/constants/JobMode.java | 21 +------
 .../seatunnel/flink/source/FileSourceTest.java     |  3 +-
 .../org/apache/seatunnel/config/ConfigBuilder.java | 30 ++++++----
 12 files changed, 152 insertions(+), 101 deletions(-)

diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
index 192903c..560f7dd 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
@@ -41,4 +41,5 @@ public interface Execution<
      * @param sinks      sink plugin list
      */
     void start(List<SR> sources, List<TF> transforms, List<SK> sinks) throws Exception;
+
 }
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
index 5c8ea3c..b1fc83d 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.env;
 
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -34,4 +35,6 @@ public interface RuntimeEnv {
 
     RuntimeEnv prepare();
 
+    RuntimeEnv setJobMode(JobMode mode);
+
 }
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
index 488c4d4..35237b6 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
@@ -39,6 +39,7 @@ import java.io.Serializable;
  *
  * }</pre>
  */
+
 public interface Plugin<T extends RuntimeEnv> extends Serializable, AutoCloseable {
     String RESULT_TABLE_NAME = "result_table_name";
     String SOURCE_TABLE_NAME = "source_table_name";
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 5875436..647240a 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.flink;
 
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.env.RuntimeEnv;
 import org.apache.seatunnel.flink.util.ConfigKeyName;
 import org.apache.seatunnel.flink.util.EnvironmentUtil;
@@ -55,7 +56,7 @@ public class FlinkEnvironment implements RuntimeEnv {
 
     private BatchTableEnvironment batchTableEnvironment;
 
-    private boolean isStreaming;
+    private JobMode jobMode;
 
     private String jobName = "seatunnel";
 
@@ -76,8 +77,9 @@ public class FlinkEnvironment implements RuntimeEnv {
     }
 
     @Override
+
     public FlinkEnvironment prepare() {
-        if (isStreaming) {
+        if (isStreaming()) {
             createStreamEnvironment();
             createStreamTableEnvironment();
         } else {
@@ -95,11 +97,11 @@ public class FlinkEnvironment implements RuntimeEnv {
     }
 
     public boolean isStreaming() {
-        return isStreaming;
+        return JobMode.STREAMING.equals(jobMode);
     }
 
-    public FlinkEnvironment setStreaming(boolean isStreaming) {
-        this.isStreaming = isStreaming;
+    public FlinkEnvironment setJobMode(JobMode jobMode) {
+        this.jobMode = jobMode;
         return this;
     }
 
@@ -115,7 +117,8 @@ public class FlinkEnvironment implements RuntimeEnv {
         // use blink and streammode
         EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance()
                 .inStreamingMode();
-        if (this.config.hasPath(ConfigKeyName.PLANNER) && "blink".equals(this.config.getString(ConfigKeyName.PLANNER))) {
+        if (this.config.hasPath(ConfigKeyName.PLANNER) && "blink"
+                .equals(this.config.getString(ConfigKeyName.PLANNER))) {
             envBuilder.useBlinkPlanner();
         } else {
             envBuilder.useOldPlanner();
@@ -124,7 +127,8 @@ public class FlinkEnvironment implements RuntimeEnv {
 
         tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
         TableConfig config = tableEnvironment.getConfig();
-        if (this.config.hasPath(ConfigKeyName.MAX_STATE_RETENTION_TIME) && this.config.hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
+        if (this.config.hasPath(ConfigKeyName.MAX_STATE_RETENTION_TIME) && this.config
+                .hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
             long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
             long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
             config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max));
@@ -190,7 +194,9 @@ public class FlinkEnvironment implements RuntimeEnv {
                     environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
                     break;
                 default:
-                    LOGGER.warn("set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time", timeType);
+                    LOGGER.warn(
+                            "set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
+                            timeType);
                     break;
             }
         }
@@ -212,7 +218,9 @@ public class FlinkEnvironment implements RuntimeEnv {
                         checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
                         break;
                     default:
-                        LOGGER.warn("set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once", mode);
+                        LOGGER.warn(
+                                "set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
+                                mode);
                         break;
                 }
             }
@@ -244,9 +252,11 @@ public class FlinkEnvironment implements RuntimeEnv {
             if (config.hasPath(ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
                 boolean cleanup = config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
                 if (cleanup) {
-                    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+                    checkpointConfig.enableExternalizedCheckpoints(
+                            CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
                 } else {
-                    checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+                    checkpointConfig.enableExternalizedCheckpoints(
+                            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
                 }
             }
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index fac7c7c..3da634f 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -48,6 +48,7 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
     }
 
     @Override
+
     public void start(List<FlinkBatchSource> sources, List<FlinkBatchTransform> transforms, List<FlinkBatchSink> sinks) throws Exception {
         List<DataSet<Row>> data = new ArrayList<>();
 
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index f0b638b..083891d 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -17,13 +17,20 @@
 
 package org.apache.seatunnel.spark;
 
+import static org.apache.seatunnel.plugin.Plugin.RESULT_TABLE_NAME;
+import static org.apache.seatunnel.plugin.Plugin.SOURCE_TABLE_NAME;
+
 import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.ConfigRuntimeException;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.env.RuntimeEnv;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.streaming.Seconds;
 import org.apache.spark.streaming.StreamingContext;
@@ -40,6 +47,8 @@ public class SparkEnvironment implements RuntimeEnv {
 
     private boolean enableHive = false;
 
+    private JobMode jobMode;
+
     public SparkEnvironment setEnableHive(boolean enableHive) {
         this.enableHive = enableHive;
         return this;
@@ -52,6 +61,12 @@ public class SparkEnvironment implements RuntimeEnv {
     }
 
     @Override
+    public RuntimeEnv setJobMode(JobMode mode) {
+        this.jobMode = mode;
+        return this;
+    }
+
+    @Override
     public Config getConfig() {
         return this.config;
     }
@@ -94,4 +109,56 @@ public class SparkEnvironment implements RuntimeEnv {
             this.streamingContext = new StreamingContext(sparkSession.sparkContext(), Seconds.apply(duration));
         }
     }
+
+    public static void registerTempView(String tableName, Dataset<Row> ds) {
+        ds.createOrReplaceTempView(tableName);
+    }
+
+    public static Dataset<Row> registerInputTempView(BaseSparkSource<Dataset<Row>> source, SparkEnvironment environment) {
+        Config config = source.getConfig();
+        if (config.hasPath(RESULT_TABLE_NAME)) {
+            String tableName = config.getString(RESULT_TABLE_NAME);
+            Dataset<Row> data = source.getData(environment);
+            registerTempView(tableName, data);
+            return data;
+        } else {
+            throw new ConfigRuntimeException("Plugin[" + source.getClass().getName() + "] " +
+                    "must be registered as dataset/table, please set \"" + RESULT_TABLE_NAME + "\" config");
+        }
+    }
+
+    public static Dataset<Row> transformProcess(SparkEnvironment environment, BaseSparkTransform transform, Dataset<Row> ds) {
+        Dataset<Row> fromDs;
+        Config config = transform.getConfig();
+        if (config.hasPath(SOURCE_TABLE_NAME)) {
+            String sourceTableName = config.getString(SOURCE_TABLE_NAME);
+            fromDs = environment.getSparkSession().read().table(sourceTableName);
+        } else {
+            fromDs = ds;
+        }
+        return transform.process(fromDs, environment);
+    }
+
+    public static void registerTransformTempView(BaseSparkTransform transform, Dataset<Row> ds) {
+        Config config = transform.getConfig();
+        if (config.hasPath(RESULT_TABLE_NAME)) {
+            String resultTableName = config.getString(RESULT_TABLE_NAME);
+            registerTempView(resultTableName, ds);
+        }
+    }
+
+    public  static <T extends Object> T sinkProcess(SparkEnvironment environment, BaseSparkSink<T> sink, Dataset<Row> ds) {
+        Dataset<Row> fromDs;
+        Config config = sink.getConfig();
+        if (config.hasPath(SOURCE_TABLE_NAME)) {
+            String sourceTableName = config.getString(SOURCE_TABLE_NAME);
+            fromDs = environment.getSparkSession().read().table(sourceTableName);
+        } else {
+            fromDs = ds;
+        }
+        return sink.output(fromDs, environment);
+    }
 }
+
+
+
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
index b884293..b94e80a 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
@@ -17,10 +17,7 @@
 
 package org.apache.seatunnel.spark.batch;
 
-import org.apache.seatunnel.common.config.ConfigRuntimeException;
 import org.apache.seatunnel.env.Execution;
-import org.apache.seatunnel.spark.BaseSparkSink;
-import org.apache.seatunnel.spark.BaseSparkSource;
 import org.apache.seatunnel.spark.BaseSparkTransform;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
@@ -42,64 +39,19 @@ public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSpar
         this.environment = environment;
     }
 
-    public static void registerTempView(String tableName, Dataset<Row> ds) {
-        ds.createOrReplaceTempView(tableName);
-    }
-
-    public static void registerInputTempView(BaseSparkSource<Dataset<Row>> source, SparkEnvironment environment) {
-        Config config = source.getConfig();
-        if (config.hasPath(RESULT_TABLE_NAME)) {
-            String tableName = config.getString(RESULT_TABLE_NAME);
-            registerTempView(tableName, source.getData(environment));
-        } else {
-            throw new ConfigRuntimeException("Plugin[" + source.getClass().getName() + "] " +
-                "must be registered as dataset/table, please set \"" + RESULT_TABLE_NAME + "\" config");
-        }
-    }
-
-    public static Dataset<Row> transformProcess(SparkEnvironment environment, BaseSparkTransform transform, Dataset<Row> ds) {
-        Dataset<Row> fromDs;
-        Config config = transform.getConfig();
-        if (config.hasPath(SOURCE_TABLE_NAME)) {
-            String sourceTableName = config.getString(SOURCE_TABLE_NAME);
-            fromDs = environment.getSparkSession().read().table(sourceTableName);
-        } else {
-            fromDs = ds;
-        }
-        return transform.process(fromDs, environment);
-    }
-
-    public static void registerTransformTempView(BaseSparkTransform transform, Dataset<Row> ds) {
-        Config config = transform.getConfig();
-        if (config.hasPath(RESULT_TABLE_NAME)) {
-            String resultTableName = config.getString(RESULT_TABLE_NAME);
-            registerTempView(resultTableName, ds);
-        }
-    }
-
-    public static void sinkProcess(SparkEnvironment environment, BaseSparkSink<?> sink, Dataset<Row> ds) {
-        Dataset<Row> fromDs;
-        Config config = sink.getConfig();
-        if (config.hasPath(SOURCE_TABLE_NAME)) {
-            String sourceTableName = config.getString(SOURCE_TABLE_NAME);
-            fromDs = environment.getSparkSession().read().table(sourceTableName);
-        } else {
-            fromDs = ds;
-        }
-        sink.output(fromDs, environment);
-    }
-
     @Override
     public void start(List<SparkBatchSource> sources, List<BaseSparkTransform> transforms, List<SparkBatchSink> sinks) {
-        sources.forEach(source -> registerInputTempView(source, environment));
+
+        sources.forEach(source -> SparkEnvironment.registerInputTempView(source, environment));
+
         if (!sources.isEmpty()) {
             Dataset<Row> ds = sources.get(0).getData(environment);
             for (BaseSparkTransform transform : transforms) {
-                ds = transformProcess(environment, transform, ds);
-                registerTransformTempView(transform, ds);
+                ds = SparkEnvironment.transformProcess(environment, transform, ds);
+                SparkEnvironment.registerTransformTempView(transform, ds);
             }
             for (SparkBatchSink sink : sinks) {
-                sinkProcess(environment, sink, ds);
+                SparkEnvironment.sinkProcess(environment, sink, ds);
             }
         }
     }
@@ -115,3 +67,4 @@ public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSpar
     }
 
 }
+
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
index d50f9cc..ced6903 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
@@ -24,7 +24,11 @@ import org.apache.seatunnel.spark.SparkEnvironment;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
 import java.util.List;
+import java.util.stream.Collectors;
 
 public class StructuredStreamingExecution implements Execution<StructuredStreamingSource, BaseSparkTransform, StructuredStreamingSink, SparkEnvironment> {
 
@@ -37,8 +41,24 @@ public class StructuredStreamingExecution implements Execution<StructuredStreami
     }
 
     @Override
-    public void start(List<StructuredStreamingSource> sources, List<BaseSparkTransform> transforms, List<StructuredStreamingSink> sinks) {
+    public void start(List<StructuredStreamingSource> sources, List<BaseSparkTransform> transforms,
+        List<StructuredStreamingSink> sinks) throws Exception {
+
+        List<Dataset<Row>> datasetList = sources.stream().map(s ->
+                SparkEnvironment.registerInputTempView(s, sparkEnvironment)
+        ).collect(Collectors.toList());
+        if (datasetList.size() > 0) {
+            Dataset<Row> ds = datasetList.get(0);
+            for (BaseSparkTransform tf : transforms) {
+                ds = SparkEnvironment.transformProcess(sparkEnvironment, tf, ds);
+                SparkEnvironment.registerTransformTempView(tf, ds);
+            }
 
+            for (StructuredStreamingSink sink : sinks) {
+                SparkEnvironment.sinkProcess(sparkEnvironment, sink, ds).start();
+            }
+            sparkEnvironment.getSparkSession().streams().awaitAnyTermination();
+        }
     }
 
     @Override
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
index 6687c51..69e5b2c 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
@@ -19,7 +19,6 @@ package org.apache.seatunnel.spark.stream
 import org.apache.seatunnel.env.Execution
 import org.apache.seatunnel.plugin.Plugin
 import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
-import org.apache.seatunnel.spark.batch.SparkBatchExecution
 import org.apache.seatunnel.spark.{BaseSparkSink, BaseSparkSource, BaseSparkTransform, SparkEnvironment}
 import org.apache.spark.sql.{Dataset, Row}
 
@@ -35,7 +34,7 @@ class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
     val source = sources.get(0).asInstanceOf[SparkStreamingSource[_]]
 
     sources.subList(1, sources.size()).foreach(s => {
-      SparkBatchExecution.registerInputTempView(
+      SparkEnvironment.registerInputTempView(
         s.asInstanceOf[BaseSparkSource[Dataset[Row]]],
         sparkEnvironment)
     })
@@ -44,21 +43,21 @@ class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
       dataset => {
         val conf = source.getConfig
         if (conf.hasPath(Plugin.RESULT_TABLE_NAME)) {
-          SparkBatchExecution.registerTempView(
+          SparkEnvironment.registerTempView(
             conf.getString(Plugin.RESULT_TABLE_NAME),
             dataset)
         }
         var ds = dataset
         for (tf <- transforms) {
-          ds = SparkBatchExecution.transformProcess(sparkEnvironment, tf, ds)
-          SparkBatchExecution.registerTransformTempView(tf, ds)
+          ds = SparkEnvironment.transformProcess(sparkEnvironment, tf, ds)
+          SparkEnvironment.registerTransformTempView(tf, ds)
         }
 
         source.beforeOutput()
 
         if (ds.take(1).length > 0) {
           sinks.foreach(sink => {
-            SparkBatchExecution.sinkProcess(sparkEnvironment, sink, ds)
+            SparkEnvironment.sinkProcess(sparkEnvironment, sink, ds)
           })
         }
 
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
similarity index 69%
copy from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
copy to seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
index 5c8ea3c..a7aa326 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
@@ -15,23 +15,8 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.env;
-
-import org.apache.seatunnel.common.config.CheckResult;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-/**
- * engine related runtime environment
- */
-public interface RuntimeEnv {
-
-    RuntimeEnv setConfig(Config config);
-
-    Config getConfig();
-
-    CheckResult checkConfig();
-
-    RuntimeEnv prepare();
+package org.apache.seatunnel.common.constants;
 
+public enum JobMode {
+    BATCH, STREAMING, STRUCTURED_STREAMING
 }
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
index b7913d6..0431af9 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.flink.source;
 
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -62,7 +63,7 @@ public class FileSourceTest {
         Config rootConfig = getRootConfig(configFile);
 
         return new FlinkEnvironment()
-            .setStreaming(false)
+            .setJobMode(JobMode.BATCH)
             .setConfig(rootConfig)
             .prepare();
     }
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
index cad5440..d98f8bf 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.apis.BaseSink;
 import org.apache.seatunnel.apis.BaseSource;
 import org.apache.seatunnel.apis.BaseTransform;
 import org.apache.seatunnel.common.config.ConfigRuntimeException;
+import org.apache.seatunnel.common.constants.JobMode;
 import org.apache.seatunnel.env.Execution;
 import org.apache.seatunnel.env.RuntimeEnv;
 import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -30,6 +31,7 @@ import org.apache.seatunnel.plugin.Plugin;
 import org.apache.seatunnel.spark.SparkEnvironment;
 import org.apache.seatunnel.spark.batch.SparkBatchExecution;
 import org.apache.seatunnel.spark.stream.SparkStreamingExecution;
+import org.apache.seatunnel.spark.structuredstream.StructuredStreamingExecution;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -56,7 +58,7 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
     private final EngineType engine;
     private final ConfigPackage configPackage;
     private final Config config;
-    private boolean streaming;
+    private JobMode jobMode;
     private Config envConfig;
     private boolean enableHive;
     private final ENVIRONMENT env;
@@ -98,10 +100,15 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
         return env;
     }
 
-    private boolean checkIsStreaming() {
-        List<? extends Config> sourceConfigList = config.getConfigList(PluginType.SOURCE.getType());
+    private void setJobMode(Config envConfig) {
+        if (envConfig.hasPath("job.mode")) {
+            jobMode = envConfig.getEnum(JobMode.class, "job.mode");
+        } else {
+            //Compatible with previous logic
+            List<? extends Config> sourceConfigList = config.getConfigList(PluginType.SOURCE.getType());
+            jobMode = sourceConfigList.get(0).getString(PLUGIN_NAME_KEY).toLowerCase().endsWith("stream") ? JobMode.STREAMING : JobMode.BATCH;
+        }
 
-        return sourceConfigList.get(0).getString(PLUGIN_NAME_KEY).toLowerCase().endsWith("stream");
     }
 
     private boolean checkIsContainHive() {
@@ -197,7 +204,6 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
 
     private ENVIRONMENT createEnv() {
         envConfig = config.getConfig("env");
-        streaming = checkIsStreaming();
         enableHive = checkIsContainHive();
         ENVIRONMENT env;
         switch (engine) {
@@ -205,12 +211,13 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
                 env = (ENVIRONMENT) new SparkEnvironment().setEnableHive(enableHive);
                 break;
             case FLINK:
-                env = (ENVIRONMENT) new FlinkEnvironment().setStreaming(streaming);
+                env = (ENVIRONMENT) new FlinkEnvironment();
                 break;
             default:
                 throw new IllegalArgumentException("Engine: " + engine + " is not supported");
         }
-        env.setConfig(envConfig).prepare();
+        setJobMode(envConfig);
+        env.setConfig(envConfig).setJobMode(jobMode).prepare();
         return env;
     }
 
@@ -219,23 +226,26 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
         switch (engine) {
             case SPARK:
                 SparkEnvironment sparkEnvironment = (SparkEnvironment) env;
-                if (streaming) {
+                if (JobMode.STREAMING.equals(jobMode)) {
                     execution = new SparkStreamingExecution(sparkEnvironment);
+                } else if (JobMode.STRUCTURED_STREAMING.equals(jobMode)) {
+                    execution = new StructuredStreamingExecution(sparkEnvironment);
                 } else {
                     execution = new SparkBatchExecution(sparkEnvironment);
                 }
                 break;
             case FLINK:
                 FlinkEnvironment flinkEnvironment = (FlinkEnvironment) env;
-                if (streaming) {
+                if (JobMode.STREAMING.equals(jobMode)) {
                     execution = new FlinkStreamExecution(flinkEnvironment);
                 } else {
                     execution = new FlinkBatchExecution(flinkEnvironment);
                 }
                 break;
             default:
-                break;
+                throw new IllegalArgumentException("No suitable engine");
         }
+        LOGGER.info("current execution is [{}]", execution.getClass().getName());
         return (Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, BaseSink<ENVIRONMENT>, ENVIRONMENT>) execution;
     }