You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this plain-text rendering.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2022/05/07 03:58:02 UTC
[incubator-seatunnel] branch dev updated: [Bug] [Connector] Clickhouse sink can not work correctly when use multiple hosts (#1808) (#1809)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ea40a7ff [Bug] [Connector] Clickhouse sink can not work correctly when use multiple hosts (#1808) (#1809)
ea40a7ff is described below
commit ea40a7ff605534b964290a1680146e3685bb6b60
Author: xpleaf <xp...@163.com>
AuthorDate: Sat May 7 11:57:56 2022 +0800
[Bug] [Connector] Clickhouse sink can not work correctly when use multiple hosts (#1808) (#1809)
---
.../spark/clickhouse/sink/Clickhouse.scala | 47 +++++++++++++++++-----
.../spark/clickhouse/sink/ClickhouseFile.scala | 6 +--
.../seatunnel/spark/clickhouse/sink/Table.scala | 9 ++---
3 files changed, 43 insertions(+), 19 deletions(-)
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
index 202979a4..8d5c2752 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Clickhouse.scala
@@ -35,7 +35,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
import org.apache.seatunnel.spark.clickhouse.Config.{BULK_SIZE, DATABASE, FIELDS, HOST, PASSWORD, RETRY, RETRY_CODES, SHARDING_KEY, SPLIT_MODE, TABLE, USERNAME}
-import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, getDefaultValue, getRowShard}
+import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, acceptedClickHouseSchema, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, getDefaultValue, getRowShard, parseHost}
import org.apache.spark.sql.{Dataset, Row}
import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseArray, ClickHouseConnectionImpl, ClickHousePreparedStatementImpl}
import ru.yandex.clickhouse.except.ClickHouseException
@@ -57,6 +57,7 @@ class Clickhouse extends SparkBatchSink {
private val shards = new util.TreeMap[Int, Shard]()
private val clickhousePrefix = "clickhouse."
private val properties: Properties = new Properties()
+ private var multiHosts: String = _
// used for split mode
private var splitMode: Boolean = false
@@ -80,7 +81,12 @@ class Clickhouse extends SparkBatchSink {
data.foreachPartition { iter: Iterator[Row] =>
val statementMap = this.shards.map(s => {
- val executorBalanced = new BalancedClickhouseDataSource(s._2.jdbc, this.properties)
+ // if use splitMode, jdbcUrl should use the shard itself url, or else should use multiHosts
+ var jdbcUrl = String.format("jdbc:clickhouse://%s/%s", multiHosts, s._2.database)
+ if (splitMode) {
+ jdbcUrl = s._2.jdbc
+ }
+ val executorBalanced = new BalancedClickhouseDataSource(jdbcUrl, this.properties)
val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
(s._2, executorConn.prepareStatement(this.initSQL).asInstanceOf[ClickHousePreparedStatementImpl])
}).toMap
@@ -125,9 +131,9 @@ class Clickhouse extends SparkBatchSink {
splitMode = config.getBoolean(SPLIT_MODE)
}
val database = config.getString(DATABASE)
- val host = config.getString(HOST)
- val conn = getClickhouseConnection(host, database, properties)
- val hostAndPort = host.split(":")
+ val hosts = parseHost(config.getString(HOST))
+ multiHosts = hosts.map(_.hostAndPort).mkString(",")
+ val conn = getClickhouseConnection(multiHosts, database, properties)
this.table = config.getString(TABLE)
this.tableSchema = getClickHouseSchema(conn, this.table).toMap
if (splitMode) {
@@ -137,7 +143,7 @@ class Clickhouse extends SparkBatchSink {
CheckResult.error(s"split mode only support table which engine is '$distributedEngine' at now")
} else {
this.shardTable = localTable.table
- val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hostAndPort(1))
+ val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hosts)
var weight = 0
for (elem <- shardList) {
this.shards(weight) = elem
@@ -149,8 +155,8 @@ class Clickhouse extends SparkBatchSink {
}
}
} else {
- // only one connection
- this.shards(0) = Shard(1, 1, 1, hostAndPort(0), hostAndPort(0), hostAndPort(1), database)
+ // only one connection, just use the first host in hosts, as it will be ignored when output data
+ this.shards(0) = Shard(1, 1, 1, hosts.head.host, hosts.head.host, hosts.head.port, database)
}
if (StringUtils.isNotEmpty(this.shardKey)) {
@@ -365,18 +371,33 @@ object Clickhouse {
}
}
+ def parseHost(host: String): List[HostAndPort] = {
+ host.split(",")
+ .map(_.trim)
+ .map(_.split(":"))
+ .map(hostAndPort => HostAndPort(hostAndPort(0), hostAndPort(1)))
+ .toList
+ }
+
def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
- database: String, port: String): ListBuffer[Shard] = {
+ database: String, hosts: List[HostAndPort]): ListBuffer[Shard] = {
val rs = conn.createStatement().executeQuery(
s"select shard_num,shard_weight,replica_num,host_name,host_address," +
s"port from system.clusters where cluster = '$clusterName'")
// TODO The port will be used for data communication of the tcp protocol in the future
// port is tcp protocol, need http protocol port at now
val nodeList = mutable.ListBuffer[Shard]()
+ val defaultPort = hosts.head.port
while (rs.next()) {
+ val hostname = rs.getString(4)
+ val hostAddress = rs.getString(5)
+ val port = hosts.toStream
+ .filter(hostAndPort => hostname.equals(hostAndPort.host) || hostAddress.equals(hostAndPort.host))
+ .map(_.port)
+ .headOption
+ .getOrElse(defaultPort)
nodeList += Shard(rs.getInt(1), rs.getInt(2),
- rs.getInt(3), rs.getString(4), rs.getString(5),
- port, database)
+ rs.getInt(3), hostname, hostAddress, port, database)
}
nodeList
}
@@ -517,4 +538,8 @@ object Clickhouse {
database: String) extends Serializable {
val jdbc = s"jdbc:clickhouse://$hostAddress:$port/$database"
}
+
+ case class HostAndPort(host: String, port: String) {
+ val hostAndPort = s"$host:$port"
+ }
}
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
index 14e06327..095b318a 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/ClickhouseFile.scala
@@ -215,10 +215,10 @@ class ClickhouseFile extends SparkBatchSink {
clickhouseLocalPath = config.getString(CLICKHOUSE_LOCAL_PATH)
properties.put("user", config.getString(USERNAME))
properties.put("password", config.getString(PASSWORD))
- val host = config.getString(HOST)
+ val hosts = parseHost(config.getString(HOST))
val database = config.getString(DATABASE)
val table = config.getString(TABLE)
- val conn = getClickhouseConnection(host, database, properties)
+ val conn = getClickhouseConnection(hosts.map(_.hostAndPort).mkString(","), database, properties)
if (config.hasPath(COPY_METHOD)) {
this.copyFileMethod = getCopyMethod(config.getString(COPY_METHOD))
@@ -233,7 +233,7 @@ class ClickhouseFile extends SparkBatchSink {
checkResult = result
} else {
this.table = tableInfo
- tableInfo.initTableInfo(host, conn)
+ tableInfo.initTableInfo(hosts, conn)
tableInfo.initShardDataPath(config.getString(USERNAME), config.getString(PASSWORD))
// check config of node password whether completed or not
if (config.hasPath(NODE_FREE_PASSWORD) && config.getBoolean(NODE_FREE_PASSWORD)) {
diff --git a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
index 5a8c3f4c..89f0a4d8 100644
--- a/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
+++ b/seatunnel-connectors/seatunnel-connectors-spark/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/clickhouse/sink/Table.scala
@@ -19,7 +19,7 @@ package org.apache.seatunnel.spark.clickhouse.sink
import org.apache.commons.lang3.StringUtils
import org.apache.seatunnel.common.config.CheckResult
-import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{Shard, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList}
+import org.apache.seatunnel.spark.clickhouse.sink.Clickhouse.{HostAndPort, Shard, distributedEngine, getClickHouseDistributedTable, getClickHouseSchema, getClickhouseConnection, getClusterShardList, parseHost}
import org.apache.seatunnel.spark.clickhouse.sink.ClickhouseFile.getClickhouseTableInfo
import ru.yandex.clickhouse.ClickHouseConnectionImpl
@@ -39,13 +39,12 @@ class Table(val name: String, val database: String, val engine: String, val crea
var shardKeyType: String = _
var localCreateTableDDL: String = createTableDDL
- def initTableInfo(host: String, conn: ClickHouseConnectionImpl): Unit = {
+ def initTableInfo(hosts: List[HostAndPort], conn: ClickHouseConnectionImpl): Unit = {
if (shards.size() == 0) {
- val hostAndPort = host.split(":")
if (distributedEngine.equals(this.engine)) {
val localTable = getClickHouseDistributedTable(conn, database, name)
this.localTable = localTable.table
- val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hostAndPort(1))
+ val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hosts)
var weight = 0
for (elem <- shardList) {
this.shards.put(weight, elem)
@@ -54,7 +53,7 @@ class Table(val name: String, val database: String, val engine: String, val crea
this.shardWeightCount = weight
this.localCreateTableDDL = getClickhouseTableInfo(conn, localTable.database, localTable.table)._2.createTableDDL
} else {
- this.shards.put(0, Shard(1, 1, 1, hostAndPort(0), hostAndPort(0), hostAndPort(1), database))
+ this.shards.put(0, Shard(1, 1, 1, hosts.head.host, hosts.head.host, hosts.head.port, database))
}
}
}