You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/02/24 13:57:09 UTC

[incubator-seatunnel] branch dev updated: [Improve] [config] Optimize clickhouse connector checkConfig (#1152)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e59d60e  [Improve] [config] Optimize clickhouse connector checkConfig (#1152)
e59d60e is described below

commit e59d60e500d29e93acda06dacf33675ce168f53c
Author: Simon <zh...@cvte.com>
AuthorDate: Thu Feb 24 21:56:25 2022 +0800

    [Improve] [config] Optimize clickhouse connector checkConfig (#1152)
---
 .../apache/seatunnel/spark/sink/Clickhouse.scala   | 67 ++++++----------------
 1 file changed, 18 insertions(+), 49 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
index af4f8a7..50ac363 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
@@ -27,7 +27,9 @@ import scala.collection.immutable.HashMap
 import scala.util.{Failure, Success, Try}
 import scala.util.matching.Regex
 
-import org.apache.seatunnel.common.config.{CheckResult, TypesafeConfigUtils}
+import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
+import org.apache.seatunnel.common.config.CheckResult
+import org.apache.seatunnel.common.config.TypesafeConfigUtils.{extractSubConfig, hasSubConfig}
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
@@ -43,7 +45,6 @@ class Clickhouse extends SparkBatchSink {
   var table: String = _
   var fields: java.util.List[String] = _
   var retryCodes: java.util.List[Integer] = _
-  //  var config: Config = ConfigFactory.empty()
   val clickhousePrefix = "clickhouse."
   val properties: Properties = new Properties()
 
@@ -79,60 +80,28 @@ class Clickhouse extends SparkBatchSink {
   }
 
   override def checkConfig(): CheckResult = {
-    val requiredOptions = List("host", "table", "database")
-    val nonExistsOptions =
-      requiredOptions.map(optionName => (optionName, config.hasPath(optionName))).filter { p =>
-        val (optionName, exists) = p
-        !exists
-      }
-
-    if (TypesafeConfigUtils.hasSubConfig(config, clickhousePrefix)) {
-      val clickhouseConfig = TypesafeConfigUtils.extractSubConfig(config, clickhousePrefix, false)
-      clickhouseConfig
-        .entrySet()
-        .foreach(entry => {
-          val key = entry.getKey
-          val value = String.valueOf(entry.getValue.unwrapped())
-          properties.put(key, value)
+    var checkResult = checkAllExists(config, "host", "table", "database", "username", "password")
+    if (checkResult.isSuccess) {
+      if (hasSubConfig(config, clickhousePrefix)) {
+        extractSubConfig(config, clickhousePrefix, false).entrySet().foreach(e => {
+          properties.put(e.getKey, String.valueOf(e.getValue.unwrapped()))
         })
-    }
-
-    if (nonExistsOptions.nonEmpty) {
-      CheckResult.error(
-        "please specify " + nonExistsOptions
-          .map { option =>
-            val (name, exists) = option
-            "[" + name + "]"
-          }
-          .mkString(", ") + " as non-empty string")
-    } else if (config.hasPath("username") && !config.hasPath("password") || config.hasPath(
-        "password")
-      && !config.hasPath("username")) {
-      CheckResult.error("please specify username and password at the same time")
-    } else {
-      this.jdbcLink = String.format(
-        "jdbc:clickhouse://%s/%s",
-        config.getString("host"),
-        config.getString("database"))
+      }
       if (config.hasPath("username")) {
         properties.put("user", config.getString("username"))
         properties.put("password", config.getString("password"))
       }
-
-      val balanced: BalancedClickhouseDataSource =
-        new BalancedClickhouseDataSource(jdbcLink, properties)
-      val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
-
-      this.table = config.getString("table")
-      this.tableSchema = getClickHouseSchema(conn, table)
-
-      if (this.config.hasPath("fields")) {
-        this.fields = config.getStringList("fields")
-        acceptedClickHouseSchema()
-      } else {
-        CheckResult.success()
+      jdbcLink = s"jdbc:clickhouse://${config.getString("host")}/${config.getString("database")}"
+      val conn = new BalancedClickhouseDataSource(jdbcLink, properties).getConnection
+        .asInstanceOf[ClickHouseConnectionImpl]
+      table = config.getString("table")
+      tableSchema = getClickHouseSchema(conn, table)
+      if (config.hasPath("fields")) {
+        fields = config.getStringList("fields")
+        checkResult = acceptedClickHouseSchema()
       }
     }
+    checkResult
   }
 
   override def prepare(env: SparkEnvironment): Unit = {