Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/04/07 08:39:24 UTC

[incubator-seatunnel] branch dev updated: [Improve][Connector] clickhouse configuration extraction (#1671)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b50de38f [Improve][Connector] clickhouse configuration extraction (#1671)
b50de38f is described below

commit b50de38fcb6478349a69f7c62af891fc38368d93
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Thu Apr 7 16:39:18 2022 +0800

    [Improve][Connector] clickhouse configuration extraction (#1671)
---
 .../apache/seatunnel/spark/clickhouse/Config.scala | 109 +++++++++++++++++++++
 .../spark/clickhouse/sink/Clickhouse.scala         |  43 ++++----
 .../spark/clickhouse/sink/ClickhouseFile.scala     |  47 ++++-----
 3 files changed, 155 insertions(+), 44 deletions(-)

diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/Config.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/Config.scala
new file mode 100644
index 00000000..75eb715f
--- /dev/null
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/Config.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.spark.clickhouse
+
+/**
+ * Configuration keys for the ClickHouse sink connectors
+ */
+object Config extends Serializable {
+
+  /**
+   * Bulk size of each ClickHouse JDBC batch write
+   */
+  val BULK_SIZE = "bulk_size"
+
+  /**
+   * Number of retries for ClickHouse JDBC operations
+   */
+  val RETRY = "retry"
+
+  /**
+   * The ClickHouse table fields to write
+   */
+  val FIELDS = "fields"
+
+  /**
+   * Clickhouse server host
+   */
+  val HOST = "host"
+
+  /**
+   * Clickhouse table name
+   */
+  val TABLE = "table"
+
+  /**
+   * Clickhouse database name
+   */
+  val DATABASE = "database"
+
+  /**
+   * Clickhouse server username
+   */
+  val USERNAME = "username"
+
+  /**
+   * Clickhouse server password
+   */
+  val PASSWORD = "password"
+
+  /**
+   * Split mode, used when the table engine is Distributed
+   */
+  val SPLIT_MODE = "split_mode"
+
+  /**
+   * When split_mode is true, the sharding_key used to split data
+   */
+  val SHARDING_KEY = "sharding_key"
+
+  /**
+   * The error codes that trigger a retry when using ClickHouse JDBC
+   */
+  val RETRY_CODES = "retry_codes"
+
+  /**
+   * Path of the clickhouse-local program used by the ClickhouseFile sink connector
+   */
+  val CLICKHOUSE_LOCAL_PATH = "clickhouse_local_path"
+
+  /**
+   * The method used to copy ClickHouse data files
+   */
+  val COPY_METHOD = "copy_method"
+
+  /**
+   * The number of lines per batch when caching temporary data to a local file
+   */
+  val TMP_BATCH_CACHE_LINE = "tmp_batch_cache_line"
+
+  /**
+   * Whether the ClickHouse server nodes are password-free
+   */
+  val NODE_FREE_PASSWORD = "node_free_password"
+
+  /**
+   * The passwords of the ClickHouse server nodes
+   */
+  val NODE_PASS = "node_pass"
+
+  /**
+   * The address of a ClickHouse server node
+   */
+  val NODE_ADDRESS = "node_address"
+}
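
The constants above are consumed through the Typesafe Config API (SeaTunnel shades it under org.apache.seatunnel.shade, as the imports below show). A minimal standalone sketch, using the unshaded artifact and hypothetical values, of reading a sink block via the new constants instead of raw string literals:

    import com.typesafe.config.ConfigFactory
    import org.apache.seatunnel.spark.clickhouse.Config._

    val conf = ConfigFactory.parseString(
      """
        |host = "ck-node1:8123"
        |database = "default"
        |table = "events"
        |username = "seatunnel"
        |password = "secret"
        |bulk_size = 20000
        |""".stripMargin)

    val host = conf.getString(HOST)       // "ck-node1:8123"
    val bulkSize = conf.getInt(BULK_SIZE) // 20000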
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index f62714f2..bb7dd890 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -34,6 +34,7 @@ import org.apache.seatunnel.common.config.TypesafeConfigUtils.{extractSubConfig,
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.clickhouse.Config.{BULK_SIZE, DATABASE, FIELDS, HOST, PASSWORD, RETRY, RETRY_CODES, SHARDING_KEY, SPLIT_MODE, TABLE, USERNAME}
 import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, getDefaultValue, getRowShard}
 import org.apache.spark.sql.{Dataset, Row}
 import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl, ClickHousePreparedStatementImpl}
@@ -69,10 +70,10 @@ class Clickhouse extends SparkBatchSink {
 
   override def output(data: Dataset[Row], environment: SparkEnvironment): Unit = {
     val dfFields = data.schema.fieldNames
-    val bulkSize = config.getInt("bulk_size")
-    val retry = config.getInt("retry")
+    val bulkSize = config.getInt(BULK_SIZE)
+    val retry = config.getInt(RETRY)
 
-    if (!config.hasPath("fields")) {
+    if (!config.hasPath(FIELDS)) {
       fields = dfFields.toList
       initSQL = initPrepareSQL()
     }
@@ -109,7 +110,7 @@ class Clickhouse extends SparkBatchSink {
   }
 
   override def checkConfig(): CheckResult = {
-    var checkResult = checkAllExists(config, "host", "table", "database", "username", "password")
+    var checkResult = checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD)
     if (checkResult.isSuccess) {
       if (hasSubConfig(config, clickhousePrefix)) {
         extractSubConfig(config, clickhousePrefix, false).entrySet().foreach(e => {
@@ -117,20 +118,20 @@ class Clickhouse extends SparkBatchSink {
         })
       }
 
-      properties.put("user", config.getString("username"))
-      properties.put("password", config.getString("password"))
+      properties.put("user", config.getString(USERNAME))
+      properties.put("password", config.getString(PASSWORD))
 
-      if (config.hasPath("split_mode")) {
-        splitMode = config.getBoolean("split_mode")
+      if (config.hasPath(SPLIT_MODE)) {
+        splitMode = config.getBoolean(SPLIT_MODE)
       }
-      val database = config.getString("database")
-      val host = config.getString("host")
+      val database = config.getString(DATABASE)
+      val host = config.getString(HOST)
       val conn = getClickhouseConnection(host, database, properties)
       val hostAndPort = host.split(":")
-      this.table = config.getString("table")
+      this.table = config.getString(TABLE)
       this.tableSchema = getClickHouseSchema(conn, this.table).toMap
       if (splitMode) {
-        val tableName = config.getString("table")
+        val tableName = config.getString(TABLE)
         val localTable = getClickHouseDistributedTable(conn, database, tableName)
         if (Objects.isNull(localTable)) {
           CheckResult.error(s"split mode only supports tables whose engine is '$distributedEngine' for now")
@@ -143,8 +144,8 @@ class Clickhouse extends SparkBatchSink {
             weight += elem.shardWeight
           }
           this.shardWeightCount = weight
-          if (config.hasPath("sharding_key") && StringUtils.isNotEmpty(config.getString("sharding_key"))) {
-            this.shardKey = config.getString("sharding_key")
+          if (config.hasPath(SHARDING_KEY) && StringUtils.isNotEmpty(config.getString(SHARDING_KEY))) {
+            this.shardKey = config.getString(SHARDING_KEY)
           }
         }
       } else {
@@ -160,8 +161,8 @@ class Clickhouse extends SparkBatchSink {
           this.shardKeyType = this.tableSchema(this.shardKey)
         }
       }
-      if (this.config.hasPath("fields")) {
-        this.fields = config.getStringList("fields")
+      if (this.config.hasPath(FIELDS)) {
+        this.fields = config.getStringList(FIELDS)
         checkResult = acceptedClickHouseSchema(this.fields.toList, this.tableSchema, this.table)
       }
     }
@@ -169,18 +170,18 @@ class Clickhouse extends SparkBatchSink {
   }
 
   override def prepare(env: SparkEnvironment): Unit = {
-    if (config.hasPath("fields")) {
+    if (config.hasPath(FIELDS)) {
       this.initSQL = initPrepareSQL()
     }
 
     val defaultConfig = ConfigFactory.parseMap(
       Map(
-        "bulk_size" -> 20000,
+        BULK_SIZE -> 20000,
         // "retry_codes" -> util.Arrays.asList(ClickHouseErrorCode.NETWORK_ERROR.code),
-        "retry_codes" -> util.Arrays.asList(),
-        "retry" -> 1))
+        RETRY_CODES -> util.Arrays.asList(),
+        RETRY -> 1))
     config = config.withFallback(defaultConfig)
-    retryCodes = config.getIntList("retry_codes")
+    retryCodes = config.getIntList(RETRY_CODES)
   }
 
 
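prepare() layers the user config over connector defaults with withFallback: a key present in the user config wins, a missing one falls back to the default. A minimal standalone sketch of that merge, again using the unshaded Typesafe Config artifact:

    import com.typesafe.config.ConfigFactory
    import scala.collection.JavaConverters._

    val defaults = ConfigFactory.parseMap(Map[String, AnyRef](
      "bulk_size" -> Integer.valueOf(20000),
      "retry" -> Integer.valueOf(1)).asJava)

    val user = ConfigFactory.parseString("retry = 3")
    val merged = user.withFallback(defaults)

    merged.getInt("retry")     // 3: the user's value wins
    merged.getInt("bulk_size") // 20000: filled in from the defaults
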
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
index 676fb5d0..14e06327 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
@@ -26,6 +26,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.clickhouse.Config.{CLICKHOUSE_LOCAL_PATH, COPY_METHOD, DATABASE, FIELDS, HOST, NODE_ADDRESS, NODE_FREE_PASSWORD, NODE_PASS, PASSWORD, SHARDING_KEY, TABLE, TMP_BATCH_CACHE_LINE, USERNAME}
 import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse._
 import org.apache.seatunnel.spark.clickhouse.sink.ClickhouseFile.{CLICKHOUSE_FILE_PREFIX, LOGGER, UUID_LENGTH, getClickhouseTableInfo}
 import org.apache.seatunnel.spark.clickhouse.sink.Table
@@ -66,7 +67,7 @@ class ClickhouseFile extends SparkBatchSink {
 
   override def output(data: Dataset[Row], env: SparkEnvironment): Unit = {
 
-    if (!config.hasPath("fields")) {
+    if (!config.hasPath(FIELDS)) {
       this.fields = data.schema.fieldNames.toList
     }
 
@@ -208,23 +209,23 @@ class ClickhouseFile extends SparkBatchSink {
   }
 
   override def checkConfig(): CheckResult = {
-    var checkResult = checkAllExists(config, "host", "table", "database", "username", "password",
-      "clickhouse_local_path")
+    var checkResult = checkAllExists(config, HOST, TABLE, DATABASE, USERNAME, PASSWORD,
+      CLICKHOUSE_LOCAL_PATH)
     if (checkResult.isSuccess) {
-      clickhouseLocalPath = config.getString("clickhouse_local_path")
-      properties.put("user", config.getString("username"))
-      properties.put("password", config.getString("password"))
-      val host = config.getString("host")
-      val database = config.getString("database")
-      val table = config.getString("table")
+      clickhouseLocalPath = config.getString(CLICKHOUSE_LOCAL_PATH)
+      properties.put("user", config.getString(USERNAME))
+      properties.put("password", config.getString(PASSWORD))
+      val host = config.getString(HOST)
+      val database = config.getString(DATABASE)
+      val table = config.getString(TABLE)
       val conn = getClickhouseConnection(host, database, properties)
 
-      if (config.hasPath("copy_method")) {
-        this.copyFileMethod = getCopyMethod(config.getString("copy_method"))
+      if (config.hasPath(COPY_METHOD)) {
+        this.copyFileMethod = getCopyMethod(config.getString(COPY_METHOD))
       }
 
-      if (config.hasPath("tmp_batch_cache_line")) {
-        this.tmpBatchCacheLine = config.getInt("tmp_batch_cache_line")
+      if (config.hasPath(TMP_BATCH_CACHE_LINE)) {
+        this.tmpBatchCacheLine = config.getInt(TMP_BATCH_CACHE_LINE)
       }
 
       val (result, tableInfo) = getClickhouseTableInfo(conn, database, table)
@@ -233,16 +234,16 @@ class ClickhouseFile extends SparkBatchSink {
       } else {
         this.table = tableInfo
         tableInfo.initTableInfo(host, conn)
-        tableInfo.initShardDataPath(config.getString("username"), config.getString("password"))
+        tableInfo.initShardDataPath(config.getString(USERNAME), config.getString(PASSWORD))
         // check whether the node password config is complete
-        if (config.hasPath("node_free_password") && config.getBoolean("node_free_password")) {
+        if (config.hasPath(NODE_FREE_PASSWORD) && config.getBoolean(NODE_FREE_PASSWORD)) {
           this.freePass = true
-        } else if (config.hasPath("node_pass")) {
-          val nodePass = config.getObjectList("node_pass")
+        } else if (config.hasPath(NODE_PASS)) {
+          val nodePass = config.getObjectList(NODE_PASS)
           val nodePassMap = mutable.Map[String, String]()
           nodePass.foreach(np => {
-            val address = np.toConfig.getString("node_address")
-            val password = np.toConfig.getString("password")
+            val address = np.toConfig.getString(NODE_ADDRESS)
+            val password = np.toConfig.getString(PASSWORD)
             nodePassMap(address) = password
           })
           this.nodePass = nodePassMap.toMap
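
Here node_pass is a list of objects, each pairing node_address with password, flattened into a map. A minimal sketch of the expected config shape and the same getObjectList traversal, with hypothetical nodes:

    import com.typesafe.config.ConfigFactory
    import scala.collection.JavaConverters._

    val conf = ConfigFactory.parseString(
      """
        |node_pass = [
        |  { node_address = "ck-node1", password = "pw1" }
        |  { node_address = "ck-node2", password = "pw2" }
        |]
        |""".stripMargin)

    val nodePassMap = conf.getObjectList("node_pass").asScala.map { np =>
      np.toConfig.getString("node_address") -> np.toConfig.getString("password")
    }.toMap
    // Map("ck-node1" -> "pw1", "ck-node2" -> "pw2")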
@@ -254,13 +255,13 @@ class ClickhouseFile extends SparkBatchSink {
         }
         if (checkResult.isSuccess) {
           // check sharding method
-          if (config.hasPath("sharding_key") && StringUtils.isNotEmpty(config.getString("sharding_key"))) {
-            this.table.shardKey = config.getString("sharding_key")
+          if (config.hasPath(SHARDING_KEY) && StringUtils.isNotEmpty(config.getString(SHARDING_KEY))) {
+            this.table.shardKey = config.getString(SHARDING_KEY)
           }
           checkResult = this.table.prepareShardInfo(conn)
           if (checkResult.isSuccess) {
-            if (this.config.hasPath("fields")) {
-              this.fields = config.getStringList("fields").toList
+            if (this.config.hasPath(FIELDS)) {
+              this.fields = config.getStringList(FIELDS).toList
               checkResult = acceptedClickHouseSchema(this.fields, JavaConversions.mapAsScalaMap(this.table
                 .tableSchema).toMap, this.table.name)
             }
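
Taken together, a hedged sketch of a ClickhouseFile sink configuration that would satisfy checkConfig above: the six keys required by checkAllExists plus a few optional ones. All values are illustrative, and copy_method = "scp" is an assumed value not confirmed by this commit:

    import com.typesafe.config.ConfigFactory

    val sinkConf = ConfigFactory.parseString(
      """
        |host = "ck-node1:8123"
        |database = "default"
        |table = "events_all"
        |username = "seatunnel"
        |password = "secret"
        |clickhouse_local_path = "/usr/bin/clickhouse-local"
        |copy_method = "scp"           // assumed value; check the connector docs
        |tmp_batch_cache_line = 100000
        |node_free_password = false
        |sharding_key = "user_id"
        |""".stripMargin)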