Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/05 15:27:55 UTC

[incubator-streampark] 01/01: [feature] HOCON conf file support

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch typesafe
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit bc62552b57f289876f185740e5582ef7672f944b
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 5 23:27:34 2022 +0800

    [feature] HOCON conf file support
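
    HOCON nests keys, and the new fromHocon* helpers flatten them to dotted
    paths via com.typesafe:config. A minimal sketch of that behavior,
    assuming only the typesafe config API (the sample config below is
    hypothetical):

        import com.typesafe.config.ConfigFactory
        import scala.collection.JavaConverters._

        val hocon =
          """
            |flink {
            |  app-name = "quickstart"
            |  deployment.parallelism = 4
            |}
          """.stripMargin

        // entrySet() yields one entry per leaf path,
        // e.g. "flink.deployment.parallelism" -> 4
        ConfigFactory.parseString(hocon).entrySet().asScala.foreach { e =>
          println(s"${e.getKey} = ${e.getValue.unwrapped}")
        }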
---
 pom.xml                                            |  9 ++-
 streampark-common/pom.xml                          |  5 ++
 .../streampark/common/util/PropertiesUtils.scala   | 66 +++++++++++++---------
 .../console/core/entity/ApplicationConfig.java     | 21 ++++++-
 .../core/service/impl/ApplicationServiceImpl.java  |  3 +-
 .../flink/core/FlinkStreamingInitializer.scala     | 19 ++++---
 .../flink/submit/bean/SubmitRequest.scala          | 11 ++--
 7 files changed, 88 insertions(+), 46 deletions(-)

diff --git a/pom.xml b/pom.xml
index eca63cfda..83349b842 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
         <mysql.version>8.0.16</mysql.version>
         <hikariCP.version>3.4.5</hikariCP.version>
         <snakeyaml.version>1.32</snakeyaml.version>
+        <typesafe-conf.version>1.4.2</typesafe-conf.version>
         <json4s-jackson.version>3.7.0-M2</json4s-jackson.version>
         <hbase-client.version>1.3.5</hbase-client.version>
         <commons-cli.version>1.3.1</commons-cli.version>
@@ -229,6 +230,12 @@
                 <version>${hikariCP.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.typesafe</groupId>
+                <artifactId>config</artifactId>
+                <version>${typesafe-conf.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.yaml</groupId>
                 <artifactId>snakeyaml</artifactId>
@@ -572,7 +579,7 @@
                 <version>${jupiter.version}</version>
                 <scope>test</scope>
             </dependency>
-            
+
             <dependency>
                 <groupId>org.junit.jupiter</groupId>
                 <artifactId>junit-jupiter-params</artifactId>
diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index 2b4163d36..43b12511b 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -80,6 +80,11 @@
             <artifactId>snakeyaml</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.json4s</groupId>
             <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f590bc848..2c0142ace 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.streampark.common.util
 
+import com.typesafe.config.ConfigFactory
 import org.yaml.snakeyaml.Yaml
 
 import java.io._
@@ -76,6 +77,16 @@ object PropertiesUtils extends Logger {
     }
   }
 
+  def fromHoconText(conf: String): Map[String, String] = {
+    try {
+      val reader = new StringReader(conf)
+      val config = ConfigFactory.parseReader(reader)
+      config.entrySet().map(x => x.getKey -> x.getValue.render()).toMap
+    } catch {
+      case e: Exception => throw new IllegalArgumentException("Failed when loading HOCON", e)
+    }
+  }
+
   def fromPropertiesText(conf: String): Map[String, String] = {
     try {
       val properties = new Properties()
@@ -92,17 +103,14 @@ object PropertiesUtils extends Logger {
     require(file.exists(), s"[StreamPark] fromYamlFile: Yaml file $file does not exist")
     require(file.isFile, s"[StreamPark] fromYamlFile: Yaml file $file is not a normal file")
     val inputStream: InputStream = new FileInputStream(file)
-    try {
-      val map = MutableMap[String, String]()
-      new Yaml()
-        .load(inputStream)
-        .asInstanceOf[java.util.Map[String, Map[String, Any]]]
-        .flatMap(x => eachAppendYamlItem("", x._1, x._2, map)).toMap
-    } catch {
-      case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from $filename", e)
-    } finally {
-      inputStream.close()
-    }
+    fromYamlFile(inputStream)
+  }
+
+  def fromHoconFile(filename: String): Map[String, String] = {
+    val file = new File(filename)
+    require(file.exists(), s"[StreamPark] fromHoconFile: file $file does not exist")
+    val inputStream = new FileInputStream(file)
+    fromHoconFile(inputStream)
   }
 
   /** Load properties present in the given file. */
@@ -110,17 +118,8 @@ object PropertiesUtils extends Logger {
     val file = new File(filename)
     require(file.exists(), s"[StreamPark] fromPropertiesFile: Properties file $file does not exist")
     require(file.isFile, s"[StreamPark] fromPropertiesFile: Properties file $file is not a normal file")
-
-    val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
-    try {
-      val properties = new Properties()
-      properties.load(inReader)
-      properties.stringPropertyNames().map(k => (k, properties.getProperty(k).trim)).toMap
-    } catch {
-      case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from $filename", e)
-    } finally {
-      inReader.close()
-    }
+    val inputStream = new FileInputStream(file)
+    fromPropertiesFile(inputStream)
   }
 
   /** Load Yaml present in the given file. */
@@ -139,6 +138,17 @@ object PropertiesUtils extends Logger {
     }
   }
 
+  def fromHoconFile(inputStream: InputStream): Map[String, String] = {
+    require(inputStream != null, s"[StreamPark] fromHoconFile: Hocon inputStream  must not be null")
+    try {
+      val reader = new InputStreamReader(inputStream)
+      val config = ConfigFactory.parseReader(reader)
+      config.entrySet().map(x => x.getKey -> x.getValue.render()).toMap
+    } catch {
+      case e: Exception => throw new IllegalArgumentException("Failed when loading HOCON", e)
+    }
+  }
+
   /** Load properties present in the given file. */
   def fromPropertiesFile(inputStream: InputStream): Map[String, String] = {
     require(inputStream != null, s"[StreamPark] fromPropertiesFile: Properties inputStream  must not be null")
@@ -153,18 +163,20 @@ object PropertiesUtils extends Logger {
 
   def fromYamlTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromYamlText(text).asJava)
 
-  def fromPropertiesTextAsJava(conf: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesText(conf).asJava)
+  def fromHoconTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromHoconText(text).asJava)
+
+  def fromPropertiesTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesText(text).asJava)
 
-  /** Load Yaml present in the given file. */
   def fromYamlFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromYamlFile(filename).asJava)
 
-  /** Load properties present in the given file. */
+  def fromHoconFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromHoconFile(filename).asJava)
+
   def fromPropertiesFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesFile(filename).asJava)
 
-  /** Load Yaml present in the given file. */
   def fromYamlFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromYamlFile(inputStream).asJava)
 
-  /** Load properties present in the given file. */
+  def fromHoconFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromHoconFile(inputStream).asJava)
+
   def fromPropertiesFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesFile(inputStream).asJava)
 
   /**
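
With these overloads in place, HOCON sources load through the same entry
points as YAML and properties. A hedged usage sketch (the path and content
are hypothetical); note that ConfigValue.render() emits HOCON/JSON notation,
so string values come back quoted -- callers needing the raw string may
prefer unwrapped().toString:

    import org.apache.streampark.common.util.PropertiesUtils

    val fromText: Map[String, String] =
      PropertiesUtils.fromHoconText("""app { name = "demo", parallelism = 2 }""")
    // fromText("app.name") == "\"demo\""  (rendered, hence quoted)

    val fromFile: Map[String, String] =
      PropertiesUtils.fromHoconFile("/tmp/application.conf")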
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
index 4a50cae9f..db660043f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
@@ -44,7 +44,8 @@ public class ApplicationConfig {
 
     /**
      * 1)yaml <br>
-     * 2)prop
+     * 2)prop <br>
+     * 3)hocon
      */
     private Integer format;
 
@@ -74,14 +75,30 @@ public class ApplicationConfig {
     }
 
     public Map<String, String> readConfig() {
-        switch (this.getFormat()) {
+        switch (this.format) {
             case 1:
                 return PropertiesUtils.fromYamlTextAsJava(DeflaterUtils.unzipString(this.content));
             case 2:
                 return PropertiesUtils.fromPropertiesTextAsJava(DeflaterUtils.unzipString(this.content));
+            case 3:
+                return PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content));
             default:
                 break;
         }
         return null;
     }
+
+    public String configType() {
+        switch (this.format) {
+            case 1:
+                return "yaml";
+            case 2:
+                return "prop";
+            case 3:
+                return "conf";
+            default:
+                throw new IllegalArgumentException("getConfigType error, format must be (1|2|3), detail: 1:yaml, 2:properties, 3:hocon");
+        }
+    }
+
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 1b2996966..47e86973a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1262,8 +1262,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             } else {
                 switch (application.getApplicationType()) {
                     case STREAMPARK_FLINK:
-                        String format = applicationConfig.getFormat() == 1 ? "yaml" : "prop";
-                        appConf = String.format("%s://%s", format, applicationConfig.getContent());
+                        appConf = String.format("%s://%s", applicationConfig.configType(), applicationConfig.getContent());
                         break;
                     case APACHE_FLINK:
                         appConf = String.format("json://{\"%s\":\"%s\"}", ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 8c84b3719..8189a034c 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -102,29 +102,30 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
 
   def parseConfig(config: String): Map[String, String] = {
     val extension = config.split("\\.").last.toLowerCase
+    lazy val content = DeflaterUtils.unzipString(config.drop(7))
     val map = config match {
-      case x if x.startsWith("yaml://") =>
-        PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
-      case x if x.startsWith("prop://") =>
-        PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
+      case x if x.startsWith("yaml://") => PropertiesUtils.fromYamlText(content)
+      case x if x.startsWith("conf://") => PropertiesUtils.fromHoconText(content)
+      case x if x.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content)
       case x if x.startsWith("hdfs://") =>
-
         /**
          * If the configuration file is on HDFS, the user needs to copy the HDFS-related configuration files into the resources directory
          */
         val text = HdfsUtils.read(x)
         extension match {
-          case "properties" => PropertiesUtils.fromPropertiesText(text)
           case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
+          case "conf" => PropertiesUtils.fromHoconText(text)
+          case "properties" => PropertiesUtils.fromPropertiesText(text)
           case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
         }
       case _ =>
         val configFile = new File(config)
-        require(configFile.exists(), s"[StreamPark] Usage:flink.conf file $configFile is not found!!!")
+        require(configFile.exists(), s"[StreamPark] Usage: flink.conf file $configFile not found!")
         extension match {
-          case "properties" => PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
           case "yml" | "yaml" => PropertiesUtils.fromYamlFile(configFile.getAbsolutePath)
-          case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
+          case "conf" => PropertiesUtils.fromHoconFile(configFile.getAbsolutePath)
+          case "properties" => PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
+          case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be (yml|conf|properties)")
         }
     }
     map.filter(_._2.nonEmpty)
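
parseConfig now dispatches on URI prefix first, then on file extension. A
summary of the accepted references, plus a hedged example call from inside
the initializer (assuming DeflaterUtils.zipString is the inverse of the
unzipString used above):

    // Accepted after this patch:
    //   yaml://<zipped>   conf://<zipped>   prop://<zipped>
    //   hdfs:///path/app.{yml,yaml,conf,properties}
    //   /local/path/app.{yml,yaml,conf,properties}
    val parsed: Map[String, String] =
      parseConfig("conf://" + DeflaterUtils.zipString("""app { name = "demo" }"""))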
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index d25311146..601fc8ade 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -104,11 +104,11 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
 
   private[this] def getParameterMap(prefix: String = ""): Map[String, String] = {
     if (this.appConf == null) Map.empty[String, String] else {
+      lazy val content = DeflaterUtils.unzipString(this.appConf.trim.drop(7))
       val map = this.appConf match {
-        case x if x.trim.startsWith("yaml://") =>
-          PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.trim.drop(7)))
-        case x if x.trim.startsWith("prop://") =>
-          PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.trim.drop(7)))
+        case x if x.trim.startsWith("yaml://") => PropertiesUtils.fromYamlText(content)
+        case x if x.trim.startsWith("conf://") => PropertiesUtils.fromHoconText(content)
+        case x if x.trim.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content)
         case x if x.trim.startsWith("hdfs://") =>
           /*
           * If the configuration file is on HDFS, the user needs to copy the HDFS-related configuration files into the resources directory...
@@ -116,8 +116,9 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
           val text = HdfsUtils.read(this.appConf)
           val extension = this.appConf.split("\\.").last.toLowerCase
           extension match {
-            case "properties" => PropertiesUtils.fromPropertiesText(text)
             case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
+            case "conf" => PropertiesUtils.fromHoconText(text)
+            case "properties" => PropertiesUtils.fromPropertiesText(text)
             case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
           }
         case x if x.trim.startsWith("json://") =>