You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by mo...@apache.org on 2022/11/04 16:00:50 UTC

[incubator-streampark] branch dev updated: [Bug] init config filter null value (#1963)

This is an automated email from the ASF dual-hosted git repository.

monster pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2807f5eb4 [Bug] init config filter null value (#1963)
2807f5eb4 is described below

commit 2807f5eb49d1762dcc7d45178ecc7b576107afcf
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 5 00:00:43 2022 +0800

    [Bug] init config filter null value (#1963)
---
 .../streampark/flink/core/FlinkStreamingInitializer.scala  | 10 ++++++++--
 .../streampark/flink/core/FlinkTableInitializer.scala      | 14 ++++++++------
 2 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 9dfdc33a3..8c84b3719 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -89,15 +89,20 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
     val configMap = parseConfig(config)
     val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
     val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
+
     // config priority: explicitly specified priority > project profiles > system profiles
-    val parameter = ParameterTool.fromSystemProperties().mergeWith(ParameterTool.fromMap(appConf)).mergeWith(argsMap)
+    val parameter = ParameterTool.fromSystemProperties()
+      .mergeWith(ParameterTool.fromMap(properConf))
+      .mergeWith(ParameterTool.fromMap(appConf))
+      .mergeWith(argsMap)
+
     val envConfig = Configuration.fromMap(properConf)
     FlinkConfiguration(parameter, envConfig, null)
   }
 
   def parseConfig(config: String): Map[String, String] = {
     val extension = config.split("\\.").last.toLowerCase
-    config match {
+    val map = config match {
       case x if x.startsWith("yaml://") =>
         PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
       case x if x.startsWith("prop://") =>
@@ -122,6 +127,7 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
           case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
         }
     }
+    map.filter(_._2.nonEmpty)
   }
 
   def extractConfigByPrefix(configMap: Map[String, String], prefix: String): Map[String, String] = {
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 01574bcf4..b81ff54a4 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -152,18 +152,20 @@ private[flink] class FlinkTableInitializer(args: Array[String], apiType: ApiType
           })
 
           // config priority: explicitly specified priority > project profiles > system profiles
+          val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
           val appConf = extractConfigByPrefix(configMap, KEY_APP_PREFIX)
+          val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
+
+          val tableConfig = Configuration.fromMap(tableConf)
+          val envConfig = Configuration.fromMap(properConf)
+
           val parameter = ParameterTool.fromSystemProperties()
+            .mergeWith(ParameterTool.fromMap(properConf))
+            .mergeWith(ParameterTool.fromMap(tableConf))
             .mergeWith(ParameterTool.fromMap(appConf))
             .mergeWith(ParameterTool.fromMap(sqlConf))
             .mergeWith(argsMap)
 
-          val properConf = extractConfigByPrefix(configMap, KEY_FLINK_PROPERTY_PREFIX)
-          val envConfig = Configuration.fromMap(properConf)
-
-          val tableConf = extractConfigByPrefix(configMap, KEY_FLINK_TABLE_PREFIX)
-          val tableConfig = Configuration.fromMap(tableConf)
-
           FlinkConfiguration(parameter, envConfig, tableConfig)
       }
     }