You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/05 15:27:54 UTC

[incubator-streampark] branch typesafe created (now bc62552b5)

This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a change to branch typesafe
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


      at bc62552b5 [feature] HOCON conf file support

This branch includes the following new commits:

     new bc62552b5 [feature] HOCON conf file support

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[incubator-streampark] 01/01: [feature] HOCON conf file support

Posted by be...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch typesafe
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit bc62552b57f289876f185740e5582ef7672f944b
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 5 23:27:34 2022 +0800

    [feature] HOCON conf file support
---
 pom.xml                                            |  9 ++-
 streampark-common/pom.xml                          |  5 ++
 .../streampark/common/util/PropertiesUtils.scala   | 66 +++++++++++++---------
 .../console/core/entity/ApplicationConfig.java     | 21 ++++++-
 .../core/service/impl/ApplicationServiceImpl.java  |  3 +-
 .../flink/core/FlinkStreamingInitializer.scala     | 19 ++++---
 .../flink/submit/bean/SubmitRequest.scala          | 11 ++--
 7 files changed, 88 insertions(+), 46 deletions(-)

diff --git a/pom.xml b/pom.xml
index eca63cfda..83349b842 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
         <mysql.version>8.0.16</mysql.version>
         <hikariCP.version>3.4.5</hikariCP.version>
         <snakeyaml.version>1.32</snakeyaml.version>
+        <typesafe-conf.version>1.4.2</typesafe-conf.version>
         <json4s-jackson.version>3.7.0-M2</json4s-jackson.version>
         <hbase-client.version>1.3.5</hbase-client.version>
         <commons-cli.version>1.3.1</commons-cli.version>
@@ -229,6 +230,12 @@
                 <version>${hikariCP.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>com.typesafe</groupId>
+                <artifactId>config</artifactId>
+                <version>${typesafe-conf.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.yaml</groupId>
                 <artifactId>snakeyaml</artifactId>
@@ -572,7 +579,7 @@
                 <version>${jupiter.version}</version>
                 <scope>test</scope>
             </dependency>
-            
+
             <dependency>
                 <groupId>org.junit.jupiter</groupId>
                 <artifactId>junit-jupiter-params</artifactId>
diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index 2b4163d36..43b12511b 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -80,6 +80,11 @@
             <artifactId>snakeyaml</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.json4s</groupId>
             <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f590bc848..2c0142ace 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.streampark.common.util
 
+import com.typesafe.config.ConfigFactory
 import org.yaml.snakeyaml.Yaml
 
 import java.io._
@@ -76,6 +77,16 @@ object PropertiesUtils extends Logger {
     }
   }
 
+  def fromHoconText(conf: String): Map[String, String] = {
+    try {
+      val reader = new StringReader(conf)
+      val config = ConfigFactory.parseReader(reader)
+      config.entrySet().map(x => x.getKey -> x.getValue.render()).toMap
+    } catch {
+      case e: IOException => throw new IllegalArgumentException(s"Failed when loading Hocon ", e)
+    }
+  }
+
   def fromPropertiesText(conf: String): Map[String, String] = {
     try {
       val properties = new Properties()
@@ -92,17 +103,14 @@ object PropertiesUtils extends Logger {
     require(file.exists(), s"[StreamPark] fromYamlFile: Yaml file $file does not exist")
     require(file.isFile, s"[StreamPark] fromYamlFile: Yaml file $file is not a normal file")
     val inputStream: InputStream = new FileInputStream(file)
-    try {
-      val map = MutableMap[String, String]()
-      new Yaml()
-        .load(inputStream)
-        .asInstanceOf[java.util.Map[String, Map[String, Any]]]
-        .flatMap(x => eachAppendYamlItem("", x._1, x._2, map)).toMap
-    } catch {
-      case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from $filename", e)
-    } finally {
-      inputStream.close()
-    }
+    fromYamlFile(inputStream)
+  }
+
+  def fromHoconFile(filename: String): Map[String, String] = {
+    val file = new File(filename)
+    require(file.exists(), s"[StreamPark] fromHoconFile: file $file does not exist")
+    val inputStream = new FileInputStream(file)
+    fromHoconFile(inputStream)
   }
 
   /** Load properties present in the given file. */
@@ -110,17 +118,8 @@ object PropertiesUtils extends Logger {
     val file = new File(filename)
     require(file.exists(), s"[StreamPark] fromPropertiesFile: Properties file $file does not exist")
     require(file.isFile, s"[StreamPark] fromPropertiesFile: Properties file $file is not a normal file")
-
-    val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
-    try {
-      val properties = new Properties()
-      properties.load(inReader)
-      properties.stringPropertyNames().map(k => (k, properties.getProperty(k).trim)).toMap
-    } catch {
-      case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from $filename", e)
-    } finally {
-      inReader.close()
-    }
+    val inputStream = new FileInputStream(file)
+    fromPropertiesFile(inputStream)
   }
 
   /** Load Yaml present in the given file. */
@@ -139,6 +138,17 @@ object PropertiesUtils extends Logger {
     }
   }
 
+  def fromHoconFile(inputStream: InputStream): Map[String, String] = {
+    require(inputStream != null, s"[StreamPark] fromHoconFile: Hocon inputStream  must not be null")
+    try {
+      val reader = new InputStreamReader(inputStream)
+      val config = ConfigFactory.parseReader(reader)
+      config.entrySet().map(x => x.getKey -> x.getValue.render()).toMap
+    } catch {
+      case e: IOException => throw new IllegalArgumentException(s"Failed when loading Hocon ", e)
+    }
+  }
+
   /** Load properties present in the given file. */
   def fromPropertiesFile(inputStream: InputStream): Map[String, String] = {
     require(inputStream != null, s"[StreamPark] fromPropertiesFile: Properties inputStream  must not be null")
@@ -153,18 +163,20 @@ object PropertiesUtils extends Logger {
 
   def fromYamlTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromYamlText(text).asJava)
 
-  def fromPropertiesTextAsJava(conf: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesText(conf).asJava)
+  def fromHoconTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromHoconText(text).asJava)
+
+  def fromPropertiesTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesText(text).asJava)
 
-  /** Load Yaml present in the given file. */
   def fromYamlFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromYamlFile(filename).asJava)
 
-  /** Load properties present in the given file. */
+  def fromHoconFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromHoconFile(filename).asJava)
+
   def fromPropertiesFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesFile(filename).asJava)
 
-  /** Load Yaml present in the given file. */
   def fromYamlFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromYamlFile(inputStream).asJava)
 
-  /** Load properties present in the given file. */
+  def fromHoconFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromHoconFile(inputStream).asJava)
+
   def fromPropertiesFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesFile(inputStream).asJava)
 
   /**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
index 4a50cae9f..db660043f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
@@ -44,7 +44,8 @@ public class ApplicationConfig {
 
     /**
      * 1)yaml <br>
-     * 2)prop
+     * 2)prop <br>
+     * 3)hocon
      */
     private Integer format;
 
@@ -74,14 +75,30 @@ public class ApplicationConfig {
     }
 
     public Map<String, String> readConfig() {
-        switch (this.getFormat()) {
+        switch (this.format) {
             case 1:
                 return PropertiesUtils.fromYamlTextAsJava(DeflaterUtils.unzipString(this.content));
             case 2:
                 return PropertiesUtils.fromPropertiesTextAsJava(DeflaterUtils.unzipString(this.content));
+            case 3:
+                return PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content));
             default:
                 break;
         }
         return null;
     }
+
+    public String configType() {
+        switch (this.format) {
+            case 1:
+                return "yaml";
+            case 2:
+                return "prop";
+            case 3:
+                return "conf";
+            default:
+                throw new IllegalArgumentException("getConfigType error, format must be (1|2|3), detail: 1:yaml, 2:properties, 3:hocon");
+        }
+    }
+
 }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 1b2996966..47e86973a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1262,8 +1262,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
             } else {
                 switch (application.getApplicationType()) {
                     case STREAMPARK_FLINK:
-                        String format = applicationConfig.getFormat() == 1 ? "yaml" : "prop";
-                        appConf = String.format("%s://%s", format, applicationConfig.getContent());
+                        appConf = String.format("%s://%s", applicationConfig.configType(), applicationConfig.getContent());
                         break;
                     case APACHE_FLINK:
                         appConf = String.format("json://{\"%s\":\"%s\"}", ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 8c84b3719..8189a034c 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -102,29 +102,30 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
 
   def parseConfig(config: String): Map[String, String] = {
     val extension = config.split("\\.").last.toLowerCase
+    lazy val content = DeflaterUtils.unzipString(config.drop(7))
     val map = config match {
-      case x if x.startsWith("yaml://") =>
-        PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
-      case x if x.startsWith("prop://") =>
-        PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
+      case x if x.startsWith("yaml://") => PropertiesUtils.fromYamlText(content)
+      case x if x.startsWith("conf://") => PropertiesUtils.fromHoconText(content)
+      case x if x.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content)
       case x if x.startsWith("hdfs://") =>
-
         /**
          * If the configuration file with the hdfs, user will need to copy the hdfs-related configuration files under the resources dir
          */
         val text = HdfsUtils.read(x)
         extension match {
-          case "properties" => PropertiesUtils.fromPropertiesText(text)
           case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
+          case "conf" => PropertiesUtils.fromHoconText(text)
+          case "properties" => PropertiesUtils.fromPropertiesText(text)
           case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
         }
       case _ =>
         val configFile = new File(config)
-        require(configFile.exists(), s"[StreamPark] Usage:flink.conf file $configFile is not found!!!")
+        require(configFile.exists(), s"[StreamPark] Usage: flink.conf file $configFile is not found!!!")
         extension match {
-          case "properties" => PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
           case "yml" | "yaml" => PropertiesUtils.fromYamlFile(configFile.getAbsolutePath)
-          case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
+          case "conf" => PropertiesUtils.fromHoconFile(configFile.getAbsolutePath)
+          case "properties" => PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
+          case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be (yml|conf|properties)")
         }
     }
     map.filter(_._2.nonEmpty)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index d25311146..601fc8ade 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -104,11 +104,11 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
 
   private[this] def getParameterMap(prefix: String = ""): Map[String, String] = {
     if (this.appConf == null) Map.empty[String, String] else {
+      lazy val content = DeflaterUtils.unzipString(this.appConf.trim.drop(7))
       val map = this.appConf match {
-        case x if x.trim.startsWith("yaml://") =>
-          PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.trim.drop(7)))
-        case x if x.trim.startsWith("prop://") =>
-          PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.trim.drop(7)))
+        case x if x.trim.startsWith("yaml://") => PropertiesUtils.fromYamlText(content)
+        case x if x.trim.startsWith("conf://") => PropertiesUtils.fromHoconText(content)
+        case x if x.trim.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content)
         case x if x.trim.startsWith("hdfs://") =>
           /*
            * 如果配置文件为hdfs方式,则需要用户将hdfs相关配置文件copy到resources下...
@@ -116,8 +116,9 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
           val text = HdfsUtils.read(this.appConf)
           val extension = this.appConf.split("\\.").last.toLowerCase
           extension match {
-            case "properties" => PropertiesUtils.fromPropertiesText(text)
             case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
+            case "conf" => PropertiesUtils.fromHoconText(text)
+            case "properties" => PropertiesUtils.fromPropertiesText(text)
             case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
           }
         case x if x.trim.startsWith("json://") =>