Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/07 03:58:02 UTC

[incubator-seatunnel] branch dev updated: [Bug] [Connector] Clickhouse sink can not work correctly when use multiple hosts (#1808) (#1809)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ea40a7ff [Bug] [Connector] Clickhouse sink can not work correctly when use multiple hosts (#1808) (#1809)
ea40a7ff is described below

commit ea40a7ff605534b964290a1680146e3685bb6b60
Author: xpleaf <xp...@163.com>
AuthorDate: Sat May 7 11:57:56 2022 +0800

    [Bug] [Connector] Clickhouse sink can not work correctly when use multiple hosts (#1808) (#1809)
---
 .../spark/clickhouse/sink/Clickhouse.scala         | 47 +++++++++++++++++-----
 .../spark/clickhouse/sink/ClickhouseFile.scala     |  6 +--
 .../seatunnel/spark/clickhouse/sink/Table.scala    |  9 ++---
 3 files changed, 43 insertions(+), 19 deletions(-)
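
Background for the change: before this patch the sink read HOST as a single "host:port" string and split it on ':' (see the removed host.split(":") lines below), so a comma-separated value such as "host1:8123,host2:8123" produced a malformed JDBC URL and port. The patch parses the full list and passes every host to BalancedClickhouseDataSource, which accepts a multi-host URL for load balancing. A minimal, self-contained Scala sketch of the resulting URL shape, using made-up host and database names:

    object MultiHostUrlSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical HOST config value with several comma-separated host:port pairs.
        val hostConfig = "ck-node1:8123, ck-node2:8123, ck-node3:8123"
        val database = "default"

        // Same splitting the patch introduces in parseHost (trim each entry).
        val hosts = hostConfig.split(",").map(_.trim).toList

        // Without split_mode, every executor now gets one balanced multi-host URL.
        val jdbcUrl = s"jdbc:clickhouse://${hosts.mkString(",")}/$database"
        println(jdbcUrl)
        // prints: jdbc:clickhouse://ck-node1:8123,ck-node2:8123,ck-node3:8123/default
      }
    }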

diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index 202979a4..8d5c2752 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -35,7 +35,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
 import org.apache.seatunnel.spark.clickhouse.Config.{BULK_SIZE, DATABASE, FIELDS, HOST, PASSWORD, RETRY, RETRY_CODES, SHARDING_KEY, SPLIT_MODE, TABLE, USERNAME}
-import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, getDefaultValue, getRowShard}
+import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, getDefaultValue, getRowShard, parseHost}
 import org.apache.spark.sql.{Dataset, Row}
 import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseArray, ClickHouseConnectionImpl, ClickHousePreparedStatementImpl}
 import ru.yandex.clickhouse.except.ClickHouseException
@@ -57,6 +57,7 @@ class Clickhouse extends SparkBatchSink {
   private val shards = new util.TreeMap[Int, Shard]()
   private val clickhousePrefix = "clickhouse."
   private val properties: Properties = new Properties()
+  private var multiHosts: String = _
 
   // used for split mode
   private var splitMode: Boolean = false
@@ -80,7 +81,12 @@ class Clickhouse extends SparkBatchSink {
     data.foreachPartition { iter: Iterator[Row] =>
 
       val statementMap = this.shards.map(s => {
-        val executorBalanced = new BalancedClickhouseDataSource(s._2.jdbc, this.properties)
+        // if splitMode is enabled, jdbcUrl should be the shard's own url; otherwise use multiHosts
+        var jdbcUrl = String.format("jdbc:clickhouse://%s/%s", multiHosts, s._2.database)
+        if (splitMode) {
+          jdbcUrl = s._2.jdbc
+        }
+        val executorBalanced = new BalancedClickhouseDataSource(jdbcUrl, this.properties)
         val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
         (s._2, executorConn.prepareStatement(this.initSQL).asInstanceOf[ClickHousePreparedStatementImpl])
       }).toMap
@@ -125,9 +131,9 @@ class Clickhouse extends SparkBatchSink {
         splitMode = config.getBoolean(SPLIT_MODE)
       }
       val database = config.getString(DATABASE)
-      val host = config.getString(HOST)
-      val conn = getClickhouseConnection(host, database, properties)
-      val hostAndPort = host.split(":")
+      val hosts = parseHost(config.getString(HOST))
+      multiHosts = hosts.map(_.hostAndPort).mkString(",")
+      val conn = getClickhouseConnection(multiHosts, database, properties)
       this.table = config.getString(TABLE)
       this.tableSchema = getClickHouseSchema(conn, this.table).toMap
       if (splitMode) {
@@ -137,7 +143,7 @@ class Clickhouse extends SparkBatchSink {
           CheckResult.error(s"split mode only support table which engine is '$distributedEngine' at now")
         } else {
           this.shardTable = localTable.table
-          val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hostAndPort(1))
+          val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hosts)
           var weight = 0
           for (elem <- shardList) {
             this.shards(weight) = elem
@@ -149,8 +155,8 @@ class Clickhouse extends SparkBatchSink {
           }
         }
       } else {
-        // only one connection
-        this.shards(0) = Shard(1, 1, 1, hostAndPort(0), hostAndPort(0), hostAndPort(1), database)
+        // only one connection; just use the first host in hosts, as it is ignored when outputting data
+        this.shards(0) = Shard(1, 1, 1, hosts.head.host, hosts.head.host, hosts.head.port, database)
       }
 
       if (StringUtils.isNotEmpty(this.shardKey)) {
@@ -365,18 +371,33 @@ object Clickhouse {
     }
   }
 
+  def parseHost(host: String): List[HostAndPort] = {
+    host.split(",")
+      .map(_.trim)
+      .map(_.split(":"))
+      .map(hostAndPort => HostAndPort(hostAndPort(0), hostAndPort(1)))
+      .toList
+  }
+
   def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
-                          database: String, port: String): ListBuffer[Shard] = {
+                          database: String, hosts: List[HostAndPort]): ListBuffer[Shard] = {
     val rs = conn.createStatement().executeQuery(
       s"select shard_num,shard_weight,replica_num,host_name,host_address," +
         s"port from system.clusters where cluster = '$clusterName'")
     // TODO The port will be used for data communication of the tcp protocol in the future
     // port is tcp protocol, need http protocol port at now
     val nodeList = mutable.ListBuffer[Shard]()
+    val defaultPort = hosts.head.port
     while (rs.next()) {
+      val hostname = rs.getString(4)
+      val hostAddress = rs.getString(5)
+      val port = hosts.toStream
+        .filter(hostAndPort => hostname.equals(hostAndPort.host) || hostAddress.equals(hostAndPort.host))
+        .map(_.port)
+        .headOption
+        .getOrElse(defaultPort)
       nodeList += Shard(rs.getInt(1), rs.getInt(2),
-        rs.getInt(3), rs.getString(4), rs.getString(5),
-        port, database)
+        rs.getInt(3), hostname, hostAddress, port, database)
     }
     nodeList
   }
@@ -517,4 +538,8 @@ object Clickhouse {
                    database: String) extends Serializable {
     val jdbc = s"jdbc:clickhouse://$hostAddress:$port/$database"
   }
+
+  case class HostAndPort(host: String, port: String) {
+    val hostAndPort = s"$host:$port"
+  }
 }
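
For reference, a self-contained sketch of the new parseHost/HostAndPort helpers added above, exercised with made-up host values. Note that parseHost expects every entry to carry an explicit ":port"; an entry without a colon would fail on hp(1).

    object ParseHostSketch {
      // Mirrors the HostAndPort case class added in this commit.
      case class HostAndPort(host: String, port: String) {
        val hostAndPort = s"$host:$port"
      }

      // Mirrors parseHost: split the configured HOST string into host/port pairs.
      def parseHost(host: String): List[HostAndPort] =
        host.split(",")
          .map(_.trim)
          .map(_.split(":"))
          .map(hp => HostAndPort(hp(0), hp(1)))
          .toList

      def main(args: Array[String]): Unit = {
        val hosts = parseHost("ck-node1:8123, ck-node2:9000")   // hypothetical input
        println(hosts.map(_.hostAndPort).mkString(","))          // ck-node1:8123,ck-node2:9000
        println(s"${hosts.head.host} -> ${hosts.head.port}")     // ck-node1 -> 8123
      }
    }
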
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
index 14e06327..095b318a 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
@@ -215,10 +215,10 @@ class ClickhouseFile extends SparkBatchSink {
       clickhouseLocalPath = config.getString(CLICKHOUSE_LOCAL_PATH)
       properties.put("user", config.getString(USERNAME))
       properties.put("password", config.getString(PASSWORD))
-      val host = config.getString(HOST)
+      val hosts = parseHost(config.getString(HOST))
       val database = config.getString(DATABASE)
       val table = config.getString(TABLE)
-      val conn = getClickhouseConnection(host, database, properties)
+      val conn = getClickhouseConnection(hosts.map(_.hostAndPort).mkString(","), database, properties)
 
       if (config.hasPath(COPY_METHOD)) {
         this.copyFileMethod = getCopyMethod(config.getString(COPY_METHOD))
@@ -233,7 +233,7 @@ class ClickhouseFile extends SparkBatchSink {
         checkResult = result
       } else {
         this.table = tableInfo
-        tableInfo.initTableInfo(host, conn)
+        tableInfo.initTableInfo(hosts, conn)
         tableInfo.initShardDataPath(config.getString(USERNAME), config.getString(PASSWORD))
         // check config of node password whether completed or not
         if (config.hasPath(NODE_FREE_PASSWORD) && config.getBoolean(NODE_FREE_PASSWORD)) {
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
index 5a8c3f4c..89f0a4d8 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
@@ -19,7 +19,7 @@ package org.apache.seatunnel.spark.clickhouse.sink
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList}
+import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{HostAndPort, Shard, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, parseHost}
 import org.apache.seatunnel.spark.clickhouse.sink.ClickhouseFile.getClickhouseTableInfo
 import ru.yandex.clickhouse.ClickHouseConnectionImpl
 
@@ -39,13 +39,12 @@ class Table(val name: String, val database: String, val engine: String, val crea
   var shardKeyType: String = _
   var localCreateTableDDL: String = createTableDDL
 
-  def initTableInfo(host: String, conn: ClickHouseConnectionImpl): Unit = {
+  def initTableInfo(hosts: List[HostAndPort], conn: ClickHouseConnectionImpl): Unit = {
     if (shards.size() == 0) {
-      val hostAndPort = host.split(":")
       if (distributedEngine.equals(this.engine)) {
         val localTable = getClickHouseDistributedTable(conn, database, name)
         this.localTable = localTable.table
-        val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hostAndPort(1))
+        val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hosts)
         var weight = 0
         for (elem <- shardList) {
           this.shards.put(weight, elem)
@@ -54,7 +53,7 @@ class Table(val name: String, val database: String, val engine: String, val crea
         this.shardWeightCount = weight
         this.localCreateTableDDL = getClickhouseTableInfo(conn, localTable.database, localTable.table)._2.createTableDDL
       } else {
-        this.shards.put(0, Shard(1, 1, 1, hostAndPort(0), hostAndPort(0), hostAndPort(1), database))
+        this.shards.put(0, Shard(1, 1, 1, hosts.head.host, hosts.head.host, hosts.head.port, database))
       }
     }
   }
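
Finally, a sketch of the port resolution the patch adds inside getClusterShardList: for each shard row returned from system.clusters, the HTTP port is taken from the configured host whose name or address matches, falling back to the first configured host's port. Host names and addresses below are made-up examples.

    object ShardPortLookupSketch {
      case class HostAndPort(host: String, port: String)

      // Same lookup the patch adds inside getClusterShardList.
      def resolvePort(hostname: String, hostAddress: String, hosts: List[HostAndPort]): String =
        hosts.toStream
          .filter(hp => hostname.equals(hp.host) || hostAddress.equals(hp.host))
          .map(_.port)
          .headOption
          .getOrElse(hosts.head.port)

      def main(args: Array[String]): Unit = {
        val configured = List(HostAndPort("ck-node1", "8123"), HostAndPort("10.0.0.2", "8124"))
        println(resolvePort("ck-node1", "10.0.0.1", configured)) // 8123 (matched by host name)
        println(resolvePort("ck-node2", "10.0.0.2", configured)) // 8124 (matched by address)
        println(resolvePort("ck-node3", "10.0.0.3", configured)) // 8123 (falls back to first configured host)
      }
    }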