You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@streampark.apache.org by be...@apache.org on 2022/11/05 15:27:55 UTC
[incubator-streampark] 01/01: [feature] HOCON conf file support
This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch typesafe
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit bc62552b57f289876f185740e5582ef7672f944b
Author: benjobs <be...@apache.org>
AuthorDate: Sat Nov 5 23:27:34 2022 +0800
[feature] HOCON conf file support
---
pom.xml | 9 ++-
streampark-common/pom.xml | 5 ++
.../streampark/common/util/PropertiesUtils.scala | 66 +++++++++++++---------
.../console/core/entity/ApplicationConfig.java | 21 ++++++-
.../core/service/impl/ApplicationServiceImpl.java | 3 +-
.../flink/core/FlinkStreamingInitializer.scala | 19 ++++---
.../flink/submit/bean/SubmitRequest.scala | 11 ++--
7 files changed, 88 insertions(+), 46 deletions(-)
diff --git a/pom.xml b/pom.xml
index eca63cfda..83349b842 100644
--- a/pom.xml
+++ b/pom.xml
@@ -114,6 +114,7 @@
<mysql.version>8.0.16</mysql.version>
<hikariCP.version>3.4.5</hikariCP.version>
<snakeyaml.version>1.32</snakeyaml.version>
+ <typesafe-conf.version>1.4.2</typesafe-conf.version>
<json4s-jackson.version>3.7.0-M2</json4s-jackson.version>
<hbase-client.version>1.3.5</hbase-client.version>
<commons-cli.version>1.3.1</commons-cli.version>
@@ -229,6 +230,12 @@
<version>${hikariCP.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ <version>${typesafe-conf.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.yaml</groupId>
<artifactId>snakeyaml</artifactId>
@@ -572,7 +579,7 @@
<version>${jupiter.version}</version>
<scope>test</scope>
</dependency>
-
+
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
diff --git a/streampark-common/pom.xml b/streampark-common/pom.xml
index 2b4163d36..43b12511b 100644
--- a/streampark-common/pom.xml
+++ b/streampark-common/pom.xml
@@ -80,6 +80,11 @@
<artifactId>snakeyaml</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.json4s</groupId>
<artifactId>json4s-jackson_${scala.binary.version}</artifactId>
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
index f590bc848..2c0142ace 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/PropertiesUtils.scala
@@ -16,6 +16,7 @@
*/
package org.apache.streampark.common.util
+import com.typesafe.config.ConfigFactory
import org.yaml.snakeyaml.Yaml
import java.io._
@@ -76,6 +77,16 @@ object PropertiesUtils extends Logger {
}
}
+ def fromHoconText(conf: String): Map[String, String] = {
+ try {
+ val reader = new StringReader(conf)
+ val config = ConfigFactory.parseReader(reader)
+ config.entrySet().map(x => x.getKey -> x.getValue.render()).toMap
+ } catch {
+ case e: IOException => throw new IllegalArgumentException(s"Failed when loading Hocon ", e)
+ }
+ }
+
def fromPropertiesText(conf: String): Map[String, String] = {
try {
val properties = new Properties()
@@ -92,17 +103,14 @@ object PropertiesUtils extends Logger {
require(file.exists(), s"[StreamPark] fromYamlFile: Yaml file $file does not exist")
require(file.isFile, s"[StreamPark] fromYamlFile: Yaml file $file is not a normal file")
val inputStream: InputStream = new FileInputStream(file)
- try {
- val map = MutableMap[String, String]()
- new Yaml()
- .load(inputStream)
- .asInstanceOf[java.util.Map[String, Map[String, Any]]]
- .flatMap(x => eachAppendYamlItem("", x._1, x._2, map)).toMap
- } catch {
- case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from $filename", e)
- } finally {
- inputStream.close()
- }
+ fromYamlFile(inputStream)
+ }
+
+ def fromHoconFile(filename: String): Map[String, String] = {
+ val file = new File(filename)
+ require(file.exists(), s"[StreamPark] fromHoconFile: file $file does not exist")
+ val inputStream = new FileInputStream(file)
+ fromHoconFile(inputStream)
}
/** Load properties present in the given file. */
@@ -110,17 +118,8 @@ object PropertiesUtils extends Logger {
val file = new File(filename)
require(file.exists(), s"[StreamPark] fromPropertiesFile: Properties file $file does not exist")
require(file.isFile, s"[StreamPark] fromPropertiesFile: Properties file $file is not a normal file")
-
- val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
- try {
- val properties = new Properties()
- properties.load(inReader)
- properties.stringPropertyNames().map(k => (k, properties.getProperty(k).trim)).toMap
- } catch {
- case e: IOException => throw new IllegalArgumentException(s"Failed when loading properties from $filename", e)
- } finally {
- inReader.close()
- }
+ val inputStream = new FileInputStream(file)
+ fromPropertiesFile(inputStream)
}
/** Load Yaml present in the given file. */
@@ -139,6 +138,17 @@ object PropertiesUtils extends Logger {
}
}
+ def fromHoconFile(inputStream: InputStream): Map[String, String] = {
+ require(inputStream != null, s"[StreamPark] fromHoconFile: Hocon inputStream must not be null")
+ try {
+ val reader = new InputStreamReader(inputStream)
+ val config = ConfigFactory.parseReader(reader)
+ config.entrySet().map(x => x.getKey -> x.getValue.render()).toMap
+ } catch {
+ case e: IOException => throw new IllegalArgumentException(s"Failed when loading Hocon ", e)
+ }
+ }
+
/** Load properties present in the given file. */
def fromPropertiesFile(inputStream: InputStream): Map[String, String] = {
require(inputStream != null, s"[StreamPark] fromPropertiesFile: Properties inputStream must not be null")
@@ -153,18 +163,20 @@ object PropertiesUtils extends Logger {
def fromYamlTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromYamlText(text).asJava)
- def fromPropertiesTextAsJava(conf: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesText(conf).asJava)
+ def fromHoconTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromHoconText(text).asJava)
+
+ def fromPropertiesTextAsJava(text: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesText(text).asJava)
- /** Load Yaml present in the given file. */
def fromYamlFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromYamlFile(filename).asJava)
- /** Load properties present in the given file. */
+ def fromHoconFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromHoconFile(filename).asJava)
+
def fromPropertiesFileAsJava(filename: String): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesFile(filename).asJava)
- /** Load Yaml present in the given file. */
def fromYamlFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromYamlFile(inputStream).asJava)
- /** Load properties present in the given file. */
+ def fromHoconFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromHoconFile(inputStream).asJava)
+
def fromPropertiesFileAsJava(inputStream: InputStream): JavaMap[String, String] = new JavaMap[String, String](fromPropertiesFile(inputStream).asJava)
/**
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
index 4a50cae9f..db660043f 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
@@ -44,7 +44,8 @@ public class ApplicationConfig {
/**
* 1)yaml <br>
- * 2)prop
+ * 2)prop <br>
+ * 3)hocon
*/
private Integer format;
@@ -74,14 +75,30 @@ public class ApplicationConfig {
}
public Map<String, String> readConfig() {
- switch (this.getFormat()) {
+ switch (this.format) {
case 1:
return PropertiesUtils.fromYamlTextAsJava(DeflaterUtils.unzipString(this.content));
case 2:
return PropertiesUtils.fromPropertiesTextAsJava(DeflaterUtils.unzipString(this.content));
+ case 3:
+ return PropertiesUtils.fromHoconTextAsJava(DeflaterUtils.unzipString(this.content));
default:
break;
}
return null;
}
+
+ public String configType() {
+ switch (this.format) {
+ case 1:
+ return "yaml";
+ case 2:
+ return "prop";
+ case 3:
+ return "conf";
+ default:
+ throw new IllegalArgumentException("getConfigType error, format must be (1|2|3), detail: 1:yaml, 2:properties, 3:hocon");
+ }
+ }
+
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 1b2996966..47e86973a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1262,8 +1262,7 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
} else {
switch (application.getApplicationType()) {
case STREAMPARK_FLINK:
- String format = applicationConfig.getFormat() == 1 ? "yaml" : "prop";
- appConf = String.format("%s://%s", format, applicationConfig.getContent());
+ appConf = String.format("%s://%s", applicationConfig.configType(), applicationConfig.getContent());
break;
case APACHE_FLINK:
appConf = String.format("json://{\"%s\":\"%s\"}", ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 8c84b3719..8189a034c 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -102,29 +102,30 @@ private[flink] class FlinkStreamingInitializer(args: Array[String], apiType: Api
def parseConfig(config: String): Map[String, String] = {
val extension = config.split("\\.").last.toLowerCase
+ lazy val content = DeflaterUtils.unzipString(config.drop(7))
val map = config match {
- case x if x.startsWith("yaml://") =>
- PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.drop(7)))
- case x if x.startsWith("prop://") =>
- PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.drop(7)))
+ case x if x.startsWith("yaml://") => PropertiesUtils.fromYamlText(content)
+ case x if x.startsWith("conf://") => PropertiesUtils.fromHoconText(content)
+ case x if x.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content)
case x if x.startsWith("hdfs://") =>
-
/**
* If the configuration file with the hdfs, user will need to copy the hdfs-related configuration files under the resources dir
*/
val text = HdfsUtils.read(x)
extension match {
- case "properties" => PropertiesUtils.fromPropertiesText(text)
case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
+ case "conf" => PropertiesUtils.fromHoconText(text)
+ case "properties" => PropertiesUtils.fromPropertiesText(text)
case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
}
case _ =>
val configFile = new File(config)
- require(configFile.exists(), s"[StreamPark] Usage:flink.conf file $configFile is not found!!!")
+ require(configFile.exists(), s"[StreamPark] Usage: flink.conf file $configFile is not found!!!")
extension match {
- case "properties" => PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
case "yml" | "yaml" => PropertiesUtils.fromYamlFile(configFile.getAbsolutePath)
- case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
+ case "conf" => PropertiesUtils.fromHoconFile(configFile.getAbsolutePath)
+ case "properties" => PropertiesUtils.fromPropertiesFile(configFile.getAbsolutePath)
+ case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be (yml|conf|properties)")
}
}
map.filter(_._2.nonEmpty)
diff --git a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
index d25311146..601fc8ade 100644
--- a/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-submit/streampark-flink-submit-api/src/main/scala/org/apache/streampark/flink/submit/bean/SubmitRequest.scala
@@ -104,11 +104,11 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
private[this] def getParameterMap(prefix: String = ""): Map[String, String] = {
if (this.appConf == null) Map.empty[String, String] else {
+ lazy val content = DeflaterUtils.unzipString(this.appConf.trim.drop(7))
val map = this.appConf match {
- case x if x.trim.startsWith("yaml://") =>
- PropertiesUtils.fromYamlText(DeflaterUtils.unzipString(x.trim.drop(7)))
- case x if x.trim.startsWith("prop://") =>
- PropertiesUtils.fromPropertiesText(DeflaterUtils.unzipString(x.trim.drop(7)))
+ case x if x.trim.startsWith("yaml://") => PropertiesUtils.fromYamlText(content)
+ case x if x.trim.startsWith("conf://") => PropertiesUtils.fromHoconText(content)
+ case x if x.trim.startsWith("prop://") => PropertiesUtils.fromPropertiesText(content)
case x if x.trim.startsWith("hdfs://") =>
/*
* 如果配置文件为hdfs方式,则需要用户将hdfs相关配置文件copy到resources下...
@@ -116,8 +116,9 @@ case class SubmitRequest(flinkVersion: FlinkVersion,
val text = HdfsUtils.read(this.appConf)
val extension = this.appConf.split("\\.").last.toLowerCase
extension match {
- case "properties" => PropertiesUtils.fromPropertiesText(text)
case "yml" | "yaml" => PropertiesUtils.fromYamlText(text)
+ case "conf" => PropertiesUtils.fromHoconText(text)
+ case "properties" => PropertiesUtils.fromPropertiesText(text)
case _ => throw new IllegalArgumentException("[StreamPark] Usage:flink.conf file error,must be properties or yml")
}
case x if x.trim.startsWith("json://") =>