You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by ya...@apache.org on 2022/09/23 15:45:00 UTC
[incubator-streampark] branch dev updated: [feature] execution run-mode support streaming and batch (#1670)
This is an automated email from the ASF dual-hosted git repository.
yangzhou pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new f91bff660 [feature] execution run-mode support streaming and batch (#1670)
f91bff660 is described below
commit f91bff660a7e23396a3bee78494224d4b09f27db
Author: benjobs <be...@apache.org>
AuthorDate: Fri Sep 23 23:44:50 2022 +0800
[feature] execution run-mode support streaming and batch (#1670)
* [feature] execution run-mode support streaming and batch
* [feature] execution run-mode support streaming and batch
* [feature] execution run-mode support streaming and batch
---
.../flink/core/scala/FlinkStreamTable.scala | 2 +-
.../flink/core/scala/FlinkStreaming.scala | 4 +-
.../streampark/flink/core/scala/FlinkTable.scala | 2 +-
.../streampark/flink/core/FlinkSqlExecutor.scala | 3 +-
.../flink/core/FlinkStreamTableTrait.scala | 1 -
.../streampark/flink/core/FlinkTableTrait.scala | 1 -
.../flink/core/test/FlinkSqlExecuteFunSuite.scala | 2 +-
.../flink/core/FlinkStreamingInitializer.scala | 14 ++++---
.../flink/core/FlinkTableInitializer.scala | 31 +++++++++------
.../streampark/flink/core/StreamTableContext.scala | 2 +-
.../streampark/flink/core/TableContext.scala | 2 +-
.../flink/core/FlinkStreamingInitializer.scala | 14 ++++---
.../flink/core/FlinkTableInitializer.scala | 30 ++++++++------
.../streampark/flink/core/StreamTableContext.scala | 2 +-
.../streampark/flink/core/TableContext.scala | 2 +-
.../flink/core/FlinkStreamingInitializer.scala | 16 +++++---
.../flink/core/FlinkTableInitializer.scala | 30 ++++++++------
.../streampark/flink/core/StreamTableContext.scala | 2 +-
.../streampark/flink/core/TableContext.scala | 4 +-
.../flink/core/FlinkStreamingInitializer.scala | 14 ++++---
.../flink/core/FlinkTableInitializer.scala | 30 ++++++++------
.../streampark/flink/core/StreamTableContext.scala | 2 +-
.../streampark/flink/core/TableContext.scala | 2 +-
.../apache/streampark/flink/cli/SqlClient.scala | 46 ++++++++++++++++++----
24 files changed, 162 insertions(+), 96 deletions(-)
diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
index 226e0d398..d8f4fd4e8 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
@@ -51,7 +51,7 @@ trait FlinkStreamTable extends Logger {
private[this] def init(args: Array[String]): Unit = {
SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkStreamTable])
- context = new StreamTableContext(FlinkTableInitializer.initStreamTable(args, configStream, configTable))
+ context = new StreamTableContext(FlinkTableInitializer.initialize(args, configStream, configTable))
}
def configStream(env: StreamExecutionEnvironment, parameter: ParameterTool): Unit = {}
diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
index 30501d734..86ea0e40f 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
@@ -38,7 +38,7 @@ class StreamingContext(val parameter: ParameterTool, private val environment: St
/**
* for Java
*/
- def this(args: StreamEnvConfig) = this(FlinkStreamingInitializer.initJavaStream(args))
+ def this(args: StreamEnvConfig) = this(FlinkStreamingInitializer.initialize(args))
/**
* Recommend use this Api to start task
@@ -83,7 +83,7 @@ trait FlinkStreaming extends Serializable with Logger {
private[this] def init(args: Array[String]): Unit = {
SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkStreaming])
- context = new StreamingContext(FlinkStreamingInitializer.initStream(args, config))
+ context = new StreamingContext(FlinkStreamingInitializer.initialize(args, config))
}
def ready(): Unit = {}
diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
index 63794df87..04a65f5f5 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
@@ -44,7 +44,7 @@ trait FlinkTable extends Logger {
private[this] def init(args: Array[String]): Unit = {
SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkTable])
- context = new TableContext(FlinkTableInitializer.initTable(args, config))
+ context = new TableContext(FlinkTableInitializer.initialize(args, config))
}
def ready(): Unit = {}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index cdfce07a3..d65d5c46b 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -20,7 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.core.SqlCommand._
import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{ConfigOption, Configuration}
+import org.apache.flink.configuration.{ConfigOption, Configuration, ExecutionOptions}
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions, TableConfigOptions}
@@ -59,6 +59,7 @@ object FlinkSqlExecutor extends Logger {
val configOptions = new JavaHashMap[String, ConfigOption[_]]
val configList = List(
//classOf[PythonOptions],
+ classOf[ExecutionOptions],
classOf[ExecutionConfigOptions],
classOf[OptimizerConfigOptions],
classOf[TableConfigOptions]
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index 04845782c..54d86e129 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -41,7 +41,6 @@ import org.apache.flink.table.catalog.Catalog
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions._
import org.apache.flink.table.module.Module
-import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.types.AbstractDataType
import org.apache.flink.util.SplittableIterator
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 541d9984f..ec2af84f4 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.Catalog
import org.apache.flink.table.expressions.Expression
import org.apache.flink.table.functions._
import org.apache.flink.table.module.Module
-import org.apache.flink.table.sources.TableSource
import org.apache.flink.table.types.AbstractDataType
import java.lang
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
index f021ee753..6ebe3a238 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
@@ -28,7 +28,7 @@ class FlinkSqlExecuteFunSuite extends AnyFunSuite {
def execute(sql: String)(implicit func: String => Unit): Unit = {
val args = ArrayBuffer(KEY_FLINK_SQL("--"), DeflaterUtils.zipString(sql.stripMargin))
- val context = new StreamTableContext(FlinkTableInitializer.initStreamTable(args.toArray, null, null))
+ val context = new StreamTableContext(FlinkTableInitializer.initialize(args.toArray, null, null))
FlinkSqlExecutor.executeSql(KEY_FLINK_SQL(), context.parameter, context)
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index b4575b1ce..c20279128 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -45,26 +45,28 @@ private[flink] object FlinkStreamingInitializer {
private[this] var flinkInitializer: FlinkStreamingInitializer = _
- def initStream(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment) = {
+ def initialize(args: Array[String],
+ config: (StreamExecutionEnvironment, ParameterTool) => Unit):
+ (ParameterTool, StreamExecutionEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
flinkInitializer.streamEnvConfFunc = config
- flinkInitializer.initStreamEnv()
+ flinkInitializer.initEnvironment()
}
}
}
(flinkInitializer.parameter, flinkInitializer.streamEnvironment)
}
- def initJavaStream(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
+ def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
flinkInitializer.javaStreamEnvConfFunc = args.conf
- flinkInitializer.initStreamEnv()
+ flinkInitializer.initEnvironment()
}
}
}
@@ -152,14 +154,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
if (localStreamEnv == null) {
this.synchronized {
if (localStreamEnv == null) {
- initStreamEnv()
+ initEnvironment()
}
}
}
localStreamEnv
}
- def initStreamEnv(): Unit = {
+ def initEnvironment(): Unit = {
localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 58f575b57..f0d097004 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -36,54 +36,60 @@ private[flink] object FlinkTableInitializer {
private[this] var flinkInitializer: FlinkTableInitializer = _
- def initTable(args: Array[String], config: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, TableEnvironment) = {
+ def initialize(args: Array[String],
+ config: (TableConfig, ParameterTool) => Unit):
+ (ParameterTool, TableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
flinkInitializer.tableConfFunc = config
- flinkInitializer.initTableEnv(TableMode.batch)
+ flinkInitializer.initEnvironment(TableMode.batch)
}
}
}
(flinkInitializer.parameter, flinkInitializer.tableEnvironment)
}
- def initJavaTable(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
+ def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
flinkInitializer.javaTableEnvConfFunc = args.conf
- flinkInitializer.initTableEnv(TableMode.batch)
+ flinkInitializer.initEnvironment(TableMode.batch)
}
}
}
(flinkInitializer.parameter, flinkInitializer.tableEnvironment)
}
- def initStreamTable(args: Array[String], configStream: (StreamExecutionEnvironment, ParameterTool) => Unit = null, configTable: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+ def initialize(args: Array[String],
+ configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
+ configTable: (TableConfig, ParameterTool) => Unit):
+ (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
flinkInitializer.streamEnvConfFunc = configStream
flinkInitializer.tableConfFunc = configTable
- flinkInitializer.initTableEnv(TableMode.streaming)
+ flinkInitializer.initEnvironment(TableMode.streaming)
}
}
}
(flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
}
- def initJavaStreamTable(args: StreamTableEnvConfig): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+ def initialize(args: StreamTableEnvConfig):
+ (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
flinkInitializer.javaTableEnvConfFunc = args.tableConfig
- flinkInitializer.initTableEnv(TableMode.streaming)
+ flinkInitializer.initEnvironment(TableMode.streaming)
}
}
}
@@ -103,7 +109,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
if (localStreamTableEnv == null) {
this.synchronized {
if (localStreamTableEnv == null) {
- initTableEnv(TableMode.streaming)
+ initEnvironment(TableMode.streaming)
}
}
}
@@ -114,7 +120,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
if (localTableEnv == null) {
this.synchronized {
if (localTableEnv == null) {
- initTableEnv(TableMode.batch)
+ initEnvironment(TableMode.batch)
}
}
}
@@ -152,10 +158,9 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
}
}
}
-
}
- def initTableEnv(tableMode: TableMode): Unit = {
+ def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
val plannerType = Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse {
logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by: blinkPlanner")
@@ -203,7 +208,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
tableMode match {
case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
case TableMode.streaming =>
- initStreamEnv()
+ initEnvironment()
if (streamEnvConfFunc != null) {
streamEnvConfFunc(streamEnvironment, parameter)
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index 4a55aa1ea..fe9cbda70 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -43,7 +43,7 @@ class StreamTableContext(override val parameter: ParameterTool,
/**
* for Java
*/
- def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initJavaStreamTable(args))
+ def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
@deprecated override def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor = tableEnv.connect(connectorDescriptor)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 1dfc2328a..b74f689fc 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -39,7 +39,7 @@ class TableContext(override val parameter: ParameterTool,
*
* @param args
*/
- def this(args: TableEnvConfig) = this(FlinkTableInitializer.initJavaTable(args))
+ def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
@deprecated override def connect(connectorDescriptor: ConnectorDescriptor): ConnectTableDescriptor = tableEnv.connect(connectorDescriptor)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 561e24c82..43ec72ac8 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -45,26 +45,28 @@ private[flink] object FlinkStreamingInitializer {
private[this] var flinkInitializer: FlinkStreamingInitializer = _
- def initStream(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment) = {
+ def initialize(args: Array[String],
+ config: (StreamExecutionEnvironment, ParameterTool) => Unit):
+ (ParameterTool, StreamExecutionEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
flinkInitializer.streamEnvConfFunc = config
- flinkInitializer.initStreamEnv()
+ flinkInitializer.initEnvironment()
}
}
}
(flinkInitializer.parameter, flinkInitializer.streamEnvironment)
}
- def initJavaStream(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
+ def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
flinkInitializer.javaStreamEnvConfFunc = args.conf
- flinkInitializer.initStreamEnv()
+ flinkInitializer.initEnvironment()
}
}
}
@@ -152,14 +154,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
if (localStreamEnv == null) {
this.synchronized {
if (localStreamEnv == null) {
- initStreamEnv()
+ initEnvironment()
}
}
}
localStreamEnv
}
- def initStreamEnv(): Unit = {
+ def initEnvironment(): Unit = {
localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 579c0b652..3f5b01dc9 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -36,54 +36,60 @@ private[flink] object FlinkTableInitializer {
private[this] var flinkInitializer: FlinkTableInitializer = _
- def initTable(args: Array[String], config: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, TableEnvironment) = {
+ def initialize(args: Array[String],
+ config: (TableConfig, ParameterTool) => Unit):
+ (ParameterTool, TableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
flinkInitializer.tableConfFunc = config
- flinkInitializer.initTableEnv(TableMode.batch)
+ flinkInitializer.initEnvironment(TableMode.batch)
}
}
}
(flinkInitializer.parameter, flinkInitializer.tableEnvironment)
}
- def initJavaTable(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
+ def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
flinkInitializer.javaTableEnvConfFunc = args.conf
- flinkInitializer.initTableEnv(TableMode.batch)
+ flinkInitializer.initEnvironment(TableMode.batch)
}
}
}
(flinkInitializer.parameter, flinkInitializer.tableEnvironment)
}
- def initStreamTable(args: Array[String], configStream: (StreamExecutionEnvironment, ParameterTool) => Unit = null, configTable: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+ def initialize(args: Array[String],
+ configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
+ configTable: (TableConfig, ParameterTool) => Unit):
+ (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
flinkInitializer.streamEnvConfFunc = configStream
flinkInitializer.tableConfFunc = configTable
- flinkInitializer.initTableEnv(TableMode.streaming)
+ flinkInitializer.initEnvironment(TableMode.streaming)
}
}
}
(flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
}
- def initJavaStreamTable(args: StreamTableEnvConfig): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+ def initialize(args: StreamTableEnvConfig):
+ (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
flinkInitializer.javaTableEnvConfFunc = args.tableConfig
- flinkInitializer.initTableEnv(TableMode.streaming)
+ flinkInitializer.initEnvironment(TableMode.streaming)
}
}
}
@@ -103,7 +109,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
if (localStreamTableEnv == null) {
this.synchronized {
if (localStreamTableEnv == null) {
- initTableEnv(TableMode.streaming)
+ initEnvironment(TableMode.streaming)
}
}
}
@@ -114,7 +120,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
if (localTableEnv == null) {
this.synchronized {
if (localTableEnv == null) {
- initTableEnv(TableMode.batch)
+ initEnvironment(TableMode.batch)
}
}
}
@@ -154,7 +160,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
}
- def initTableEnv(tableMode: TableMode): Unit = {
+ def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
val plannerType = Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse {
logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by: blinkPlanner")
@@ -202,7 +208,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
tableMode match {
case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
case TableMode.streaming =>
- initStreamEnv()
+ initEnvironment()
if (streamEnvConfFunc != null) {
streamEnvConfFunc(streamEnvironment, parameter)
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index 4df39c0eb..91bc1c348 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -47,7 +47,7 @@ class StreamTableContext(override val parameter: ParameterTool,
/**
* for Java
*/
- def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initJavaStreamTable(args))
+ def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table = tableEnv.fromDataStream[T](dataStream, schema)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 718eb75d0..7ead41d37 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -39,7 +39,7 @@ class TableContext(override val parameter: ParameterTool,
*
* @param args
*/
- def this(args: TableEnvConfig) = this(FlinkTableInitializer.initJavaTable(args))
+ def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 9abd742e7..850787cd2 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -45,26 +45,27 @@ private[flink] object FlinkStreamingInitializer {
private[this] var flinkInitializer: FlinkStreamingInitializer = _
- def initStream(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment) = {
+ def initialize(args: Array[String],
+ config: (StreamExecutionEnvironment, ParameterTool) => Unit): (ParameterTool, StreamExecutionEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
flinkInitializer.streamEnvConfFunc = config
- flinkInitializer.initStreamEnv()
+ flinkInitializer.initEnvironment()
}
}
}
(flinkInitializer.parameter, flinkInitializer.streamEnvironment)
}
- def initJavaStream(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
+ def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
flinkInitializer.javaStreamEnvConfFunc = args.conf
- flinkInitializer.initStreamEnv()
+ flinkInitializer.initEnvironment()
}
}
}
@@ -111,6 +112,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
case x if x.startsWith("prop://") =>
PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
case x if x.startsWith("hdfs://") =>
+
/**
* If the configuration file is stored on HDFS, the user needs to copy the HDFS-related configuration files into the resources dir
*/
@@ -152,14 +154,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
if (localStreamEnv == null) {
this.synchronized {
if (localStreamEnv == null) {
- initStreamEnv()
+ initEnvironment()
}
}
}
localStreamEnv
}
- def initStreamEnv(): Unit = {
+ def initEnvironment(): Unit = {
localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
@@ -200,6 +202,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
strategy match {
case RestartStrategy.`failure-rate` =>
+
/**
* restart-strategy.failure-rate.max-failures-per-interval: maximum number of restarts before a Job is deemed to have failed
* restart-strategy.failure-rate.failure-rate-interval: time interval for calculating the failure rate
@@ -235,6 +238,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
Time.of(delay._1, delay._2)
))
case RestartStrategy.`fixed-delay` =>
+
/**
* restart-strategy.fixed-delay.attempts: the number of times Flink tries to execute a Job before it finally fails
* restart-strategy.fixed-delay.delay: specifies how long to wait between restart attempts
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 1387557e4..5e132d1fa 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -36,54 +36,60 @@ private[flink] object FlinkTableInitializer {
private[this] var flinkInitializer: FlinkTableInitializer = _
- def initTable(args: Array[String], config: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, TableEnvironment) = {
+ def initialize(args: Array[String],
+ config: (TableConfig, ParameterTool) => Unit):
+ (ParameterTool, TableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
flinkInitializer.tableConfFunc = config
- flinkInitializer.initTableEnv(TableMode.batch)
+ flinkInitializer.initEnvironment(TableMode.batch)
}
}
}
(flinkInitializer.parameter, flinkInitializer.tableEnvironment)
}
- def initJavaTable(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
+ def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
flinkInitializer.javaTableEnvConfFunc = args.conf
- flinkInitializer.initTableEnv(TableMode.batch)
+ flinkInitializer.initEnvironment(TableMode.batch)
}
}
}
(flinkInitializer.parameter, flinkInitializer.tableEnvironment)
}
- def initStreamTable(args: Array[String], configStream: (StreamExecutionEnvironment, ParameterTool) => Unit = null, configTable: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+ def initialize(args: Array[String],
+ configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
+ configTable: (TableConfig, ParameterTool) => Unit):
+ (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
flinkInitializer.streamEnvConfFunc = configStream
flinkInitializer.tableConfFunc = configTable
- flinkInitializer.initTableEnv(TableMode.streaming)
+ flinkInitializer.initEnvironment(TableMode.streaming)
}
}
}
(flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
}
- def initJavaStreamTable(args: StreamTableEnvConfig): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+ def initialize(args: StreamTableEnvConfig):
+ (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
flinkInitializer.javaTableEnvConfFunc = args.tableConfig
- flinkInitializer.initTableEnv(TableMode.streaming)
+ flinkInitializer.initEnvironment(TableMode.streaming)
}
}
}
@@ -103,7 +109,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
if (localStreamTableEnv == null) {
this.synchronized {
if (localStreamTableEnv == null) {
- initTableEnv(TableMode.streaming)
+ initEnvironment(TableMode.streaming)
}
}
}
@@ -114,7 +120,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
if (localTableEnv == null) {
this.synchronized {
if (localTableEnv == null) {
- initTableEnv(TableMode.batch)
+ initEnvironment(TableMode.batch)
}
}
}
@@ -154,7 +160,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
}
- def initTableEnv(tableMode: TableMode): Unit = {
+ def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
val mode = Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
mode match {
@@ -185,7 +191,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
tableMode match {
case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
case TableMode.streaming =>
- initStreamEnv()
+ initEnvironment()
if (streamEnvConfFunc != null) {
streamEnvConfFunc(streamEnvironment, parameter)
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index f5ecf8dcd..5a3201441 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -46,7 +46,7 @@ class StreamTableContext(override val parameter: ParameterTool,
/**
* for Java
*/
- def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initJavaStreamTable(args))
+ def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table = tableEnv.fromDataStream[T](dataStream, schema)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index de441b4fb..27e963265 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -39,7 +39,9 @@ class TableContext(override val parameter: ParameterTool,
*
* @param args
*/
- def this(args: TableEnvConfig) = this(FlinkTableInitializer.initJavaTable(args))
+ def this(args: TableEnvConfig) = {
+ this(FlinkTableInitializer.initialize(args))
+ }
override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index df9589139..784b48727 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -47,26 +47,28 @@ private[flink] object FlinkStreamingInitializer {
private[this] var flinkInitializer: FlinkStreamingInitializer = _
- def initStream(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment) = {
+ def initialize(args: Array[String],
+ config: (StreamExecutionEnvironment, ParameterTool) => Unit):
+ (ParameterTool, StreamExecutionEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
flinkInitializer.streamEnvConfFunc = config
- flinkInitializer.initStreamEnv()
+ flinkInitializer.initEnvironment()
}
}
}
(flinkInitializer.parameter, flinkInitializer.streamEnvironment)
}
- def initJavaStream(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
+ def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
flinkInitializer.javaStreamEnvConfFunc = args.conf
- flinkInitializer.initStreamEnv()
+ flinkInitializer.initEnvironment()
}
}
}
@@ -151,14 +153,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
if (localStreamEnv == null) {
this.synchronized {
if (localStreamEnv == null) {
- initStreamEnv()
+ initEnvironment()
}
}
}
localStreamEnv
}
- def initStreamEnv(): Unit = {
+ def initEnvironment(): Unit = {
localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index ba51efaf4..8e3cc7bbe 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -39,54 +39,60 @@ private[flink] object FlinkTableInitializer {
private[this] var flinkInitializer: FlinkTableInitializer = _
- def initTable(args: Array[String], config: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, TableEnvironment) = {
+ def initialize(args: Array[String],
+ config: (TableConfig, ParameterTool) => Unit):
+ (ParameterTool, TableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
flinkInitializer.tableConfFunc = config
- flinkInitializer.initTableEnv(TableMode.batch)
+ flinkInitializer.initEnvironment(TableMode.batch)
}
}
}
(flinkInitializer.parameter, flinkInitializer.tableEnvironment)
}
- def initJavaTable(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
+ def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
flinkInitializer.javaTableEnvConfFunc = args.conf
- flinkInitializer.initTableEnv(TableMode.batch)
+ flinkInitializer.initEnvironment(TableMode.batch)
}
}
}
(flinkInitializer.parameter, flinkInitializer.tableEnvironment)
}
- def initStreamTable(args: Array[String], configStream: (StreamExecutionEnvironment, ParameterTool) => Unit = null, configTable: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+ def initialize(args: Array[String],
+ configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
+ configTable: (TableConfig, ParameterTool) => Unit):
+ (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
flinkInitializer.streamEnvConfFunc = configStream
flinkInitializer.tableConfFunc = configTable
- flinkInitializer.initTableEnv(TableMode.streaming)
+ flinkInitializer.initEnvironment(TableMode.streaming)
}
}
}
(flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
}
- def initJavaStreamTable(args: StreamTableEnvConfig): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+ def initialize(args: StreamTableEnvConfig):
+ (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
if (flinkInitializer == null) {
this.synchronized {
if (flinkInitializer == null) {
flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
flinkInitializer.javaTableEnvConfFunc = args.tableConfig
- flinkInitializer.initTableEnv(TableMode.streaming)
+ flinkInitializer.initEnvironment(TableMode.streaming)
}
}
}
@@ -105,7 +111,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
if (localStreamTableEnv == null) {
this.synchronized {
if (localStreamTableEnv == null) {
- initTableEnv(TableMode.streaming)
+ initEnvironment(TableMode.streaming)
}
}
}
@@ -116,7 +122,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
if (localTableEnv == null) {
this.synchronized {
if (localTableEnv == null) {
- initTableEnv(TableMode.batch)
+ initEnvironment(TableMode.batch)
}
}
}
@@ -156,7 +162,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
}
- def initTableEnv(tableMode: TableMode): Unit = {
+ def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
val mode = Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
mode match {
@@ -187,7 +193,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
tableMode match {
case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
case TableMode.streaming =>
- initStreamEnv()
+ initEnvironment()
if (streamEnvConfFunc != null) {
streamEnvConfFunc(streamEnvironment, parameter)
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index b00fc834a..c9a90496e 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -32,7 +32,7 @@ class StreamTableContext(override val parameter: ParameterTool,
def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) = this(args._1, args._2, args._3)
- def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initJavaStreamTable(args))
+ def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table = tableEnv.fromDataStream[T](dataStream, schema)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index b8b1eee9b..e93c25894 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -28,7 +28,7 @@ class TableContext(override val parameter: ParameterTool,
def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
- def this(args: TableEnvConfig) = this(FlinkTableInitializer.initJavaTable(args))
+ def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 7d9973191..28cbb92e0 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -17,19 +17,51 @@
package org.apache.streampark.flink.cli
-import org.apache.streampark.flink.core.scala.FlinkStreamTable
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.ExecutionOptions
+import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, KEY_FLINK_SQL, KEY_FLINK_TABLE_MODE}
+import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
+import org.apache.streampark.flink.core.{SqlCommand, SqlCommandParser}
+import org.apache.streampark.flink.core.scala.{FlinkStreamTable, FlinkTable}
import scala.language.implicitConversions
+import scala.util.Try
+object SqlClient extends App {
-object SqlClient extends FlinkStreamTable {
+ val parameterTool = ParameterTool.fromArgs(args)
- override def handle(): Unit = context.sql()
+ val flinkSql = {
+ val sql = parameterTool.get(KEY_FLINK_SQL())
+ require(sql != null && sql.trim.nonEmpty, "Usage: flink sql cannot be null")
+ Try(DeflaterUtils.unzipString(sql)).getOrElse(throw new IllegalArgumentException("Usage: flink sql is invalid or null, please check"))
+ }
+
+ val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
+
+ val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
+ case Some(e) => e.operands(1)
+ case None =>
+ val appConf = parameterTool.get(KEY_APP_CONF(), null)
+ val defaultMode = "streaming"
+ if (appConf == null) defaultMode else {
+ val parameter = PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(appConf.drop(7)))
+ parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+ }
+ }
+
+ mode match {
+ case "batch" => BatchSqlApp.main(args)
+ case "streaming" => StreamSqlApp.main(args)
+ case _ => throw new IllegalArgumentException("Usage: runtime execution-mode invalid, optional [streaming|batch]")
+ }
+
+ private[this] object BatchSqlApp extends FlinkTable {
+ override def handle(): Unit = context.sql()
+ }
- implicit def callback(message: String): Unit = {
- // scalastyle:off println
- println(message)
- // scalastyle:on println
+ private[this] object StreamSqlApp extends FlinkStreamTable {
+ override def handle(): Unit = context.sql()
}
}