You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/04 16:00:50 UTC
[incubator-streampark] branch dev updated: [Bug] init config filter null value (#1963)
This is an automated email from the ASF dual-hosted git repository.
monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 2807f5eb4 [Bug] init config filter null value (#1963)
2807f5eb4 is described below
commit 2807f5eb49d1762dcc7d45178ecc7b576107afcf
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 5 00:00:43 2022 +0800
[Bug] init config filter null value (#1963)
---
.../streampark/flink/core/FlinkStreamingInitializer.scala | 10 ++++++++--
.../streampark/flink/core/FlinkTableInitializer.scala | 14 ++++++++------
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 9dfdc33a3..8c84b3719 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -89,15 +89,20 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
val configMap = parseConfig(config)
val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
+
// config priority: explicitly specified priority > project profiles > system profiles
- val parameter = ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(appConf)).mergeWith(argsMap)
+ val parameter = ParameterTool.fromSystemProperties()
+ .mergeWith(ParameterTool.fromMap(properConf))
+ .mergeWith(ParameterTool.fromMap(appConf))
+ .mergeWith(argsMap)
+
val envConfig = Configuration.fromMap(properConf)
FlinkConfiguration(parameter, envConfig, null)
}
def parseConfig(config: String): Map[String, String] = {
val extension = config.split("\\.").last.toLowerCase
- config match {
+ val map = config match {
case x if x.startsWith("yaml://") =>
PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
case x if x.startsWith("prop://") =>
@@ -122,6 +127,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
}
}
+ map.filter(_._2.nonEmpty)
}
def extractConfigByPrefix(configMap: Map[String, String], prefix: String): Map[String, String] = {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 01574bcf4..b81ff54a4 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -152,18 +152,20 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
})
// config priority: explicitly specified priority > project profiles > system profiles
+ val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
+ val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
+
+ val tableConfig = Configuration.fromMap(tableConf)
+ val envConfig = Configuration.fromMap(properConf)
+
val parameter = ParameterTool.fromSystemProperties()
+ .mergeWith(ParameterTool.fromMap(properConf))
+ .mergeWith(ParameterTool.fromMap(tableConf))
.mergeWith(ParameterTool.fromMap(appConf))
.mergeWith(ParameterTool.fromMap(sqlConf))
.mergeWith(argsMap)
- val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
- val envConfig = Configuration.fromMap(properConf)
-
- val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
- val tableConfig = Configuration.fromMap(tableConf)
-
FlinkConfiguration(parameter, envConfig, tableConfig)
}
}