You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/03/28 02:08:35 UTC
[incubator-seatunnel] branch dev updated: [Feature][Core]implement StructuredStreamingExecution (#1400)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5e7fc37 [Feature][Core]implement StructuredStreamingExecution (#1400)
5e7fc37 is described below
commit 5e7fc371d90f50e882d605e56d648872ce68ff62
Author: kid-xiong <41...@users.noreply.github.com>
AuthorDate: Mon Mar 28 10:08:29 2022 +0800
[Feature][Core]implement StructuredStreamingExecution (#1400)
* implement StructuredStreamingExecution
* fix bug: the ExecutionEnvironment should be created before the BatchTableEnvironment
Co-authored-by: kid-xiong <ri...@apache.com>
---
.../java/org/apache/seatunnel/env/Execution.java | 1 +
.../java/org/apache/seatunnel/env/RuntimeEnv.java | 3 +
.../java/org/apache/seatunnel/plugin/Plugin.java | 1 +
.../apache/seatunnel/flink/FlinkEnvironment.java | 32 +++++++----
.../seatunnel/flink/batch/FlinkBatchExecution.java | 1 +
.../apache/seatunnel/spark/SparkEnvironment.java | 67 ++++++++++++++++++++++
.../seatunnel/spark/batch/SparkBatchExecution.java | 61 +++-----------------
.../StructuredStreamingExecution.java | 22 ++++++-
.../spark/stream/SparkStreamingExecution.scala | 11 ++--
.../apache/seatunnel/common/constants/JobMode.java | 21 +------
.../seatunnel/flink/source/FileSourceTest.java | 3 +-
.../org/apache/seatunnel/config/ConfigBuilder.java | 30 ++++++----
12 files changed, 152 insertions(+), 101 deletions(-)
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
index 192903c..560f7dd 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/Execution.java
@@ -41,4 +41,5 @@ public interface Execution<
* @param sinks sink plugin list
*/
void start(List<SR> sources, List<TF> transforms, List<SK> sinks) throws Exception;
+
}
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
index 5c8ea3c..b1fc83d 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.env;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -34,4 +35,6 @@ public interface RuntimeEnv {
RuntimeEnv prepare();
+ RuntimeEnv setJobMode(JobMode mode);
+
}
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
index 488c4d4..35237b6 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
+++ b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
@@ -39,6 +39,7 @@ import java.io.Serializable;
*
* }</pre>
*/
+
public interface Plugin<T extends RuntimeEnv> extends Serializable, AutoCloseable {
String RESULT_TABLE_NAME = "result_table_name";
String SOURCE_TABLE_NAME = "source_table_name";
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
index 5875436..647240a 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/FlinkEnvironment.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.flink;
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.env.RuntimeEnv;
import org.apache.seatunnel.flink.util.ConfigKeyName;
import org.apache.seatunnel.flink.util.EnvironmentUtil;
@@ -55,7 +56,7 @@ public class FlinkEnvironment implements RuntimeEnv {
private BatchTableEnvironment batchTableEnvironment;
- private boolean isStreaming;
+ private JobMode jobMode;
private String jobName = "seatunnel";
@@ -76,8 +77,9 @@ public class FlinkEnvironment implements RuntimeEnv {
}
@Override
+
public FlinkEnvironment prepare() {
- if (isStreaming) {
+ if (isStreaming()) {
createStreamEnvironment();
createStreamTableEnvironment();
} else {
@@ -95,11 +97,11 @@ public class FlinkEnvironment implements RuntimeEnv {
}
public boolean isStreaming() {
- return isStreaming;
+ return JobMode.STREAMING.equals(jobMode);
}
- public FlinkEnvironment setStreaming(boolean isStreaming) {
- this.isStreaming = isStreaming;
+ public FlinkEnvironment setJobMode(JobMode jobMode) {
+ this.jobMode = jobMode;
return this;
}
@@ -115,7 +117,8 @@ public class FlinkEnvironment implements RuntimeEnv {
// use blink and streammode
EnvironmentSettings.Builder envBuilder = EnvironmentSettings.newInstance()
.inStreamingMode();
- if (this.config.hasPath(ConfigKeyName.PLANNER) && "blink".equals(this.config.getString(ConfigKeyName.PLANNER))) {
+ if (this.config.hasPath(ConfigKeyName.PLANNER) && "blink"
+ .equals(this.config.getString(ConfigKeyName.PLANNER))) {
envBuilder.useBlinkPlanner();
} else {
envBuilder.useOldPlanner();
@@ -124,7 +127,8 @@ public class FlinkEnvironment implements RuntimeEnv {
tableEnvironment = StreamTableEnvironment.create(getStreamExecutionEnvironment(), environmentSettings);
TableConfig config = tableEnvironment.getConfig();
- if (this.config.hasPath(ConfigKeyName.MAX_STATE_RETENTION_TIME) && this.config.hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
+ if (this.config.hasPath(ConfigKeyName.MAX_STATE_RETENTION_TIME) && this.config
+ .hasPath(ConfigKeyName.MIN_STATE_RETENTION_TIME)) {
long max = this.config.getLong(ConfigKeyName.MAX_STATE_RETENTION_TIME);
long min = this.config.getLong(ConfigKeyName.MIN_STATE_RETENTION_TIME);
config.setIdleStateRetentionTime(Time.seconds(min), Time.seconds(max));
@@ -190,7 +194,9 @@ public class FlinkEnvironment implements RuntimeEnv {
environment.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
break;
default:
- LOGGER.warn("set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time", timeType);
+ LOGGER.warn(
+ "set time-characteristic failed, unknown time-characteristic [{}],only support event-time,ingestion-time,processing-time",
+ timeType);
break;
}
}
@@ -212,7 +218,9 @@ public class FlinkEnvironment implements RuntimeEnv {
checkpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
break;
default:
- LOGGER.warn("set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once", mode);
+ LOGGER.warn(
+ "set checkpoint.mode failed, unknown checkpoint.mode [{}],only support exactly-once,at-least-once",
+ mode);
break;
}
}
@@ -244,9 +252,11 @@ public class FlinkEnvironment implements RuntimeEnv {
if (config.hasPath(ConfigKeyName.CHECKPOINT_CLEANUP_MODE)) {
boolean cleanup = config.getBoolean(ConfigKeyName.CHECKPOINT_CLEANUP_MODE);
if (cleanup) {
- checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
+ checkpointConfig.enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
} else {
- checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+ checkpointConfig.enableExternalizedCheckpoints(
+ CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
}
}
diff --git a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
index fac7c7c..3da634f 100644
--- a/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-flink/src/main/java/org/apache/seatunnel/flink/batch/FlinkBatchExecution.java
@@ -48,6 +48,7 @@ public class FlinkBatchExecution implements Execution<FlinkBatchSource, FlinkBat
}
@Override
+
public void start(List<FlinkBatchSource> sources, List<FlinkBatchTransform> transforms, List<FlinkBatchSink> sinks) throws Exception {
List<DataSet<Row>> data = new ArrayList<>();
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
index f0b638b..083891d 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/SparkEnvironment.java
@@ -17,13 +17,20 @@
package org.apache.seatunnel.spark;
+import static org.apache.seatunnel.plugin.Plugin.RESULT_TABLE_NAME;
+import static org.apache.seatunnel.plugin.Plugin.SOURCE_TABLE_NAME;
+
import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.config.ConfigRuntimeException;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.env.RuntimeEnv;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.spark.SparkConf;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
@@ -40,6 +47,8 @@ public class SparkEnvironment implements RuntimeEnv {
private boolean enableHive = false;
+ private JobMode jobMode;
+
public SparkEnvironment setEnableHive(boolean enableHive) {
this.enableHive = enableHive;
return this;
@@ -52,6 +61,12 @@ public class SparkEnvironment implements RuntimeEnv {
}
@Override
+ public RuntimeEnv setJobMode(JobMode mode) {
+ this.jobMode = mode;
+ return this;
+ }
+
+ @Override
public Config getConfig() {
return this.config;
}
@@ -94,4 +109,56 @@ public class SparkEnvironment implements RuntimeEnv {
this.streamingContext = new StreamingContext(sparkSession.sparkContext(), Seconds.apply(duration));
}
}
+
+ public static void registerTempView(String tableName, Dataset<Row> ds) {
+ ds.createOrReplaceTempView(tableName);
+ }
+
+ public static Dataset<Row> registerInputTempView(BaseSparkSource<Dataset<Row>> source, SparkEnvironment environment) {
+ Config config = source.getConfig();
+ if (config.hasPath(RESULT_TABLE_NAME)) {
+ String tableName = config.getString(RESULT_TABLE_NAME);
+ Dataset<Row> data = source.getData(environment);
+ registerTempView(tableName, data);
+ return data;
+ } else {
+ throw new ConfigRuntimeException("Plugin[" + source.getClass().getName() + "] " +
+ "must be registered as dataset/table, please set \"" + RESULT_TABLE_NAME + "\" config");
+ }
+ }
+
+ public static Dataset<Row> transformProcess(SparkEnvironment environment, BaseSparkTransform transform, Dataset<Row> ds) {
+ Dataset<Row> fromDs;
+ Config config = transform.getConfig();
+ if (config.hasPath(SOURCE_TABLE_NAME)) {
+ String sourceTableName = config.getString(SOURCE_TABLE_NAME);
+ fromDs = environment.getSparkSession().read().table(sourceTableName);
+ } else {
+ fromDs = ds;
+ }
+ return transform.process(fromDs, environment);
+ }
+
+ public static void registerTransformTempView(BaseSparkTransform transform, Dataset<Row> ds) {
+ Config config = transform.getConfig();
+ if (config.hasPath(RESULT_TABLE_NAME)) {
+ String resultTableName = config.getString(RESULT_TABLE_NAME);
+ registerTempView(resultTableName, ds);
+ }
+ }
+
+ public static <T extends Object> T sinkProcess(SparkEnvironment environment, BaseSparkSink<T> sink, Dataset<Row> ds) {
+ Dataset<Row> fromDs;
+ Config config = sink.getConfig();
+ if (config.hasPath(SOURCE_TABLE_NAME)) {
+ String sourceTableName = config.getString(SOURCE_TABLE_NAME);
+ fromDs = environment.getSparkSession().read().table(sourceTableName);
+ } else {
+ fromDs = ds;
+ }
+ return sink.output(fromDs, environment);
+ }
}
+
+
+
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
index b884293..b94e80a 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/batch/SparkBatchExecution.java
@@ -17,10 +17,7 @@
package org.apache.seatunnel.spark.batch;
-import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.env.Execution;
-import org.apache.seatunnel.spark.BaseSparkSink;
-import org.apache.seatunnel.spark.BaseSparkSource;
import org.apache.seatunnel.spark.BaseSparkTransform;
import org.apache.seatunnel.spark.SparkEnvironment;
@@ -42,64 +39,19 @@ public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSpar
this.environment = environment;
}
- public static void registerTempView(String tableName, Dataset<Row> ds) {
- ds.createOrReplaceTempView(tableName);
- }
-
- public static void registerInputTempView(BaseSparkSource<Dataset<Row>> source, SparkEnvironment environment) {
- Config config = source.getConfig();
- if (config.hasPath(RESULT_TABLE_NAME)) {
- String tableName = config.getString(RESULT_TABLE_NAME);
- registerTempView(tableName, source.getData(environment));
- } else {
- throw new ConfigRuntimeException("Plugin[" + source.getClass().getName() + "] " +
- "must be registered as dataset/table, please set \"" + RESULT_TABLE_NAME + "\" config");
- }
- }
-
- public static Dataset<Row> transformProcess(SparkEnvironment environment, BaseSparkTransform transform, Dataset<Row> ds) {
- Dataset<Row> fromDs;
- Config config = transform.getConfig();
- if (config.hasPath(SOURCE_TABLE_NAME)) {
- String sourceTableName = config.getString(SOURCE_TABLE_NAME);
- fromDs = environment.getSparkSession().read().table(sourceTableName);
- } else {
- fromDs = ds;
- }
- return transform.process(fromDs, environment);
- }
-
- public static void registerTransformTempView(BaseSparkTransform transform, Dataset<Row> ds) {
- Config config = transform.getConfig();
- if (config.hasPath(RESULT_TABLE_NAME)) {
- String resultTableName = config.getString(RESULT_TABLE_NAME);
- registerTempView(resultTableName, ds);
- }
- }
-
- public static void sinkProcess(SparkEnvironment environment, BaseSparkSink<?> sink, Dataset<Row> ds) {
- Dataset<Row> fromDs;
- Config config = sink.getConfig();
- if (config.hasPath(SOURCE_TABLE_NAME)) {
- String sourceTableName = config.getString(SOURCE_TABLE_NAME);
- fromDs = environment.getSparkSession().read().table(sourceTableName);
- } else {
- fromDs = ds;
- }
- sink.output(fromDs, environment);
- }
-
@Override
public void start(List<SparkBatchSource> sources, List<BaseSparkTransform> transforms, List<SparkBatchSink> sinks) {
- sources.forEach(source -> registerInputTempView(source, environment));
+
+ sources.forEach(source -> SparkEnvironment.registerInputTempView(source, environment));
+
if (!sources.isEmpty()) {
Dataset<Row> ds = sources.get(0).getData(environment);
for (BaseSparkTransform transform : transforms) {
- ds = transformProcess(environment, transform, ds);
- registerTransformTempView(transform, ds);
+ ds = SparkEnvironment.transformProcess(environment, transform, ds);
+ SparkEnvironment.registerTransformTempView(transform, ds);
}
for (SparkBatchSink sink : sinks) {
- sinkProcess(environment, sink, ds);
+ SparkEnvironment.sinkProcess(environment, sink, ds);
}
}
}
@@ -115,3 +67,4 @@ public class SparkBatchExecution implements Execution<SparkBatchSource, BaseSpar
}
}
+
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
index d50f9cc..ced6903 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/java/org/apache/seatunnel/spark/structuredstream/StructuredStreamingExecution.java
@@ -24,7 +24,11 @@ import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+
import java.util.List;
+import java.util.stream.Collectors;
public class StructuredStreamingExecution implements Execution<StructuredStreamingSource, BaseSparkTransform, StructuredStreamingSink, SparkEnvironment> {
@@ -37,8 +41,24 @@ public class StructuredStreamingExecution implements Execution<StructuredStreami
}
@Override
- public void start(List<StructuredStreamingSource> sources, List<BaseSparkTransform> transforms, List<StructuredStreamingSink> sinks) {
+ public void start(List<StructuredStreamingSource> sources, List<BaseSparkTransform> transforms,
+ List<StructuredStreamingSink> sinks) throws Exception {
+
+ List<Dataset<Row>> datasetList = sources.stream().map(s ->
+ SparkEnvironment.registerInputTempView(s, sparkEnvironment)
+ ).collect(Collectors.toList());
+ if (datasetList.size() > 0) {
+ Dataset<Row> ds = datasetList.get(0);
+ for (BaseSparkTransform tf : transforms) {
+ ds = SparkEnvironment.transformProcess(sparkEnvironment, tf, ds);
+ SparkEnvironment.registerTransformTempView(tf, ds);
+ }
+ for (StructuredStreamingSink sink : sinks) {
+ SparkEnvironment.sinkProcess(sparkEnvironment, sink, ds).start();
+ }
+ sparkEnvironment.getSparkSession().streams().awaitAnyTermination();
+ }
}
@Override
diff --git a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
index 6687c51..69e5b2c 100644
--- a/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
+++ b/seatunnel-apis/seatunnel-api-spark/src/main/scala/org/apache/seatunnel/spark/stream/SparkStreamingExecution.scala
@@ -19,7 +19,6 @@ package org.apache.seatunnel.spark.stream
import org.apache.seatunnel.env.Execution
import org.apache.seatunnel.plugin.Plugin
import org.apache.seatunnel.shade.com.typesafe.config.{Config, ConfigFactory}
-import org.apache.seatunnel.spark.batch.SparkBatchExecution
import org.apache.seatunnel.spark.{BaseSparkSink, BaseSparkSource, BaseSparkTransform, SparkEnvironment}
import org.apache.spark.sql.{Dataset, Row}
@@ -35,7 +34,7 @@ class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
val source = sources.get(0).asInstanceOf[SparkStreamingSource[_]]
sources.subList(1, sources.size()).foreach(s => {
- SparkBatchExecution.registerInputTempView(
+ SparkEnvironment.registerInputTempView(
s.asInstanceOf[BaseSparkSource[Dataset[Row]]],
sparkEnvironment)
})
@@ -44,21 +43,21 @@ class SparkStreamingExecution(sparkEnvironment: SparkEnvironment)
dataset => {
val conf = source.getConfig
if (conf.hasPath(Plugin.RESULT_TABLE_NAME)) {
- SparkBatchExecution.registerTempView(
+ SparkEnvironment.registerTempView(
conf.getString(Plugin.RESULT_TABLE_NAME),
dataset)
}
var ds = dataset
for (tf <- transforms) {
- ds = SparkBatchExecution.transformProcess(sparkEnvironment, tf, ds)
- SparkBatchExecution.registerTransformTempView(tf, ds)
+ ds = SparkEnvironment.transformProcess(sparkEnvironment, tf, ds)
+ SparkEnvironment.registerTransformTempView(tf, ds)
}
source.beforeOutput()
if (ds.take(1).length > 0) {
sinks.foreach(sink => {
- SparkBatchExecution.sinkProcess(sparkEnvironment, sink, ds)
+ SparkEnvironment.sinkProcess(sparkEnvironment, sink, ds)
})
}
diff --git a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
similarity index 69%
copy from seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
copy to seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
index 5c8ea3c..a7aa326 100644
--- a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/env/RuntimeEnv.java
+++ b/seatunnel-common/src/main/java/org/apache/seatunnel/common/constants/JobMode.java
@@ -15,23 +15,8 @@
* limitations under the License.
*/
-package org.apache.seatunnel.env;
-
-import org.apache.seatunnel.common.config.CheckResult;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-/**
- * engine related runtime environment
- */
-public interface RuntimeEnv {
-
- RuntimeEnv setConfig(Config config);
-
- Config getConfig();
-
- CheckResult checkConfig();
-
- RuntimeEnv prepare();
+package org.apache.seatunnel.common.constants;
+public enum JobMode {
+ BATCH, STREAMING, STRUCTURED_STREAMING
}
diff --git a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
index b7913d6..0431af9 100644
--- a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
+++ b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/test/java/org/apache/seatunnel/flink/source/FileSourceTest.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.flink.source;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -62,7 +63,7 @@ public class FileSourceTest {
Config rootConfig = getRootConfig(configFile);
return new FlinkEnvironment()
- .setStreaming(false)
+ .setJobMode(JobMode.BATCH)
.setConfig(rootConfig)
.prepare();
}
diff --git a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
index cad5440..d98f8bf 100644
--- a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
+++ b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/config/ConfigBuilder.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.apis.BaseSink;
import org.apache.seatunnel.apis.BaseSource;
import org.apache.seatunnel.apis.BaseTransform;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
+import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.env.Execution;
import org.apache.seatunnel.env.RuntimeEnv;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -30,6 +31,7 @@ import org.apache.seatunnel.plugin.Plugin;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.spark.batch.SparkBatchExecution;
import org.apache.seatunnel.spark.stream.SparkStreamingExecution;
+import org.apache.seatunnel.spark.structuredstream.StructuredStreamingExecution;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
@@ -56,7 +58,7 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
private final EngineType engine;
private final ConfigPackage configPackage;
private final Config config;
- private boolean streaming;
+ private JobMode jobMode;
private Config envConfig;
private boolean enableHive;
private final ENVIRONMENT env;
@@ -98,10 +100,15 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
return env;
}
- private boolean checkIsStreaming() {
- List<? extends Config> sourceConfigList = config.getConfigList(PluginType.SOURCE.getType());
+ private void setJobMode(Config envConfig) {
+ if (envConfig.hasPath("job.mode")) {
+ jobMode = envConfig.getEnum(JobMode.class, "job.mode");
+ } else {
+ //Compatible with previous logic
+ List<? extends Config> sourceConfigList = config.getConfigList(PluginType.SOURCE.getType());
+ jobMode = sourceConfigList.get(0).getString(PLUGIN_NAME_KEY).toLowerCase().endsWith("stream") ? JobMode.STREAMING : JobMode.BATCH;
+ }
- return sourceConfigList.get(0).getString(PLUGIN_NAME_KEY).toLowerCase().endsWith("stream");
}
private boolean checkIsContainHive() {
@@ -197,7 +204,6 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
private ENVIRONMENT createEnv() {
envConfig = config.getConfig("env");
- streaming = checkIsStreaming();
enableHive = checkIsContainHive();
ENVIRONMENT env;
switch (engine) {
@@ -205,12 +211,13 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
env = (ENVIRONMENT) new SparkEnvironment().setEnableHive(enableHive);
break;
case FLINK:
- env = (ENVIRONMENT) new FlinkEnvironment().setStreaming(streaming);
+ env = (ENVIRONMENT) new FlinkEnvironment();
break;
default:
throw new IllegalArgumentException("Engine: " + engine + " is not supported");
}
- env.setConfig(envConfig).prepare();
+ setJobMode(envConfig);
+ env.setConfig(envConfig).setJobMode(jobMode).prepare();
return env;
}
@@ -219,23 +226,26 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
switch (engine) {
case SPARK:
SparkEnvironment sparkEnvironment = (SparkEnvironment) env;
- if (streaming) {
+ if (JobMode.STREAMING.equals(jobMode)) {
execution = new SparkStreamingExecution(sparkEnvironment);
+ } else if (JobMode.STRUCTURED_STREAMING.equals(jobMode)) {
+ execution = new StructuredStreamingExecution(sparkEnvironment);
} else {
execution = new SparkBatchExecution(sparkEnvironment);
}
break;
case FLINK:
FlinkEnvironment flinkEnvironment = (FlinkEnvironment) env;
- if (streaming) {
+ if (JobMode.STREAMING.equals(jobMode)) {
execution = new FlinkStreamExecution(flinkEnvironment);
} else {
execution = new FlinkBatchExecution(flinkEnvironment);
}
break;
default:
- break;
+ throw new IllegalArgumentException("No suitable engine");
}
+ LOGGER.info("current execution is [{}]", execution.getClass().getName());
return (Execution<BaseSource<ENVIRONMENT>, BaseTransform<ENVIRONMENT>, BaseSink<ENVIRONMENT>, ENVIRONMENT>) execution;
}