Posted to commits@streampark.apache.org by mo...@apache.org on 2022/10/31 01:37:22 UTC

[incubator-streampark] branch dev updated: [Feature] Unify flink configuration (#1858)

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 00d5782ca [Feature] Unify flink configuration (#1858)
00d5782ca is described below

commit 00d5782ca3c53c5f10764628a75c8017e10f0b59
Author: benjobs <be...@apache.org>
AuthorDate: Mon Oct 31 09:37:16 2022 +0800

    [Feature] Unify flink configuration (#1858)
    
    * [Feature] Unify flink configuration
    
    * [Feature] Unify flink configuration
    
    * [Feature] Unify flink configuration
    
    * [Feature] Unify flink configuration
    
    * [Feature] Unify flink configuration
    
    * [Feature] Unify flink configuration
    
    * [Feature] Unify flink configuration
---
 .../streampark/common/conf/ConfigConst.scala       |  10 +-
 .../apache/streampark/common/util/FileUtils.scala  |  98 ++++++++++++++++++
 .../console/base/config/WebMvcConfig.java          |   5 +-
 .../interceptor/FileHeaderCheckInterceptor.java    |   1 +
 .../src/main/resources/flink-application.conf      | 109 +++++++++++----------
 .../flink/core/FlinkStreamingInitializer.scala     |  46 ++++-----
 .../flink/core/FlinkTableInitializer.scala         |  57 +++++++----
 .../flink/core/conf/FlinkConfiguration.scala       |  24 +++++
 .../streampark/flink/core/conf/ParameterCli.scala  |   6 +-
 .../flink/submit/bean/SubmitRequest.scala          |   4 +-
 10 files changed, 257 insertions(+), 103 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 2ece108a8..f25d7079c 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -108,9 +108,15 @@ object ConfigConst {
 
   def KEY_FLINK_PARALLELISM(prefix: String = null): String = if (prefix == null) "parallelism.default" else s"${prefix}parallelism.default"
 
-  val KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX = "flink.deployment.property."
+  val KEY_FLINK_OPTION_PREFIX = "flink.option."
 
-  val KEY_FLINK_DEPLOYMENT_OPTION_PREFIX = "flink.deployment.option."
+  val KEY_FLINK_PROPERTY_PREFIX = "flink.property."
+
+  val KEY_FLINK_TABLE_PREFIX = "flink.table."
+
+  val KEY_APP_PREFIX = "app."
+
+  val KEY_SQL_PREFIX = "sql."
 
   val KEY_FLINK_APP_NAME = "pipeline.name"
 
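The renamed constants above establish one convention: every key in the user's config file is routed by its leading prefix (flink.property.*, flink.option.*, flink.table.*, app.*, sql.*) and the prefix is stripped before the remainder reaches Flink or the user program. A minimal sketch of that routing over a hypothetical config map (the real logic is extractConfigByPrefix, added to FlinkStreamingInitializer further down in this commit):

    // sketch only: the prefix-routing convention these constants establish
    val userConf = Map(
      "flink.property.parallelism.default" -> "2",      // -> Flink Configuration
      "flink.option.detached"              -> "true",   // -> CLI options
      "app.source.topic"                   -> "orders"  // -> user ParameterTool
    )

    def byPrefix(conf: Map[String, String], prefix: String): Map[String, String] =
      conf.collect { case (k, v) if k.startsWith(prefix) => k.drop(prefix.length) -> v }

    byPrefix(userConf, "flink.property.") // Map("parallelism.default" -> "2")
    byPrefix(userConf, "app.")            // Map("source.topic" -> "orders")
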
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 1bd68df2b..f8553f76b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -23,6 +23,104 @@ import scala.collection.JavaConversions._
 
 object FileUtils extends org.apache.commons.io.FileUtils {
 
+  lazy val fileTypes: Map[String, String] = {
+    val maps = new util.HashMap[String, String]()
+    maps.put("FFD8FF", "jpg")
+    maps.put("89504E47", "png")
+    maps.put("47494638", "gif")
+    maps.put("49492A00227105008037", "tif")
+    maps.put("424D228C010000000000", "bmp")
+    maps.put("424D8240090000000000", "bmp")
+    maps.put("424D8E1B030000000000", "bmp")
+    maps.put("41433130313500000000", "dwg")
+
+    maps.put("68746D6C3E", "html")
+    maps.put("48544D4C207B0D0A0942", "css")
+    maps.put("696B2E71623D696B2E71", "js")
+    maps.put("7B5C727466315C616E73", "rtf")
+
+    maps.put("38425053000100000000", "psd")
+    maps.put("44656C69766572792D646174653A", "eml")
+    maps.put("D0CF11E0A1B11AE10000", "doc")
+
+
+    maps.put("D0CF11E0A1B11AE10000", "vsd")
+    maps.put("5374616E64617264204A", "mdb")
+    maps.put("252150532D41646F6265", "ps")
+    maps.put("255044462D312E", "pdf")
+    maps.put("75736167", "txt")
+
+    maps.put("2E524D46000000120001", "rmvb")
+    maps.put("464C5601050000000900", "flv")
+    maps.put("00000020667479706D70", "mp4")
+    maps.put("49443303000000002176", "mp3")
+    maps.put("000001B", "mpg")
+    maps.put("3026B2758E66CF11A6D9", "wmv")
+    maps.put("57415645", "wav")
+    maps.put("41564920", "avi")
+
+    maps.put("4D546864", "mid")
+    maps.put("504B0304", "zip")
+    maps.put("52617221", "rar")
+    maps.put("235468697320636F6E66", "ini")
+    maps.put("504B03040A000000", "jar")
+    maps.put("4D5A9000030000000400", "exe")
+
+    maps.put("3C25402070616765206C", "jsp")
+    maps.put("4D616E69666573742D56", "mf")
+    maps.put("3C3F786D6C", "xml")
+    maps.put("494E5345525420494E54", "sql")
+    maps.put("7061636B616765207765", "java")
+    maps.put("406563686F206F66660D", "bat")
+    maps.put("1F8B0800000000000000", "gz")
+    maps.put("6C6F67346A2E726F6F74", "properties")
+    maps.put("CAFEBABE0000002E0041", "class")
+    maps.put("49545346030000006000", "chm")
+    maps.put("04000000010000001300", "mxp")
+    maps.put("504B0304140006000800", "docx")
+    maps.put("D0CF11E0A1B11AE10000", "wps")
+    maps.put("6431303A637265617465", "torrent")
+
+    maps.put("6D6F6F76", "mov")
+    maps.put("FF575043", "wpd")
+    maps.put("CFAD12FEC5FD746F", "dbx")
+    maps.put("2142444E", "pst")
+    maps.put("AC9EBD8F", "qdf")
+    maps.put("E3828596", "pwl")
+    maps.put("2E7261FD", "ram")
+    maps.put("2E524D46", "rm")
+    maps.toMap
+  }
+
+  /** Render a byte array as an upper-case, zero-padded hex string, e.g. bytes 0xFF,0xD8 => "FFD8". */
+  private[this] def bytesToHexString(src: Array[Byte]): String = {
+    val stringBuilder: StringBuilder = new StringBuilder
+    if (src == null || src.length <= 0) return null
+    for (i <- 0 until src.length) {
+      val v: Int = src(i) & 0xFF
+      val hv: String = Integer.toHexString(v).toUpperCase
+      if (hv.length < 2) {
+        stringBuilder.append(0)
+      }
+      stringBuilder.append(hv)
+    }
+    stringBuilder.toString
+  }
+
+  def getFileType(file: File): String = {
+    if (!file.exists || !file.isFile) {
+      throw new RuntimeException("The file does not exist or the path is a directory")
+    }
+    Utils.tryWithResource(new FileInputStream(file)) { in =>
+      // read the first 4 bytes (8 hex chars) and compare them with the known magic numbers
+      val b = new Array[Byte](4)
+      in.read(b, 0, b.length)
+      val fileCode = bytesToHexString(b)
+      // a magic number may be shorter (e.g. "FFD8FF" for jpg) or longer than the 8 chars
+      // read above, so check the prefix relation in both directions
+      fileTypes.find(t => t._1.startsWith(fileCode) || fileCode.startsWith(t._1)) match {
+        case Some(f) => f._2
+        case _ => null
+      }
+    }
+  }
+
   def createTempDir(): File = {
     val TEMP_DIR_ATTEMPTS = 10000
     val baseDir = new File(System.getProperty("java.io.tmpdir"))
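getFileType above sniffs the first four bytes of a file and resolves them against the magic-number table, returning null when nothing matches. A hedged usage sketch (the path and handling below are hypothetical):

    import java.io.File
    import org.apache.streampark.common.util.FileUtils

    // sketch: trust the sniffed magic number, not the upload's file extension
    val uploaded = new File("/tmp/upload/job.jar") // hypothetical path
    FileUtils.getFileType(uploaded) match {
      case "jar" | "zip" => println("accepted: jar/zip archive")
      case null          => println("unknown magic number")
      case other         => println(s"rejected: detected '$other'")
    }
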
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebMvcConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebMvcConfig.java
index 80e51b105..2d31e5f89 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebMvcConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebMvcConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.module.SimpleModule;
 import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.http.converter.ByteArrayHttpMessageConverter;
@@ -35,15 +36,13 @@ import org.springframework.web.servlet.config.annotation.CorsRegistry;
 import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
 import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
 
-import javax.annotation.Resource;
-
 import java.text.SimpleDateFormat;
 import java.util.List;
 
 @Configuration
 public class WebMvcConfig implements WebMvcConfigurer {
 
-    @Resource
+    @Autowired
     private FileHeaderCheckInterceptor fileHeaderCheckInterceptor;
 
     @Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/interceptor/FileHeaderCheckInterceptor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/interceptor/FileHeaderCheckInterceptor.java
index 9a6d3ac3d..7c847352a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/interceptor/FileHeaderCheckInterceptor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/interceptor/FileHeaderCheckInterceptor.java
@@ -39,6 +39,7 @@ public class FileHeaderCheckInterceptor implements HandlerInterceptor {
 
     private static List<String> fileHeaders = new ArrayList<>();
     private int headerLength = 8;
+
     static {
         fileHeaders.add(FileType.JAR.getMagicNumber());
     }
diff --git a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
index bde9400de..e195bc8ad 100644
--- a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
+++ b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
@@ -16,59 +16,60 @@
 #
 
 flink:
-  deployment:
-    property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
-      $internal.application.main:
-      pipeline.name:
-      yarn.application.queue:
-      taskmanager.numberOfTaskSlots: 1
-      parallelism.default: 2
-      jobmanager.memory:
-        flink.size:
-        heap.size:
-        jvm-metaspace.size:
-        jvm-overhead.max:
-        off-heap.size:
-        process.size:
-      taskmanager.memory:
-        flink.size:
-        framework.heap.size:
-        framework.off-heap.size:
-        managed.size:
-        process.size:
-        task.heap.size:
-        task.off-heap.size:
-        jvm-metaspace.size:
-        jvm-overhead.max:
-        jvm-overhead.min:
-        managed.fraction: 0.4
-      pipeline:
-        auto-watermark-interval: 200ms
-      # checkpoint
-      execution:
-        checkpointing:
-          mode: EXACTLY_ONCE
-          interval: 30s
-          timeout: 10min
-          unaligned: false
-          externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
-      # state backend
-      state:
-        backend: hashmap # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
-        backend.incremental: true
-        checkpoint-storage: filesystem
-        savepoints.dir: file:///tmp/chkdir
-        checkpoints.dir: file:///tmp/chkdir
-      # restart strategy
-      restart-strategy: fixed-delay  # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
-      restart-strategy.fixed-delay:
-        attempts: 3
-        delay: 5000
-      restart-strategy.failure-rate:
-        max-failures-per-interval:
-        failure-rate-interval:
-        delay:
+  property: #@see: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/
+    $internal.application.main:
+    pipeline.name:
+    yarn.application.queue:
+    taskmanager.numberOfTaskSlots: 1
+    parallelism.default: 2
+    jobmanager.memory:
+      flink.size:
+      heap.size:
+      jvm-metaspace.size:
+      jvm-overhead.max:
+      off-heap.size:
+      process.size:
+    taskmanager.memory:
+      flink.size:
+      framework.heap.size:
+      framework.off-heap.size:
+      managed.size:
+      process.size:
+      task.heap.size:
+      task.off-heap.size:
+      jvm-metaspace.size:
+      jvm-overhead.max:
+      jvm-overhead.min:
+      managed.fraction: 0.4
+    pipeline:
+      auto-watermark-interval: 200ms
+    # checkpoint
+    execution:
+      checkpointing:
+        mode: EXACTLY_ONCE
+        interval: 30s
+        timeout: 10min
+        unaligned: false
+        externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
+    # state backend
+    state:
+      backend: hashmap # Note: Flink 1.12 options: 'jobmanager' | 'filesystem' | 'rocksdb'; Flink 1.12+ options: 'hashmap' | 'rocksdb'
+      backend.incremental: true
+      checkpoint-storage: filesystem
+      savepoints.dir: file:///tmp/chkdir
+      checkpoints.dir: file:///tmp/chkdir
+    # restart strategy
+    restart-strategy: fixed-delay  # restart strategy: one of fixed-delay | failure-rate | none
+    restart-strategy.fixed-delay:
+      attempts: 3
+      delay: 5000
+    restart-strategy.failure-rate:
+      max-failures-per-interval:
+      failure-rate-interval:
+      delay:
   # table
   table:
-    planner: blink # (blink|old|any)
-    mode: streaming #(batch|streaming)
+    table.local-time-zone: default # @see https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/
+
+app: # user-defined parameters
+  #$key: $value
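The template above is consumed as one flat key space: the nested sections are assumed to be flattened into dot-joined keys, which then carry the prefixes that the initializers split on. An illustrative flattening (values hypothetical):

    // sketch: how the nested template is assumed to flatten before prefix routing
    val flattened = Map(
      "flink.property.parallelism.default"              -> "2",
      "flink.property.execution.checkpointing.interval" -> "30s",
      "flink.table.table.local-time-zone"               -> "default",
      "app.key"                                         -> "value"
    )
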
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 472a57513..9dfdc33a3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
 import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamEnv}
 import org.apache.flink.table.api.TableConfig
+import org.apache.streampark.flink.core.conf.FlinkConfiguration
 
 import java.io.File
 import collection.JavaConversions._
@@ -45,7 +46,7 @@ private[flink] object FlinkStreamingInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
+    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment)
   }
 
   def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
@@ -58,7 +59,7 @@ private[flink] object FlinkStreamingInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
+    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment)
   }
 }
 
@@ -75,9 +76,9 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
 
   private[this] var localStreamEnv: StreamExecutionEnvironment = _
 
-  lazy val (parameter: ParameterTool, flinkConf: Configuration) = initParameter()
+  lazy val configuration: FlinkConfiguration = initParameter()
 
-  def initParameter(): (ParameterTool, Configuration) = {
+  def initParameter(): FlinkConfiguration = {
     val argsMap = ParameterTool.fromArgs(args)
     val config = argsMap.get(KEY_APP_CONF(), null) match {
       // scalastyle:off throwerror
@@ -85,14 +86,18 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
       // scalastyle:on throwerror
       case file => file
     }
-    val (userConf, flinkConf) = parseConfig(config)
+    val configMap = parseConfig(config)
+    val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
+    val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
     // config priority: explicitly specified arguments > project config file > system properties
-    ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(userConf)).mergeWith(argsMap) -> Configuration.fromMap(flinkConf)
+    val parameter = ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(appConf)).mergeWith(argsMap)
+    val envConfig = Configuration.fromMap(properConf)
+    FlinkConfiguration(parameter, envConfig, null)
   }
 
-  def parseConfig(config: String): (Map[String, String], Map[String, String]) = {
+  def parseConfig(config: String): Map[String, String] = {
     val extension = config.split("\\.").last.toLowerCase
-    val configMap = config match {
+    config match {
       case x if x.startsWith("yaml://") =>
         PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
       case x if x.startsWith("prop://") =>
@@ -117,17 +122,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
          case _ => throw new IllegalArgumentException("[StreamPark] Usage: flink.conf file error, must be properties or yml")
         }
     }
-    val appConf = mutable.Map[String, String]()
-    val flinkConf = mutable.Map[String, String]()
-    configMap.foreach(x => {
-      if (x._1.startsWith(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX)) {
-        flinkConf += x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2
-      }
-      if (!x._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX)) {
-        appConf += x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2
-      }
+  }
+
+  def extractConfigByPrefix(configMap: Map[String, String], prefix: String): Map[String, String] = {
+    val map = mutable.Map[String, String]()
+    configMap.foreach(x => if (x._1.startsWith(prefix)) {
+      map += x._1.drop(prefix.length) -> x._2
     })
-    appConf -> flinkConf
+    map
   }
 
   def streamEnvironment: StreamExecutionEnvironment = {
@@ -142,14 +144,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
   }
 
   def initEnvironment(): Unit = {
-    localStreamEnv = new StreamExecutionEnvironment(JavaStreamEnv.getExecutionEnvironment(flinkConf))
+    localStreamEnv = new StreamExecutionEnvironment(JavaStreamEnv.getExecutionEnvironment(configuration.envConfig))
 
     apiType match {
-      case ApiType.java if javaStreamEnvConfFunc != null => javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
-      case ApiType.scala if streamEnvConfFunc != null => streamEnvConfFunc(localStreamEnv, parameter)
+      case ApiType.java if javaStreamEnvConfFunc != null => javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, configuration.parameter)
+      case ApiType.scala if streamEnvConfFunc != null => streamEnvConfFunc(localStreamEnv, configuration.parameter)
       case _ =>
     }
-    localStreamEnv.getConfig.setGlobalJobParameters(parameter)
+    localStreamEnv.getConfig.setGlobalJobParameters(configuration.parameter)
   }
 
 }
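The merge chain in initParameter means the last mergeWith wins on key conflicts, which is what the "config priority" comment encodes. A self-contained sketch of that ordering (key names hypothetical):

    import org.apache.flink.api.java.utils.ParameterTool
    import scala.collection.JavaConversions._

    // sketch: later mergeWith calls take precedence on conflicting keys
    val fromConf = ParameterTool.fromMap(Map("source.topic" -> "orders"))
    val fromArgs = ParameterTool.fromArgs(Array("--source.topic", "orders_v2"))
    val merged = ParameterTool.fromSystemProperties().mergeWith(fromConf).mergeWith(fromArgs)
    assert(merged.get("source.topic") == "orders_v2") // explicit args override the config file
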
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index ca42d257f..48b90a149 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -26,10 +26,11 @@ import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.enums.TableMode.TableMode
 import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
+import org.apache.streampark.flink.core.conf.FlinkConfiguration
 
 import java.io.File
 import scala.collection.JavaConversions._
-import scala.collection.Map
+import scala.collection.{Map, mutable}
 import scala.util.{Failure, Success, Try}
 
 private[flink] object FlinkTableInitializer {
@@ -48,7 +49,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+    (flinkInitializer.configuration.parameter, flinkInitializer.tableEnvironment)
   }
 
   def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
@@ -61,7 +62,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+    (flinkInitializer.configuration.parameter, flinkInitializer.tableEnvironment)
   }
 
   def initialize(args: Array[String],
@@ -78,7 +79,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
+    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
   }
 
   def initialize(args: StreamTableEnvConfig):
@@ -93,7 +94,7 @@ private[flink] object FlinkTableInitializer {
         }
       }
     }
-    (flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
+    (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
   }
 
 }
@@ -131,42 +132,64 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
   * For Table SQL jobs the config file is optional; whether to provide one is up to the developer.
    */
 
-  override def initParameter(): (ParameterTool, Configuration) = {
-    val (appParameter: ParameterTool, flinkConf: Configuration) = {
+  override def initParameter(): FlinkConfiguration = {
+    val configuration = {
       val argsMap = ParameterTool.fromArgs(args)
       argsMap.get(KEY_APP_CONF(), null) match {
         case null | "" =>
           logWarn("Usage:can't fond config,you can set \"--conf $path \" in main arguments")
-          ParameterTool.fromSystemProperties().mergeWith(argsMap) -> new Configuration()
+          val parameter = ParameterTool.fromSystemProperties().mergeWith(argsMap)
+          FlinkConfiguration(parameter, new Configuration(), new Configuration())
         case file =>
-          val (userConf, flinkConf) = super.parseConfig(file)
+          val configMap = parseConfig(file)
+          // extract sql.* entries from the config
+          val sqlConf = mutable.Map[String, String]()
+          configMap.foreach(x => {
+            if (x._1.startsWith(KEY_SQL_PREFIX)) {
+              sqlConf += x._1.drop(KEY_SQL_PREFIX.length) -> x._2
+            }
+          })
+
           // config priority: explicitly specified arguments > project config file > system properties
-          ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(userConf)).mergeWith(argsMap) -> Configuration.fromMap(flinkConf)
+          val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
+          val parameter = ParameterTool.fromSystemProperties()
+            .mergeWith(ParameterTool.fromMap(appConf))
+            .mergeWith(ParameterTool.fromMap(sqlConf))
+            .mergeWith(argsMap)
+
+          val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
+          val envConfig = Configuration.fromMap(properConf)
+
+          val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
+          val tableConfig = Configuration.fromMap(tableConf)
+
+          FlinkConfiguration(parameter, envConfig, tableConfig)
       }
     }
 
-    val appParam = appParameter.get(KEY_FLINK_SQL()) match {
-      case null => appParameter
+    configuration.parameter.get(KEY_FLINK_SQL()) match {
+      case null => configuration
       case param =>
         // for streampark-console
         Try(DeflaterUtils.unzipString(param)) match {
-          case Success(value) => appParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+          case Success(value) =>
+            configuration.copy(parameter = configuration.parameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value))))
           case Failure(_) =>
             val sqlFile = new File(param)
             Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
-              case Success(value) => appParameter.mergeWith(ParameterTool.fromMap(value))
+              case Success(value) =>
+                configuration.copy(parameter = configuration.parameter.mergeWith(ParameterTool.fromMap(value)))
               case Failure(e) =>
                 new IllegalArgumentException(s"[StreamPark] init sql error.$e")
-                appParameter
+                configuration
             }
         }
     }
-
-    appParam -> flinkConf
   }
 
   def initEnvironment(tableMode: TableMode): Unit = {
     val builder = EnvironmentSettings.newInstance()
+    val parameter = configuration.parameter
     val plannerType = Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse(PlannerType.blink)
 
     try {
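In the table initializer, sql.* entries are folded into the ParameterTool while flink.table.* entries go to the separate tableConfig. A small sketch of that split (entries hypothetical):

    // sketch: one config map, two destinations
    val configMap = Map(
      "sql.insertOrders" -> "insert into t_sink select * from t_source", // -> parameter
      "flink.table.table.local-time-zone" -> "UTC"                       // -> tableConfig
    )
    val sqlConf = configMap.collect {
      case (k, v) if k.startsWith("sql.") => k.drop("sql.".length) -> v
    }
    // sqlConf == Map("insertOrders" -> "insert into t_sink select * from t_source")
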
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
new file mode 100644
index 000000000..ae6240fea
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core.conf
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.Configuration
+
+case class FlinkConfiguration(parameter: ParameterTool,
+                              envConfig: Configuration,
+                              tableConfig: Configuration)
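Because FlinkConfiguration is a case class, the initializers update it immutably via copy, as seen where the decompressed SQL is merged in above. A minimal sketch:

    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.configuration.Configuration
    import java.util.Collections

    // sketch: immutable update via copy(), mirroring FlinkTableInitializer
    val base = FlinkConfiguration(ParameterTool.fromArgs(Array.empty[String]), new Configuration(), new Configuration())
    val sql  = ParameterTool.fromMap(Collections.singletonMap("sql", "select 1"))
    val next = base.copy(parameter = base.parameter.mergeWith(sql))
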
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
index 40b9e87cd..1d2239560 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
@@ -17,7 +17,7 @@
 package org.apache.streampark.flink.core.conf
 
 import org.apache.streampark.common.conf.ConfigConst
-import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_DEPLOYMENT_OPTION_PREFIX, KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX}
+import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_OPTION_PREFIX, KEY_FLINK_PROPERTY_PREFIX}
 import org.apache.streampark.common.util.PropertiesUtils
 import org.apache.commons.cli.{DefaultParser, Options}
 
@@ -28,8 +28,8 @@ import scala.util.{Failure, Success, Try}
 
 object ParameterCli {
 
-  private[this] val propertyPrefix = KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX
-  private[this] val optionPrefix = KEY_FLINK_DEPLOYMENT_OPTION_PREFIX
+  private[this] val propertyPrefix = KEY_FLINK_PROPERTY_PREFIX
+  private[this] val optionPrefix = KEY_FLINK_OPTION_PREFIX
   private[this] val optionMain = s"$propertyPrefix$$internal.application.main"
 
   lazy val flinkOptions: Options = FlinkRunOption.allOptions
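With the renamed constants, optionMain now resolves against the new property prefix; a one-line check of what the key expands to (constants inlined for illustration):

    // sketch: what the main-class key expands to after the rename
    val KEY_FLINK_PROPERTY_PREFIX = "flink.property."
    val optionMain = s"$KEY_FLINK_PROPERTY_PREFIX$$internal.application.main"
    assert(optionMain == "flink.property.$internal.application.main")
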
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index a37147f70..ca79b68dc 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -61,9 +61,9 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
                          @Nullable k8sSubmitParam: KubernetesSubmitParam,
                          @Nullable extraParameter: JavaMap[String, Any]) {
 
-  lazy val appProperties: Map[String, String] = getParameterMap(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX)
+  lazy val appProperties: Map[String, String] = getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
 
-  lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX)
+  lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX)
 
   lazy val appMain: String = this.developmentMode match {
     case DevelopmentMode.FLINKSQL => ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS