You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/24 13:57:09 UTC
[incubator-seatunnel] branch dev updated: [Improve] [config] Optimize clickhouse connector checkConfig (#1152)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e59d60e [Improve] [config] Optimize clickhouse connector checkConfig (#1152)
e59d60e is described below
commit e59d60e500d29e93acda06dacf33675ce168f53c
Author: Simon <zh...@cvte.com>
AuthorDate: Thu Feb 24 21:56:25 2022 +0800
[Improve] [config] Optimize clickhouse connector checkConfig (#1152)
---
.../apache/seatunnel/spark/sink/Clickhouse.scala | 67 ++++++----------------
1 file changed, 18 insertions(+), 49 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
index af4f8a7..50ac363 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
@@ -27,7 +27,9 @@ import scala.collection.immutable.HashMap
import scala.util.{Failure, Success, Try}
import scala.util.matching.Regex
-import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.common.config.TypesafeConfigUtils.{extractSubConfig, hasSubConfig}
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
@@ -43,7 +45,6 @@ class Clickhouse extends SparkBatchSink {
var table: String = _
var fields: java.util.List[String] = _
var retryCodes: java.util.List[Integer] = _
- // var config: Config = ConfigFactory.empty()
val clickhousePrefix = "clickhouse."
val properties: Properties = new Properties()
@@ -79,60 +80,28 @@ class Clickhouse extends SparkBatchSink {
}
override def checkConfig(): CheckResult = {
- val requiredOptions = List("host", "table", "database")
- val nonExistsOptions =
- requiredOptions.map(optionName => (optionName, config.hasPath(optionName))).filter { p =>
- val (optionName, exists) = p
- !exists
- }
-
- if (TypesafeConfigUtils.hasSubConfig(config, clickhousePrefix)) {
- val clickhouseConfig = TypesafeConfigUtils.extractSubConfig(config, clickhousePrefix, false)
- clickhouseConfig
- .entrySet()
- .foreach(entry => {
- val key = entry.getKey
- val value = String.valueOf(entry.getValue.unwrapped())
- properties.put(key, value)
+ var checkResult = checkAllExists(config, "host", "table", "database", "username", "password")
+ if (checkResult.isSuccess) {
+ if (hasSubConfig(config, clickhousePrefix)) {
+ extractSubConfig(config, clickhousePrefix, false).entrySet().foreach(e => {
+ properties.put(e.getKey, String.valueOf(e.getValue.unwrapped()))
})
- }
-
- if (nonExistsOptions.nonEmpty) {
- CheckResult.error(
- "please specify " + nonExistsOptions
- .map { option =>
- val (name, exists) = option
- "[" + name + "]"
- }
- .mkString(", ") + " as non-empty string")
- } else if (config.hasPath("username") && !config.hasPath("password") || config.hasPath(
- "password")
- && !config.hasPath("username")) {
- CheckResult.error("please specify username and password at the same time")
- } else {
- this.jdbcLink = String.format(
- "jdbc:clickhouse://%s/%s",
- config.getString("host"),
- config.getString("database"))
+ }
if (config.hasPath("username")) {
properties.put("user", config.getString("username"))
properties.put("password", config.getString("password"))
}
-
- val balanced: BalancedClickhouseDataSource =
- new BalancedClickhouseDataSource(jdbcLink, properties)
- val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
-
- this.table = config.getString("table")
- this.tableSchema = getClickHouseSchema(conn, table)
-
- if (this.config.hasPath("fields")) {
- this.fields = config.getStringList("fields")
- acceptedClickHouseSchema()
- } else {
- CheckResult.success()
+ jdbcLink = s"jdbc:clickhouse://${config.getString("host")}/${config.getString("database")}"
+ val conn = new BalancedClickhouseDataSource(jdbcLink, properties).getConnection
+ .asInstanceOf[ClickHouseConnectionImpl]
+ table = config.getString("table")
+ tableSchema = getClickHouseSchema(conn, table)
+ if (config.hasPath("fields")) {
+ fields = config.getStringList("fields")
+ checkResult = acceptedClickHouseSchema()
}
}
+ checkResult
}
override def prepare(env: SparkEnvironment): Unit = {