Posted to commits@streampark.apache.org by ya...@apache.org on 2022/09/23 15:45:00 UTC

[incubator-streampark] branch dev updated: [feature] execution run-mode support streaming and batch (#1670)

This is an automated email from the ASF dual-hosted git repository.

yangzhou pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new f91bff660 [feature] execution run-mode support streaming and batch (#1670)
f91bff660 is described below

commit f91bff660a7e23396a3bee78494224d4b09f27db
Author: benjobs <be...@apache.org>
AuthorDate: Fri Sep 23 23:44:50 2022 +0800

    [feature] execution run-mode support streaming and batch (#1670)
    
    * [feature] execution run-mode support streaming and batch
    
    * [feature] execution run-mode support streaming and batch
    
    * [feature] execution run-mode support streaming and batch
---
 .../flink/core/scala/FlinkStreamTable.scala        |  2 +-
 .../flink/core/scala/FlinkStreaming.scala          |  4 +-
 .../streampark/flink/core/scala/FlinkTable.scala   |  2 +-
 .../streampark/flink/core/FlinkSqlExecutor.scala   |  3 +-
 .../flink/core/FlinkStreamTableTrait.scala         |  1 -
 .../streampark/flink/core/FlinkTableTrait.scala    |  1 -
 .../flink/core/test/FlinkSqlExecuteFunSuite.scala  |  2 +-
 .../flink/core/FlinkStreamingInitializer.scala     | 14 ++++---
 .../flink/core/FlinkTableInitializer.scala         | 31 +++++++++------
 .../streampark/flink/core/StreamTableContext.scala |  2 +-
 .../streampark/flink/core/TableContext.scala       |  2 +-
 .../flink/core/FlinkStreamingInitializer.scala     | 14 ++++---
 .../flink/core/FlinkTableInitializer.scala         | 30 ++++++++------
 .../streampark/flink/core/StreamTableContext.scala |  2 +-
 .../streampark/flink/core/TableContext.scala       |  2 +-
 .../flink/core/FlinkStreamingInitializer.scala     | 16 +++++---
 .../flink/core/FlinkTableInitializer.scala         | 30 ++++++++------
 .../streampark/flink/core/StreamTableContext.scala |  2 +-
 .../streampark/flink/core/TableContext.scala       |  4 +-
 .../flink/core/FlinkStreamingInitializer.scala     | 14 ++++---
 .../flink/core/FlinkTableInitializer.scala         | 30 ++++++++------
 .../streampark/flink/core/StreamTableContext.scala |  2 +-
 .../streampark/flink/core/TableContext.scala       |  2 +-
 .../apache/streampark/flink/cli/SqlClient.scala    | 46 ++++++++++++++++++----
 24 files changed, 162 insertions(+), 96 deletions(-)

diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
index 226e0d398..d8f4fd4e8 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
@@ -51,7 +51,7 @@ trait FlinkStreamTable extends Logger {
 
   private[this] def init(args: Array[String]): Unit = {
     SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkStreamTable])
-    context = new StreamTableContext(FlinkTableInitializer.initStreamTable(args, configStream, configTable))
+    context = new StreamTableContext(FlinkTableInitializer.initialize(args, configStream, configTable))
   }
 
   def configStream(env: StreamExecutionEnvironment, parameter: ParameterTool): Unit = {}
diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
index 30501d734..86ea0e40f 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
@@ -38,7 +38,7 @@ class StreamingContext(val parameter: ParameterTool, private val environment: St
   /**
    * for Java
    */
-  def this(args: StreamEnvConfig) = this(FlinkStreamingInitializer.initJavaStream(args))
+  def this(args: StreamEnvConfig) = this(FlinkStreamingInitializer.initialize(args))
 
   /**
    * Recommend use this Api to start task
@@ -83,7 +83,7 @@ trait FlinkStreaming extends Serializable with Logger {
 
   private[this] def init(args: Array[String]): Unit = {
     SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkStreaming])
-    context = new StreamingContext(FlinkStreamingInitializer.initStream(args, config))
+    context = new StreamingContext(FlinkStreamingInitializer.initialize(args, config))
   }
 
   def ready(): Unit = {}
diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
index 63794df87..04a65f5f5 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
@@ -44,7 +44,7 @@ trait FlinkTable extends Logger {
 
   private[this] def init(args: Array[String]): Unit = {
     SystemPropertyUtils.setAppHome(KEY_APP_HOME, classOf[FlinkTable])
-    context = new TableContext(FlinkTableInitializer.initTable(args, config))
+    context = new TableContext(FlinkTableInitializer.initialize(args, config))
   }
 
   def ready(): Unit = {}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index cdfce07a3..d65d5c46b 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -20,7 +20,7 @@ import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.core.SqlCommand._
 import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.configuration.{ConfigOption, Configuration}
+import org.apache.flink.configuration.{ConfigOption, Configuration, ExecutionOptions}
 import org.apache.flink.table.api.TableEnvironment
 import org.apache.flink.table.api.config.{ExecutionConfigOptions, OptimizerConfigOptions, TableConfigOptions}
 
@@ -59,6 +59,7 @@ object FlinkSqlExecutor extends Logger {
     val configOptions = new JavaHashMap[String, ConfigOption[_]]
     val configList = List(
       //classOf[PythonOptions],
+      classOf[ExecutionOptions],
       classOf[ExecutionConfigOptions],
       classOf[OptimizerConfigOptions],
       classOf[TableConfigOptions]
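
Registering classOf[ExecutionOptions] alongside the table config classes is what lets a SET statement in the submitted SQL reach runtime options such as execution.runtime-mode. A minimal standalone sketch of that key-indexed lookup (not the executor's actual code, which fills a JavaHashMap), assuming only Flink's ExecutionOptions class:

    import org.apache.flink.configuration.{ConfigOption, ExecutionOptions}

    object ExecutionOptionsLookup {
      // Collect the public static ConfigOption fields declared on ExecutionOptions,
      // keyed by their string key (e.g. "execution.runtime-mode").
      val options: Map[String, ConfigOption[_]] =
        classOf[ExecutionOptions].getFields
          .filter(f => classOf[ConfigOption[_]].isAssignableFrom(f.getType))
          .map(f => f.get(null).asInstanceOf[ConfigOption[_]])
          .map(o => o.key() -> o)
          .toMap

      def main(args: Array[String]): Unit = {
        // A "SET execution.runtime-mode = ..." statement can now resolve to a typed option:
        println(options.contains(ExecutionOptions.RUNTIME_MODE.key())) // true
      }
    }
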
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index 04845782c..54d86e129 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -41,7 +41,6 @@ import org.apache.flink.table.catalog.Catalog
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions._
 import org.apache.flink.table.module.Module
-import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.types.AbstractDataType
 import org.apache.flink.util.SplittableIterator
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index 541d9984f..ec2af84f4 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -24,7 +24,6 @@ import org.apache.flink.table.catalog.Catalog
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.functions._
 import org.apache.flink.table.module.Module
-import org.apache.flink.table.sources.TableSource
 import org.apache.flink.table.types.AbstractDataType
 
 import java.lang
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
index f021ee753..6ebe3a238 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
@@ -28,7 +28,7 @@ class FlinkSqlExecuteFunSuite extends AnyFunSuite {
 
   def execute(sql: String)(implicit func: String => Unit): Unit = {
     val args = ArrayBuffer(KEY_FLINK_SQL("--"), DeflaterUtils.zipString(sql.stripMargin))
-    val context = new StreamTableContext(FlinkTableInitializer.initStreamTable(args.toArray, null, null))
+    val context = new StreamTableContext(FlinkTableInitializer.initialize(args.toArray, null, null))
     FlinkSqlExecutor.executeSql(KEY_FLINK_SQL(), context.parameter, context)
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index b4575b1ce..c20279128 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -45,26 +45,28 @@ private[flink] object FlinkStreamingInitializer {
 
   private[this] var flinkInitializer: FlinkStreamingInitializer = _
 
-  def initStream(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment) = {
+  def initialize(args: Array[String],
+                 config: (StreamExecutionEnvironment, ParameterTool) => Unit):
+  (ParameterTool, StreamExecutionEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
           flinkInitializer.streamEnvConfFunc = config
-          flinkInitializer.initStreamEnv()
+          flinkInitializer.initEnvironment()
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
   }
 
-  def initJavaStream(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
+  def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
           flinkInitializer.javaStreamEnvConfFunc = args.conf
-          flinkInitializer.initStreamEnv()
+          flinkInitializer.initEnvironment()
         }
       }
     }
@@ -152,14 +154,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     if (localStreamEnv == null) {
       this.synchronized {
         if (localStreamEnv == null) {
-          initStreamEnv()
+          initEnvironment()
         }
       }
     }
     localStreamEnv
   }
 
-  def initStreamEnv(): Unit = {
+  def initEnvironment(): Unit = {
     localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
     Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
       Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
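
The renamed entry points (initStream/initJavaStream becoming overloads of initialize, initStreamEnv becoming initEnvironment) keep the existing lazily initialized singleton with double-checked locking. For reference only, a standalone sketch of that idiom with hypothetical names (the @volatile annotation is what makes the double check safe on the JVM):

    object EnvironmentHolder {
      // Hypothetical stand-in for the real environment type.
      final class Environment

      @volatile private var env: Environment = _

      def get(): Environment = {
        if (env == null) {            // fast path, no lock taken
          this.synchronized {
            if (env == null) {        // re-check while holding the lock
              env = new Environment   // built exactly once
            }
          }
        }
        env
      }
    }
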
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 58f575b57..f0d097004 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -36,54 +36,60 @@ private[flink] object FlinkTableInitializer {
 
   private[this] var flinkInitializer: FlinkTableInitializer = _
 
-  def initTable(args: Array[String], config: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, TableEnvironment) = {
+  def initialize(args: Array[String],
+                 config: (TableConfig, ParameterTool) => Unit):
+  (ParameterTool, TableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
           flinkInitializer.tableConfFunc = config
-          flinkInitializer.initTableEnv(TableMode.batch)
+          flinkInitializer.initEnvironment(TableMode.batch)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
   }
 
-  def initJavaTable(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
+  def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
           flinkInitializer.javaTableEnvConfFunc = args.conf
-          flinkInitializer.initTableEnv(TableMode.batch)
+          flinkInitializer.initEnvironment(TableMode.batch)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
   }
 
-  def initStreamTable(args: Array[String], configStream: (StreamExecutionEnvironment, ParameterTool) => Unit = null, configTable: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+  def initialize(args: Array[String],
+                 configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
+                 configTable: (TableConfig, ParameterTool) => Unit):
+  (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
           flinkInitializer.streamEnvConfFunc = configStream
           flinkInitializer.tableConfFunc = configTable
-          flinkInitializer.initTableEnv(TableMode.streaming)
+          flinkInitializer.initEnvironment(TableMode.streaming)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
   }
 
-  def initJavaStreamTable(args: StreamTableEnvConfig): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+  def initialize(args: StreamTableEnvConfig):
+  (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
           flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
           flinkInitializer.javaTableEnvConfFunc = args.tableConfig
-          flinkInitializer.initTableEnv(TableMode.streaming)
+          flinkInitializer.initEnvironment(TableMode.streaming)
         }
       }
     }
@@ -103,7 +109,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     if (localStreamTableEnv == null) {
       this.synchronized {
         if (localStreamTableEnv == null) {
-          initTableEnv(TableMode.streaming)
+          initEnvironment(TableMode.streaming)
         }
       }
     }
@@ -114,7 +120,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     if (localTableEnv == null) {
       this.synchronized {
         if (localTableEnv == null) {
-          initTableEnv(TableMode.batch)
+          initEnvironment(TableMode.batch)
         }
       }
     }
@@ -152,10 +158,9 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
             }
         }
     }
-
   }
 
-  def initTableEnv(tableMode: TableMode): Unit = {
+  def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
     val plannerType = Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse {
       logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by: blinkPlanner")
@@ -203,7 +208,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     tableMode match {
       case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
       case TableMode.streaming =>
-        initStreamEnv()
+        initEnvironment()
         if (streamEnvConfFunc != null) {
           streamEnvConfFunc(streamEnvironment, parameter)
         }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index 4a55aa1ea..fe9cbda70 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -43,7 +43,7 @@ class StreamTableContext(override val parameter: ParameterTool,
   /**
    * for Java
    */
-  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initJavaStreamTable(args))
+  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
 
 
   @deprecated override def connect(connectorDescriptor: ConnectorDescriptor): StreamTableDescriptor = tableEnv.connect(connectorDescriptor)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 1dfc2328a..b74f689fc 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -39,7 +39,7 @@ class TableContext(override val parameter: ParameterTool,
    *
    * @param args
    */
-  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initJavaTable(args))
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
 
 
   @deprecated override def connect(connectorDescriptor: ConnectorDescriptor): ConnectTableDescriptor = tableEnv.connect(connectorDescriptor)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 561e24c82..43ec72ac8 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -45,26 +45,28 @@ private[flink] object FlinkStreamingInitializer {
 
   private[this] var flinkInitializer: FlinkStreamingInitializer = _
 
-  def initStream(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment) = {
+  def initialize(args: Array[String],
+                 config: (StreamExecutionEnvironment, ParameterTool) => Unit):
+  (ParameterTool, StreamExecutionEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
           flinkInitializer.streamEnvConfFunc = config
-          flinkInitializer.initStreamEnv()
+          flinkInitializer.initEnvironment()
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
   }
 
-  def initJavaStream(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
+  def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
           flinkInitializer.javaStreamEnvConfFunc = args.conf
-          flinkInitializer.initStreamEnv()
+          flinkInitializer.initEnvironment()
         }
       }
     }
@@ -152,14 +154,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     if (localStreamEnv == null) {
       this.synchronized {
         if (localStreamEnv == null) {
-          initStreamEnv()
+          initEnvironment()
         }
       }
     }
     localStreamEnv
   }
 
-  def initStreamEnv(): Unit = {
+  def initEnvironment(): Unit = {
     localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
     Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
       Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 579c0b652..3f5b01dc9 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -36,54 +36,60 @@ private[flink] object FlinkTableInitializer {
 
   private[this] var flinkInitializer: FlinkTableInitializer = _
 
-  def initTable(args: Array[String], config: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, TableEnvironment) = {
+  def initialize(args: Array[String],
+                 config: (TableConfig, ParameterTool) => Unit):
+  (ParameterTool, TableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
           flinkInitializer.tableConfFunc = config
-          flinkInitializer.initTableEnv(TableMode.batch)
+          flinkInitializer.initEnvironment(TableMode.batch)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
   }
 
-  def initJavaTable(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
+  def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
           flinkInitializer.javaTableEnvConfFunc = args.conf
-          flinkInitializer.initTableEnv(TableMode.batch)
+          flinkInitializer.initEnvironment(TableMode.batch)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
   }
 
-  def initStreamTable(args: Array[String], configStream: (StreamExecutionEnvironment, ParameterTool) => Unit = null, configTable: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+  def initialize(args: Array[String],
+                 configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
+                 configTable: (TableConfig, ParameterTool) => Unit):
+  (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
           flinkInitializer.streamEnvConfFunc = configStream
           flinkInitializer.tableConfFunc = configTable
-          flinkInitializer.initTableEnv(TableMode.streaming)
+          flinkInitializer.initEnvironment(TableMode.streaming)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
   }
 
-  def initJavaStreamTable(args: StreamTableEnvConfig): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+  def initialize(args: StreamTableEnvConfig):
+  (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
           flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
           flinkInitializer.javaTableEnvConfFunc = args.tableConfig
-          flinkInitializer.initTableEnv(TableMode.streaming)
+          flinkInitializer.initEnvironment(TableMode.streaming)
         }
       }
     }
@@ -103,7 +109,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     if (localStreamTableEnv == null) {
       this.synchronized {
         if (localStreamTableEnv == null) {
-          initTableEnv(TableMode.streaming)
+          initEnvironment(TableMode.streaming)
         }
       }
     }
@@ -114,7 +120,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     if (localTableEnv == null) {
       this.synchronized {
         if (localTableEnv == null) {
-          initTableEnv(TableMode.batch)
+          initEnvironment(TableMode.batch)
         }
       }
     }
@@ -154,7 +160,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
 
   }
 
-  def initTableEnv(tableMode: TableMode): Unit = {
+  def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
     val plannerType = Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse {
       logWarn(s" $KEY_FLINK_TABLE_PLANNER undefined,use default by: blinkPlanner")
@@ -202,7 +208,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     tableMode match {
       case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
       case TableMode.streaming =>
-        initStreamEnv()
+        initEnvironment()
         if (streamEnvConfFunc != null) {
           streamEnvConfFunc(streamEnvironment, parameter)
         }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index 4df39c0eb..91bc1c348 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -47,7 +47,7 @@ class StreamTableContext(override val parameter: ParameterTool,
   /**
    * for Java
    */
-  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initJavaStreamTable(args))
+  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
 
 
   override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table = tableEnv.fromDataStream[T](dataStream, schema)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 718eb75d0..7ead41d37 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -39,7 +39,7 @@ class TableContext(override val parameter: ParameterTool,
    *
    * @param args
    */
-  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initJavaTable(args))
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
 
 
   override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 9abd742e7..850787cd2 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -45,26 +45,27 @@ private[flink] object FlinkStreamingInitializer {
 
   private[this] var flinkInitializer: FlinkStreamingInitializer = _
 
-  def initStream(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment) = {
+  def initialize(args: Array[String],
+                 config: (StreamExecutionEnvironment, ParameterTool) => Unit): (ParameterTool, StreamExecutionEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
           flinkInitializer.streamEnvConfFunc = config
-          flinkInitializer.initStreamEnv()
+          flinkInitializer.initEnvironment()
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
   }
 
-  def initJavaStream(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
+  def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
           flinkInitializer.javaStreamEnvConfFunc = args.conf
-          flinkInitializer.initStreamEnv()
+          flinkInitializer.initEnvironment()
         }
       }
     }
@@ -111,6 +112,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
       case x if x.startsWith("prop://") =>
         PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
       case x if x.startsWith("hdfs://") =>
+
         /**
          * If the configuration file with the hdfs, user will need to copy the hdfs-related configuration files under the resources dir
          */
@@ -152,14 +154,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     if (localStreamEnv == null) {
       this.synchronized {
         if (localStreamEnv == null) {
-          initStreamEnv()
+          initEnvironment()
         }
       }
     }
     localStreamEnv
   }
 
-  def initStreamEnv(): Unit = {
+  def initEnvironment(): Unit = {
     localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
     Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
       Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
@@ -200,6 +202,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
 
     strategy match {
       case RestartStrategy.`failure-rate` =>
+
         /**
          * restart-strategy.failure-rate.max-failures-per-interval: maximum number of restarts before a Job is deemed to have failed
          * restart-strategy.failure-rate.failure-rate-interval: time interval for calculating the failure rate
@@ -235,6 +238,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
           Time.of(delay._1, delay._2)
         ))
       case RestartStrategy.`fixed-delay` =>
+
         /**
          * restart-strategy.fixed-delay.attempts: the number of times Flink tries to execute a Job before it finally failure
          * restart-strategy.fixed-delay.delay: specific how long the restart interval
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 1387557e4..5e132d1fa 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -36,54 +36,60 @@ private[flink] object FlinkTableInitializer {
 
   private[this] var flinkInitializer: FlinkTableInitializer = _
 
-  def initTable(args: Array[String], config: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, TableEnvironment) = {
+  def initialize(args: Array[String],
+                 config: (TableConfig, ParameterTool) => Unit):
+  (ParameterTool, TableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
           flinkInitializer.tableConfFunc = config
-          flinkInitializer.initTableEnv(TableMode.batch)
+          flinkInitializer.initEnvironment(TableMode.batch)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
   }
 
-  def initJavaTable(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
+  def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
           flinkInitializer.javaTableEnvConfFunc = args.conf
-          flinkInitializer.initTableEnv(TableMode.batch)
+          flinkInitializer.initEnvironment(TableMode.batch)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
   }
 
-  def initStreamTable(args: Array[String], configStream: (StreamExecutionEnvironment, ParameterTool) => Unit = null, configTable: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+  def initialize(args: Array[String],
+                 configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
+                 configTable: (TableConfig, ParameterTool) => Unit):
+  (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
           flinkInitializer.streamEnvConfFunc = configStream
           flinkInitializer.tableConfFunc = configTable
-          flinkInitializer.initTableEnv(TableMode.streaming)
+          flinkInitializer.initEnvironment(TableMode.streaming)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
   }
 
-  def initJavaStreamTable(args: StreamTableEnvConfig): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+  def initialize(args: StreamTableEnvConfig):
+  (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
           flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
           flinkInitializer.javaTableEnvConfFunc = args.tableConfig
-          flinkInitializer.initTableEnv(TableMode.streaming)
+          flinkInitializer.initEnvironment(TableMode.streaming)
         }
       }
     }
@@ -103,7 +109,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     if (localStreamTableEnv == null) {
       this.synchronized {
         if (localStreamTableEnv == null) {
-          initTableEnv(TableMode.streaming)
+          initEnvironment(TableMode.streaming)
         }
       }
     }
@@ -114,7 +120,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     if (localTableEnv == null) {
       this.synchronized {
         if (localTableEnv == null) {
-          initTableEnv(TableMode.batch)
+          initEnvironment(TableMode.batch)
         }
       }
     }
@@ -154,7 +160,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
 
   }
 
-  def initTableEnv(tableMode: TableMode): Unit = {
+  def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
     val mode = Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
     mode match {
@@ -185,7 +191,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     tableMode match {
       case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
       case TableMode.streaming =>
-        initStreamEnv()
+        initEnvironment()
         if (streamEnvConfFunc != null) {
           streamEnvConfFunc(streamEnvironment, parameter)
         }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index f5ecf8dcd..5a3201441 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -46,7 +46,7 @@ class StreamTableContext(override val parameter: ParameterTool,
   /**
    * for Java
    */
-  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initJavaStreamTable(args))
+  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
 
 
   override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table = tableEnv.fromDataStream[T](dataStream, schema)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index de441b4fb..27e963265 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -39,7 +39,9 @@ class TableContext(override val parameter: ParameterTool,
    *
    * @param args
    */
-  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initJavaTable(args))
+  def this(args: TableEnvConfig) = {
+    this(FlinkTableInitializer.initialize(args))
+  }
 
 
   override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index df9589139..784b48727 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -47,26 +47,28 @@ private[flink] object FlinkStreamingInitializer {
 
   private[this] var flinkInitializer: FlinkStreamingInitializer = _
 
-  def initStream(args: Array[String], config: (StreamExecutionEnvironment, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment) = {
+  def initialize(args: Array[String],
+                 config: (StreamExecutionEnvironment, ParameterTool) => Unit):
+  (ParameterTool, StreamExecutionEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkStreamingInitializer(args, ApiType.scala)
           flinkInitializer.streamEnvConfFunc = config
-          flinkInitializer.initStreamEnv()
+          flinkInitializer.initEnvironment()
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
   }
 
-  def initJavaStream(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
+  def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkStreamingInitializer(args.args, ApiType.java)
           flinkInitializer.javaStreamEnvConfFunc = args.conf
-          flinkInitializer.initStreamEnv()
+          flinkInitializer.initEnvironment()
         }
       }
     }
@@ -151,14 +153,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     if (localStreamEnv == null) {
       this.synchronized {
         if (localStreamEnv == null) {
-          initStreamEnv()
+          initEnvironment()
         }
       }
     }
     localStreamEnv
   }
 
-  def initStreamEnv(): Unit = {
+  def initEnvironment(): Unit = {
     localStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment
     Try(parameter.get(KEY_FLINK_PARALLELISM()).toInt).getOrElse {
       Try(parameter.get(CoreOptions.DEFAULT_PARALLELISM.key()).toInt).getOrElse(CoreOptions.DEFAULT_PARALLELISM.defaultValue().toInt)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index ba51efaf4..8e3cc7bbe 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -39,54 +39,60 @@ private[flink] object FlinkTableInitializer {
 
   private[this] var flinkInitializer: FlinkTableInitializer = _
 
-  def initTable(args: Array[String], config: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, TableEnvironment) = {
+  def initialize(args: Array[String],
+                 config: (TableConfig, ParameterTool) => Unit):
+  (ParameterTool, TableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
           flinkInitializer.tableConfFunc = config
-          flinkInitializer.initTableEnv(TableMode.batch)
+          flinkInitializer.initEnvironment(TableMode.batch)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
   }
 
-  def initJavaTable(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
+  def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
           flinkInitializer.javaTableEnvConfFunc = args.conf
-          flinkInitializer.initTableEnv(TableMode.batch)
+          flinkInitializer.initEnvironment(TableMode.batch)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
   }
 
-  def initStreamTable(args: Array[String], configStream: (StreamExecutionEnvironment, ParameterTool) => Unit = null, configTable: (TableConfig, ParameterTool) => Unit = null): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+  def initialize(args: Array[String],
+                 configStream: (StreamExecutionEnvironment, ParameterTool) => Unit,
+                 configTable: (TableConfig, ParameterTool) => Unit):
+  (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args, ApiType.scala)
           flinkInitializer.streamEnvConfFunc = configStream
           flinkInitializer.tableConfFunc = configTable
-          flinkInitializer.initTableEnv(TableMode.streaming)
+          flinkInitializer.initEnvironment(TableMode.streaming)
         }
       }
     }
     (flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
   }
 
-  def initJavaStreamTable(args: StreamTableEnvConfig): (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
+  def initialize(args: StreamTableEnvConfig):
+  (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment) = {
     if (flinkInitializer == null) {
       this.synchronized {
         if (flinkInitializer == null) {
           flinkInitializer = new FlinkTableInitializer(args.args, ApiType.java)
           flinkInitializer.javaStreamEnvConfFunc = args.streamConfig
           flinkInitializer.javaTableEnvConfFunc = args.tableConfig
-          flinkInitializer.initTableEnv(TableMode.streaming)
+          flinkInitializer.initEnvironment(TableMode.streaming)
         }
       }
     }
@@ -105,7 +111,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     if (localStreamTableEnv == null) {
       this.synchronized {
         if (localStreamTableEnv == null) {
-          initTableEnv(TableMode.streaming)
+          initEnvironment(TableMode.streaming)
         }
       }
     }
@@ -116,7 +122,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     if (localTableEnv == null) {
       this.synchronized {
         if (localTableEnv == null) {
-          initTableEnv(TableMode.batch)
+          initEnvironment(TableMode.batch)
         }
       }
     }
@@ -156,7 +162,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
 
   }
 
-  def initTableEnv(tableMode: TableMode): Unit = {
+  def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
     val mode = Try(TableMode.withName(parameter.get(KEY_FLINK_TABLE_MODE))).getOrElse(tableMode)
     mode match {
@@ -187,7 +193,7 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
     tableMode match {
       case TableMode.batch => localTableEnv = TableEnvironment.create(setting)
       case TableMode.streaming =>
-        initStreamEnv()
+        initEnvironment()
         if (streamEnvConfFunc != null) {
           streamEnvConfFunc(streamEnvironment, parameter)
         }
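
For reference, the TableMode chosen here ultimately determines which EnvironmentSettings the table environment is built with. A minimal sketch against the Flink 1.15 API (independent of this patch):

    import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

    object TableModeExample {
      def main(args: Array[String]): Unit = {
        // Batch execution: bounded inputs, staged (blocking) shuffles.
        val batchTableEnv = TableEnvironment.create(
          EnvironmentSettings.newInstance().inBatchMode().build())

        // Streaming execution: unbounded inputs, pipelined execution.
        val streamingTableEnv = TableEnvironment.create(
          EnvironmentSettings.newInstance().inStreamingMode().build())
      }
    }
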
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
index b00fc834a..c9a90496e 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/StreamTableContext.scala
@@ -32,7 +32,7 @@ class StreamTableContext(override val parameter: ParameterTool,
 
   def this(args: (ParameterTool, StreamExecutionEnvironment, StreamTableEnvironment)) = this(args._1, args._2, args._3)
 
-  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initJavaStreamTable(args))
+  def this(args: StreamTableEnvConfig) = this(FlinkTableInitializer.initialize(args))
 
   override def fromDataStream[T](dataStream: DataStream[T], schema: Schema): Table = tableEnv.fromDataStream[T](dataStream, schema)
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index b8b1eee9b..e93c25894 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -28,7 +28,7 @@ class TableContext(override val parameter: ParameterTool,
 
   def this(args: (ParameterTool, TableEnvironment)) = this(args._1, args._2)
 
-  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initJavaTable(args))
+  def this(args: TableEnvConfig) = this(FlinkTableInitializer.initialize(args))
 
   override def useModules(strings: String*): Unit = tableEnv.useModules(strings: _*)
 
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 7d9973191..28cbb92e0 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -17,19 +17,51 @@
 
 package org.apache.streampark.flink.cli
 
-import org.apache.streampark.flink.core.scala.FlinkStreamTable
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.ExecutionOptions
+import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, KEY_FLINK_SQL, KEY_FLINK_TABLE_MODE}
+import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
+import org.apache.streampark.flink.core.{SqlCommand, SqlCommandParser}
+import org.apache.streampark.flink.core.scala.{FlinkStreamTable, FlinkTable}
 
 import scala.language.implicitConversions
+import scala.util.Try
 
+object SqlClient extends App {
 
-object SqlClient extends FlinkStreamTable {
+  val parameterTool = ParameterTool.fromArgs(args)
 
-  override def handle(): Unit = context.sql()
+  val flinkSql = {
+    val sql = parameterTool.get(KEY_FLINK_SQL())
+    require(sql != null && sql.trim.nonEmpty, "Usage: flink sql cannot be null")
+    Try(DeflaterUtils.unzipString(sql)).getOrElse(throw new IllegalArgumentException("Usage: flink sql is invalid or null, please check"))
+  }
+
+  val sets = SqlCommandParser.parseSQL(flinkSql).filter(_.command == SqlCommand.SET)
+
+  val mode = sets.find(_.operands.head == ExecutionOptions.RUNTIME_MODE.key()) match {
+    case Some(e) => e.operands(1)
+    case None =>
+      val appConf = parameterTool.get(KEY_APP_CONF(), null)
+      val defaultMode = "streaming"
+      if (appConf == null) defaultMode else {
+        val parameter = PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(appConf.drop(7)))
+        parameter.getOrElse(KEY_FLINK_TABLE_MODE, defaultMode)
+      }
+  }
+
+  mode match {
+    case "batch" => BatchSqlApp.main(args)
+    case "streaming" => StreamSqlApp.main(args)
+    case _ => throw new IllegalArgumentException("Usage: runtime execution-mode invalid, optional [streaming|batch]")
+  }
+
+  private[this] object BatchSqlApp extends FlinkTable {
+    override def handle(): Unit = context.sql()
+  }
 
-  implicit def callback(message: String): Unit = {
-    // scalastyle:off println
-    println(message)
-    // scalastyle:on println
+  private[this] object StreamSqlApp extends FlinkStreamTable {
+    override def handle(): Unit = context.sql()
   }
 
 }
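
Taken together, the new SqlClient resolves the execution mode with a three-level precedence: an execution.runtime-mode SET statement in the submitted SQL wins, otherwise KEY_FLINK_TABLE_MODE from the application config, otherwise the "streaming" default; the result then dispatches to BatchSqlApp (FlinkTable) or StreamSqlApp (FlinkStreamTable). A minimal sketch of just that precedence, with hypothetical simplified inputs:

    object ModeResolution {
      // setStatementMode: mode found in a SET statement, if any;
      // appConfMode: mode found in the application config, if any.
      def resolveMode(setStatementMode: Option[String], appConfMode: Option[String]): String =
        setStatementMode.orElse(appConfMode).getOrElse("streaming")

      def main(args: Array[String]): Unit = {
        assert(resolveMode(Some("batch"), None) == "batch")    // SET statement wins
        assert(resolveMode(None, Some("batch")) == "batch")    // then the app config
        assert(resolveMode(None, None) == "streaming")         // then the default
      }
    }

In practice a job opts into batch by placing a SET statement on execution.runtime-mode at the top of the submitted SQL (exact quoting per SqlCommandParser), or by setting the table-mode key in its application config.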