You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/10/31 01:37:22 UTC
[incubator-streampark] branch dev updated: [Feature] Unify flink configuration (#1858)
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 00d5782ca [Feature] Unify flink configuration (#1858)
00d5782ca is described below
commit 00d5782ca3c53c5f10764628a75c8017e10f0b59
Author: benjobs <be...@apache.org>
AuthorDate: Mon Oct 31 09:37:16 2022 +0800
[Feature] Unify flink configuration (#1858)
* [Feature] Unify flink configuration
* [Feature] Unify flink configuration
* [Feature] Unify flink configuration
* [Feature] Unify flink configuration
* [Feature] Unify flink configuration
* [Feature] Unify flink configuration
* [Feature] Unify flink configuration
---
.../streampark/common/conf/ConfigConst.scala | 10 +-
.../apache/streampark/common/util/FileUtils.scala | 98 ++++++++++++++++++
.../console/base/config/WebMvcConfig.java | 5 +-
.../interceptor/FileHeaderCheckInterceptor.java | 1 +
.../src/main/resources/flink-application.conf | 109 +++++++++++----------
.../flink/core/FlinkStreamingInitializer.scala | 46 ++++-----
.../flink/core/FlinkTableInitializer.scala | 57 +++++++----
.../flink/core/conf/FlinkConfiguration.scala | 24 +++++
.../streampark/flink/core/conf/ParameterCli.scala | 6 +-
.../flink/submit/bean/SubmitRequest.scala | 4 +-
10 files changed, 257 insertions(+), 103 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 2ece108a8..f25d7079c 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -108,9 +108,15 @@ object ConfigConst {
def KEY_FLINK_PARALLELISM(prefix: String = null): String = if (prefix == null) "parallelism.default" else s"${prefix}parallelism.default"
- val KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX = "flink.deployment.property."
+ val KEY_FLINK_OPTION_PREFIX = "flink.option."
- val KEY_FLINK_DEPLOYMENT_OPTION_PREFIX = "flink.deployment.option."
+ val KEY_FLINK_PROPERTY_PREFIX = "flink.property."
+
+ val KEY_FLINK_TABLE_PREFIX = "flink.table."
+
+ val KEY_APP_PREFIX = "app."
+
+ val KEY_SQL_PREFIX = "sql."
val KEY_FLINK_APP_NAME = "pipeline.name"
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
index 1bd68df2b..f8553f76b 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/FileUtils.scala
@@ -23,6 +23,104 @@ import scala.collection.JavaConversions._
object FileUtils extends org.apache.commons.io.FileUtils {
+ lazy val fileTypes: Map[String, String] = {
+ val maps = new util.HashMap[String, String]()
+ maps.put("FFD8FF", "jpg")
+ maps.put("89504E47", "png")
+ maps.put("47494638", "gif")
+ maps.put("49492A00227105008037", "tif")
+ maps.put("424D228C010000000000", "bmp")
+ maps.put("424D8240090000000000", "bmp")
+ maps.put("424D8E1B030000000000", "bmp")
+ maps.put("41433130313500000000", "dwg")
+
+ maps.put("68746D6C3E", "html")
+ maps.put("48544D4C207B0D0A0942", "css")
+ maps.put("696B2E71623D696B2E71", "js")
+ maps.put("7B5C727466315C616E73", "rtf")
+
+ maps.put("38425053000100000000", "psd")
+ maps.put("44656C69766572792D646174653A", "eml")
+ maps.put("D0CF11E0A1B11AE10000", "doc")
+
+
+ maps.put("D0CF11E0A1B11AE10000", "vsd")
+ maps.put("5374616E64617264204A", "mdb")
+ maps.put("252150532D41646F6265", "ps")
+ maps.put("255044462D312E", "pdf")
+ maps.put("75736167", "txt")
+
+ maps.put("2E524D46000000120001", "rmvb")
+ maps.put("464C5601050000000900", "flv")
+ maps.put("00000020667479706D70", "mp4")
+ maps.put("49443303000000002176", "mp3")
+ maps.put("000001B", "mpg")
+ maps.put("3026B2758E66CF11A6D9", "wmv")
+ maps.put("57415645", "wav")
+ maps.put("41564920", "avi")
+
+ maps.put("4D546864", "mid")
+ maps.put("504B0304", "zip")
+ maps.put("52617221", "rar")
+ maps.put("235468697320636F6E66", "ini")
+ maps.put("504B03040A000000", "jar")
+ maps.put("4D5A9000030000000400", "exe")
+
+ maps.put("3C25402070616765206C", "jsp")
+ maps.put("4D616E69666573742D56", "mf")
+ maps.put("3C3F786D6C", "xml")
+ maps.put("494E5345525420494E54", "sql")
+ maps.put("7061636B616765207765", "java")
+ maps.put("406563686F206F66660D", "bat")
+ maps.put("1F8B0800000000000000", "gz")
+ maps.put("6C6F67346A2E726F6F74", "properties")
+ maps.put("CAFEBABE0000002E0041", "class")
+ maps.put("49545346030000006000", "chm")
+ maps.put("04000000010000001300", "mxp")
+ maps.put("504B0304140006000800", "docx")
+ maps.put("D0CF11E0A1B11AE10000", "wps")
+ maps.put("6431303A637265617465", "torrent")
+
+ maps.put("6D6F6F76", "mov")
+ maps.put("FF575043", "wpd")
+ maps.put("CFAD12FEC5FD746F", "dbx")
+ maps.put("2142444E", "pst")
+ maps.put("AC9EBD8F", "qdf")
+ maps.put("E3828596", "pwl")
+ maps.put("2E7261FD", "ram")
+ maps.put("2E524D46", "rm")
+ maps.toMap
+ }
+
+ private[this] def bytesToHexString(src: Array[Byte]): String = {
+ val stringBuilder: StringBuilder = new StringBuilder
+ if (src == null || src.length <= 0) return null
+ for (i <- 0 until src.length) {
+ val v: Int = src(i) & 0xFF
+ val hv: String = Integer.toHexString(v).toUpperCase
+ if (hv.length < 2) {
+ stringBuilder.append(0)
+ }
+ stringBuilder.append(hv)
+ }
+ stringBuilder.toString
+ }
+
+ def getFileType(file: File): String = {
+ if (!file.exists || !file.isFile) {
+ throw new RuntimeException("The file does not exist or the path is a directory")
+ }
+ Utils.tryWithResource(new FileInputStream(file)) { in =>
+ val b = new Array[Byte](4)
+ in.read(b, 0, b.length)
+ val fileCode = bytesToHexString(b)
+ fileTypes.find(_._1.startsWith(fileCode)) match {
+ case Some(f) => f._2
+ case _ => null
+ }
+ }
+ }
+
def createTempDir(): File = {
val TEMP_DIR_ATTEMPTS = 10000
val baseDir = new File(System.getProperty("java.io.tmpdir"))
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebMvcConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebMvcConfig.java
index 80e51b105..2d31e5f89 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebMvcConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/config/WebMvcConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.converter.ByteArrayHttpMessageConverter;
@@ -35,15 +36,13 @@ import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
-import javax.annotation.Resource;
-
import java.text.SimpleDateFormat;
import java.util.List;
@Configuration
public class WebMvcConfig implements WebMvcConfigurer {
- @Resource
+ @Autowired
private FileHeaderCheckInterceptor fileHeaderCheckInterceptor;
@Override
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/interceptor/FileHeaderCheckInterceptor.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/interceptor/FileHeaderCheckInterceptor.java
index 9a6d3ac3d..7c847352a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/interceptor/FileHeaderCheckInterceptor.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/interceptor/FileHeaderCheckInterceptor.java
@@ -39,6 +39,7 @@ public class FileHeaderCheckInterceptor implements HandlerInterceptor {
private static List<String> fileHeaders = new ArrayList<>();
private int headerLength = 8;
+
static {
fileHeaders.add(FileType.JAR.getMagicNumber());
}
diff --git a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
index bde9400de..e195bc8ad 100644
--- a/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
+++ b/streampark-console/streampark-console-service/src/main/resources/flink-application.conf
@@ -16,59 +16,60 @@
#
flink:
- deployment:
- property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
- $internal.application.main:
- pipeline.name:
- yarn.application.queue:
- taskmanager.numberOfTaskSlots: 1
- parallelism.default: 2
- jobmanager.memory:
- flink.size:
- heap.size:
- jvm-metaspace.size:
- jvm-overhead.max:
- off-heap.size:
- process.size:
- taskmanager.memory:
- flink.size:
- framework.heap.size:
- framework.off-heap.size:
- managed.size:
- process.size:
- task.heap.size:
- task.off-heap.size:
- jvm-metaspace.size:
- jvm-overhead.max:
- jvm-overhead.min:
- managed.fraction: 0.4
- pipeline:
- auto-watermark-interval: 200ms
- # checkpoint
- execution:
- checkpointing:
- mode: EXACTLY_ONCE
- interval: 30s
- timeout: 10min
- unaligned: false
- externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
- # state backend
- state:
- backend: hashmap # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
- backend.incremental: true
- checkpoint-storage: filesystem
- savepoints.dir: file:///tmp/chkdir
- checkpoints.dir: file:///tmp/chkdir
- # restart strategy
- restart-strategy: fixed-delay # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
- restart-strategy.fixed-delay:
- attempts: 3
- delay: 5000
- restart-strategy.failure-rate:
- max-failures-per-interval:
- failure-rate-interval:
- delay:
+ property: #@see: https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/
+ $internal.application.main:
+ pipeline.name:
+ yarn.application.queue:
+ taskmanager.numberOfTaskSlots: 1
+ parallelism.default: 2
+ jobmanager.memory:
+ flink.size:
+ heap.size:
+ jvm-metaspace.size:
+ jvm-overhead.max:
+ off-heap.size:
+ process.size:
+ taskmanager.memory:
+ flink.size:
+ framework.heap.size:
+ framework.off-heap.size:
+ managed.size:
+ process.size:
+ task.heap.size:
+ task.off-heap.size:
+ jvm-metaspace.size:
+ jvm-overhead.max:
+ jvm-overhead.min:
+ managed.fraction: 0.4
+ pipeline:
+ auto-watermark-interval: 200ms
+ # checkpoint
+ execution:
+ checkpointing:
+ mode: EXACTLY_ONCE
+ interval: 30s
+ timeout: 10min
+ unaligned: false
+ externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
+ # state backend
+ state:
+ backend: hashmap # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
+ backend.incremental: true
+ checkpoint-storage: filesystem
+ savepoints.dir: file:///tmp/chkdir
+ checkpoints.dir: file:///tmp/chkdir
+ # restart strategy
+ restart-strategy: fixed-delay # Restart strategy [(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
+ restart-strategy.fixed-delay:
+ attempts: 3
+ delay: 5000
+ restart-strategy.failure-rate:
+ max-failures-per-interval:
+ failure-rate-interval:
+ delay:
# table
table:
- planner: blink # (blink|old|any)
- mode: streaming #(batch|streaming)
+ table.local-time-zone: default # @see https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/
+
+app: # user's parameter
+ #$key: $value
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 472a57513..9dfdc33a3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.environment.{StreamExecutionEnvironment => JavaStreamEnv}
import org.apache.flink.table.api.TableConfig
+import org.apache.streampark.flink.core.conf.FlinkConfiguration
import java.io.File
import collection.JavaConversions._
@@ -45,7 +46,7 @@ private[flink] object FlinkStreamingInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
+ (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment)
}
def initialize(args: StreamEnvConfig): (ParameterTool, StreamExecutionEnvironment) = {
@@ -58,7 +59,7 @@ private[flink] object FlinkStreamingInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment)
+ (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment)
}
}
@@ -75,9 +76,9 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
private[this] var localStreamEnv: StreamExecutionEnvironment = _
- lazy val (parameter: ParameterTool, flinkConf: Configuration) = initParameter()
+ lazy val configuration: FlinkConfiguration = initParameter()
- def initParameter(): (ParameterTool, Configuration) = {
+ def initParameter(): FlinkConfiguration = {
val argsMap = ParameterTool.fromArgs(args)
val config = argsMap.get(KEY_APP_CONF(), null) match {
// scalastyle:off throwerror
@@ -85,14 +86,18 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
// scalastyle:on throwerror
case file => file
}
- val (userConf, flinkConf) = parseConfig(config)
+ val configMap = parseConfig(config)
+ val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
+ val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
// config priority: explicitly specified priority > project profiles > system profiles
- ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(userConf)).mergeWith(argsMap) -> Configuration.fromMap(flinkConf)
+ val parameter = ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(appConf)).mergeWith(argsMap)
+ val envConfig = Configuration.fromMap(properConf)
+ FlinkConfiguration(parameter, envConfig, null)
}
- def parseConfig(config: String): (Map[String, String], Map[String, String]) = {
+ def parseConfig(config: String): Map[String, String] = {
val extension = config.split("\\.").last.toLowerCase
- val configMap = config match {
+ config match {
case x if x.startsWith("yaml://") =>
PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
case x if x.startsWith("prop://") =>
@@ -117,17 +122,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
}
}
- val appConf = mutable.Map[String, String]()
- val flinkConf = mutable.Map[String, String]()
- configMap.foreach(x => {
- if (x._1.startsWith(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX)) {
- flinkConf += x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2
- }
- if (!x._1.startsWith(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX)) {
- appConf += x._1.replace(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX, "") -> x._2
- }
+ }
+
+ def extractConfigByPrefix(configMap: Map[String, String], prefix: String): Map[String, String] = {
+ val map = mutable.Map[String, String]()
+ configMap.foreach(x => if (x._1.startsWith(prefix)) {
+ map += x._1.drop(prefix.length) -> x._2
})
- appConf -> flinkConf
+ map
}
def streamEnvironment: StreamExecutionEnvironment = {
@@ -142,14 +144,14 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
}
def initEnvironment(): Unit = {
- localStreamEnv = new StreamExecutionEnvironment(JavaStreamEnv.getExecutionEnvironment(flinkConf))
+ localStreamEnv = new StreamExecutionEnvironment(JavaStreamEnv.getExecutionEnvironment(configuration.envConfig))
apiType match {
- case ApiType.java if javaStreamEnvConfFunc != null => javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, parameter)
- case ApiType.scala if streamEnvConfFunc != null => streamEnvConfFunc(localStreamEnv, parameter)
+ case ApiType.java if javaStreamEnvConfFunc != null => javaStreamEnvConfFunc.configuration(localStreamEnv.getJavaEnv, configuration.parameter)
+ case ApiType.scala if streamEnvConfFunc != null => streamEnvConfFunc(localStreamEnv, configuration.parameter)
case _ =>
}
- localStreamEnv.getConfig.setGlobalJobParameters(parameter)
+ localStreamEnv.getConfig.setGlobalJobParameters(configuration.parameter)
}
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index ca42d257f..48b90a149 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -26,10 +26,11 @@ import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.enums.TableMode.TableMode
import org.apache.streampark.common.enums.{ApiType, PlannerType, TableMode}
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
+import org.apache.streampark.flink.core.conf.FlinkConfiguration
import java.io.File
import scala.collection.JavaConversions._
-import scala.collection.Map
+import scala.collection.{Map, mutable}
import scala.util.{Failure, Success, Try}
private[flink] object FlinkTableInitializer {
@@ -48,7 +49,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+ (flinkInitializer.configuration.parameter, flinkInitializer.tableEnvironment)
}
def initialize(args: TableEnvConfig): (ParameterTool, TableEnvironment) = {
@@ -61,7 +62,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.tableEnvironment)
+ (flinkInitializer.configuration.parameter, flinkInitializer.tableEnvironment)
}
def initialize(args: Array[String],
@@ -78,7 +79,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
+ (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
}
def initialize(args: StreamTableEnvConfig):
@@ -93,7 +94,7 @@ private[flink] object FlinkTableInitializer {
}
}
}
- (flinkInitializer.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
+ (flinkInitializer.configuration.parameter, flinkInitializer.streamEnvironment, flinkInitializer.streamTableEnvironment)
}
}
@@ -131,42 +132,64 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
* In case of table SQL, the parameter conf is not required, it depends on the developer.
*/
- override def initParameter(): (ParameterTool, Configuration) = {
- val (appParameter: ParameterTool, flinkConf: Configuration) = {
+ override def initParameter(): FlinkConfiguration = {
+ val configuration = {
val argsMap = ParameterTool.fromArgs(args)
argsMap.get(KEY_APP_CONF(), null) match {
case null | "" =>
logWarn("Usage:can't fond config,you can set \"--conf $path \" in main arguments")
- ParameterTool.fromSystemProperties().mergeWith(argsMap) -> new Configuration()
+ val parameter = ParameterTool.fromSystemProperties().mergeWith(argsMap)
+ FlinkConfiguration(parameter, new Configuration(), new Configuration())
case file =>
- val (userConf, flinkConf) = super.parseConfig(file)
+ val configMap = parseConfig(file)
+ // set sql..
+ val sqlConf = mutable.Map[String, String]()
+ configMap.foreach(x => {
+ if (x._1.startsWith(KEY_SQL_PREFIX)) {
+ sqlConf += x._1.drop(KEY_SQL_PREFIX.length) -> x._2
+ }
+ })
+
// config priority: explicitly specified priority > project profiles > system profiles
- ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(userConf)).mergeWith(argsMap) -> Configuration.fromMap(flinkConf)
+ val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
+ val parameter = ParameterTool.fromSystemProperties()
+ .mergeWith(ParameterTool.fromMap(appConf))
+ .mergeWith(ParameterTool.fromMap(sqlConf))
+ .mergeWith(argsMap)
+
+ val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
+ val envConfig = Configuration.fromMap(properConf)
+
+ val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
+ val tableConfig = Configuration.fromMap(tableConf)
+
+ FlinkConfiguration(parameter, envConfig, tableConfig)
}
}
- val appParam = appParameter.get(KEY_FLINK_SQL()) match {
- case null => appParameter
+ configuration.parameter.get(KEY_FLINK_SQL()) match {
+ case null => configuration
case param =>
// for streampark-console
Try(DeflaterUtils.unzipString(param)) match {
- case Success(value) => appParameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value)))
+ case Success(value) =>
+ configuration.copy(parameter = configuration.parameter.mergeWith(ParameterTool.fromMap(Map(KEY_FLINK_SQL() -> value))))
case Failure(_) =>
val sqlFile = new File(param)
Try(PropertiesUtils.fromYamlFile(sqlFile.getAbsolutePath)) match {
- case Success(value) => appParameter.mergeWith(ParameterTool.fromMap(value))
+ case Success(value) =>
+ configuration.copy(parameter = configuration.parameter.mergeWith(ParameterTool.fromMap(value)))
case Failure(e) =>
new IllegalArgumentException(s"[StreamPark] init sql error.$e")
- appParameter
+ configuration
}
}
}
-
- appParam -> flinkConf
}
def initEnvironment(tableMode: TableMode): Unit = {
val builder = EnvironmentSettings.newInstance()
+ val parameter = configuration.parameter
val plannerType = Try(PlannerType.withName(parameter.get(KEY_FLINK_TABLE_PLANNER))).getOrElse(PlannerType.blink)
try {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
new file mode 100644
index 000000000..ae6240fea
--- /dev/null
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/FlinkConfiguration.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.core.conf
+
+import org.apache.flink.api.java.utils.ParameterTool
+import org.apache.flink.configuration.Configuration
+
+case class FlinkConfiguration(parameter: ParameterTool,
+ envConfig: Configuration,
+ tableConfig: Configuration)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
index 40b9e87cd..1d2239560 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.core.conf
import org.apache.streampark.common.conf.ConfigConst
-import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_DEPLOYMENT_OPTION_PREFIX, KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX}
+import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_OPTION_PREFIX, KEY_FLINK_PROPERTY_PREFIX}
import org.apache.streampark.common.util.PropertiesUtils
import org.apache.commons.cli.{DefaultParser, Options}
@@ -28,8 +28,8 @@ import scala.util.{Failure, Success, Try}
object ParameterCli {
- private[this] val propertyPrefix = KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX
- private[this] val optionPrefix = KEY_FLINK_DEPLOYMENT_OPTION_PREFIX
+ private[this] val propertyPrefix = KEY_FLINK_PROPERTY_PREFIX
+ private[this] val optionPrefix = KEY_FLINK_OPTION_PREFIX
private[this] val optionMain = s"$propertyPrefix$$internal.application.main"
lazy val flinkOptions: Options = FlinkRunOption.allOptions
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index a37147f70..ca79b68dc 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -61,9 +61,9 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
@Nullable k8sSubmitParam: KubernetesSubmitParam,
@Nullable extraParameter: JavaMap[String, Any]) {
- lazy val appProperties: Map[String, String] = getParameterMap(KEY_FLINK_DEPLOYMENT_PROPERTY_PREFIX)
+ lazy val appProperties: Map[String, String] = getParameterMap(KEY_FLINK_PROPERTY_PREFIX)
- lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_DEPLOYMENT_OPTION_PREFIX)
+ lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX)
lazy val appMain: String = this.developmentMode match {
case DevelopmentMode.FLINKSQL => ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS