Posted to commits@seatunnel.apache.org by wu...@apache.org on 2022/03/04 06:11:05 UTC
[incubator-seatunnel] branch dev updated: [Feature-1358][Connector] Clickhouse add support split mode (#1358) (#1369)
This is an automated email from the ASF dual-hosted git repository.
wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cb6cc1a [Feature-1358][Connector] Clickhouse add support split mode (#1358) (#1369)
cb6cc1a is described below
commit cb6cc1a0c6759132c704f72ace149cad7fe51d51
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Fri Mar 4 14:11:00 2022 +0800
[Feature-1358][Connector] Clickhouse add support split mode (#1358) (#1369)
* [ST-1358][feat] Clickhouse sink support split mode
Support split mode to write data to clickhouse.
This closes #1358
* [ST-1358][feat] Clickhouse sink support split mode
Update clickhouse sink split_mode doc
This closes #1358
* [ST-1358][feat] Clickhouse sink support split mode
fix fields check problem
This closes #1358
---
.../spark/configuration/sink-plugins/Clickhouse.md | 40 ++--
.../apache/seatunnel/spark/sink/Clickhouse.scala | 243 ++++++++++++++++-----
2 files changed, 219 insertions(+), 64 deletions(-)
diff --git a/docs/en/spark/configuration/sink-plugins/Clickhouse.md b/docs/en/spark/configuration/sink-plugins/Clickhouse.md
index 9dd8666..d561de2 100644
--- a/docs/en/spark/configuration/sink-plugins/Clickhouse.md
+++ b/docs/en/spark/configuration/sink-plugins/Clickhouse.md
@@ -8,19 +8,21 @@ Use [Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) to correspo
## Options
-| name | type | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| bulk_size | number | no | 20000 |
-| clickhouse.* | string | no | |
-| database | string | yes | - |
-| fields | array | no | - |
-| host | string | yes | - |
-| password | string | no | - |
-| retry | number | no | 1 |
-| retry_codes | array | no | [ ] |
-| table | string | yes | - |
-| username | string | no | - |
-| common-options | string | no | - |
+| name | type | required | default value |
+|----------------|---------| -------- |---------------|
+| bulk_size | number | no | 20000 |
+| clickhouse.* | string | no | |
+| database | string | yes | - |
+| fields | array | no | - |
+| host | string | yes | - |
+| password | string | no | - |
+| retry | number | no | 1 |
+| retry_codes | array | no | [ ] |
+| table | string | yes | - |
+| username | string | no | - |
+| split_mode | boolean | no | false |
+| sharding_key | string | no | - |
+| common-options | string | no | - |
### bulk_size [number]
@@ -66,6 +68,18 @@ In addition to the above mandatory parameters that must be specified by `clickho
The way to specify the parameter is to add the prefix `clickhouse.` to the original parameter name. For example, the way to specify `socket_timeout` is: `clickhouse.socket_timeout = 50000` . If these non-essential parameters are not specified, they will use the default values given by `clickhouse-jdbc`.
+### split_mode [boolean]
+
+This mode only supports ClickHouse tables whose engine is 'Distributed'. When it is enabled, seatunnel
+splits the distributed table's data itself and performs the write directly on each shard. The shard
+weights defined in ClickHouse are taken into account.
+
+### sharding_key [string]
+
+When split_mode is used, the sink has to decide which node each row is sent to. By default a node is
+chosen at random, but the 'sharding_key' parameter can be used to specify the field that the sharding
+algorithm is based on. This option only takes effect when 'split_mode' is true; see the example below.
+
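+For example, a minimal sink block enabling split mode might look like the following (the host, database,
+table, field names and sharding key are placeholders, and `clickhouse.socket_timeout` illustrates the
+`clickhouse.` property pass-through described above):
+
+```
+clickhouse {
+    host = "localhost:8123"
+    database = "nginx"
+    table = "access_msg"
+    fields = ["date", "datetime", "hostname", "uri", "http_code"]
+    username = "username"
+    password = "password"
+    clickhouse.socket_timeout = 50000
+    split_mode = true
+    sharding_key = "hostname"
+}
+```
+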
### common options [string]
Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) for details
diff --git a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
index f786cd1..338778c 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
@@ -16,14 +16,17 @@
*/
package org.apache.seatunnel.spark.sink
-import java.math.BigDecimal
+import net.jpountz.xxhash.{XXHash64, XXHashFactory}
+import org.apache.commons.lang3.StringUtils
+
+import java.math.{BigDecimal, BigInteger}
import java.sql.PreparedStatement
import java.text.SimpleDateFormat
import java.util
-import java.util.Properties
+import java.util.{Objects, Properties}
import scala.collection.JavaConversions._
import scala.collection.immutable.HashMap
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Random, Success, Try}
import scala.util.matching.Regex
import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
import org.apache.seatunnel.common.config.CheckResult
@@ -31,22 +34,38 @@ import org.apache.seatunnel.common.config.TypesafeConfigUtils.{extractSubConfig,
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
import org.apache.seatunnel.spark.SparkEnvironment
import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.sink.Clickhouse.{DistributedEngine, Shard}
import org.apache.spark.sql.{Dataset, Row}
import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl}
import ru.yandex.clickhouse.except.{ClickHouseException, ClickHouseUnknownException}
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
import scala.annotation.tailrec
class Clickhouse extends SparkBatchSink {
- var tableSchema: Map[String, String] = new HashMap[String, String]()
- var jdbcLink: String = _
- var initSQL: String = _
- var table: String = _
- var fields: java.util.List[String] = _
- var retryCodes: java.util.List[Integer] = _
- val clickhousePrefix = "clickhouse."
- val properties: Properties = new Properties()
+ private var tableSchema: Map[String, String] = new HashMap[String, String]()
+ private var initSQL: String = _
+ private var table: String = _
+ private var fields: java.util.List[String] = _
+ private var retryCodes: java.util.List[Integer] = _
+ private val shards = new util.TreeMap[Int, Shard]()
+ private val clickhousePrefix = "clickhouse."
+ private val properties: Properties = new Properties()
+
+  // members used for split mode
+ private var splitMode: Boolean = false
+ private val random = new Random()
+ private var shardKey: String = ""
+ private var shardKeyType: String = _
+ private var shardTable: String = _
+ private var shardWeightCount = 0
+  // end of split-mode members
+
override def output(data: Dataset[Row], environment: SparkEnvironment): Unit = {
val dfFields = data.schema.fieldNames
@@ -58,24 +77,33 @@ class Clickhouse extends SparkBatchSink {
initSQL = initPrepareSQL()
}
data.foreachPartition { iter: Iterator[Row] =>
- val executorBalanced = new BalancedClickhouseDataSource(this.jdbcLink, this.properties)
- val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
- val statement = executorConn.prepareStatement(this.initSQL)
- var length = 0
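+      // one ClickHouse connection and prepared statement per shard (non-split mode registers a single shard)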
+ val statementMap = this.shards.map(s => {
+ val executorBalanced = new BalancedClickhouseDataSource(s._2.jdbc, this.properties)
+ val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
+ (s._2, executorConn.prepareStatement(this.initSQL))
+ }).toMap
+
+      // hashInstance cannot be serialized, so it has to be created inside each partition
+ val hashInstance = XXHashFactory.fastestInstance().hash64()
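+      // per-shard count of rows buffered in the current batch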
+ val lengthMap = statementMap.map(s => (s._1, new AtomicLong(0)))
for (item <- iter) {
- length += 1
+ val shard = getRowShard(hashInstance, item)
+ val statement = statementMap(shard)
renderStatement(fields, item, dfFields, statement)
statement.addBatch()
-
- if (length >= bulkSize) {
+ val length = lengthMap(shard)
+ if (length.addAndGet(1) >= bulkSize) {
execute(statement, retry)
- length = 0
+ length.set(0)
}
}
+ statementMap.foreach(s => {
+ execute(s._2, retry)
+ s._2.close()
+ })
- execute(statement, retry)
}
}
@@ -87,17 +115,56 @@ class Clickhouse extends SparkBatchSink {
properties.put(e.getKey, String.valueOf(e.getValue.unwrapped()))
})
}
- if (config.hasPath("username")) {
- properties.put("user", config.getString("username"))
- properties.put("password", config.getString("password"))
+
+ properties.put("user", config.getString("username"))
+ properties.put("password", config.getString("password"))
+
+ if (config.hasPath("split_mode")) {
+ splitMode = config.getBoolean("split_mode")
}
- jdbcLink = s"jdbc:clickhouse://${config.getString("host")}/${config.getString("database")}"
- val conn = new BalancedClickhouseDataSource(jdbcLink, properties).getConnection
- .asInstanceOf[ClickHouseConnectionImpl]
- table = config.getString("table")
- tableSchema = getClickHouseSchema(conn, table)
- if (config.hasPath("fields")) {
- fields = config.getStringList("fields")
+ val database = config.getString("database")
+ val host = config.getString("host")
+ val globalJdbc = String.format("jdbc:clickhouse://%s/%s", host, database)
+ val balanced: BalancedClickhouseDataSource =
+ new BalancedClickhouseDataSource(globalJdbc, properties)
+ val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
+ val hostAndPort = host.split(":")
+ this.table = config.getString("table")
+ this.tableSchema = getClickHouseSchema(conn, this.table)
+ if (splitMode) {
+ val tableName = config.getString("table")
+ val localTable = getClickHouseDistributedTable(conn, database, tableName)
+ if (Objects.isNull(localTable)) {
+        checkResult = CheckResult.error("split mode only supports tables whose engine is 'Distributed' for now")
+ } else {
+ this.shardTable = localTable.table
+ val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hostAndPort(1))
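+        // key each shard by cumulative weight so getRowShard can make a weighted pick via TreeMap.lowerEntry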
+ var weight = 0
+ for (elem <- shardList) {
+ this.shards(weight) = elem
+ weight += elem.shardWeight
+ }
+ this.shardWeightCount = weight
+ if (config.hasPath("sharding_key") && StringUtils.isNotEmpty(config.getString("sharding_key"))) {
+ this.shardKey = config.getString("sharding_key")
+ }
+ }
+ } else {
+ // only one connection
+ this.shards(0) = new Shard(1, 1, 1, hostAndPort(0),
+ hostAndPort(0), hostAndPort(1), database)
+ }
+
+ if (StringUtils.isNotEmpty(this.shardKey)) {
+ if (!this.tableSchema.containsKey(this.shardKey)) {
+ checkResult = CheckResult.error(
+ s"not find field '${this.shardKey}' in table '${this.table}' as sharding key")
+ } else {
+ this.shardKeyType = this.tableSchema(this.shardKey)
+ }
+ }
+ if (this.config.hasPath("fields")) {
+ this.fields = config.getStringList("fields")
checkResult = acceptedClickHouseSchema()
}
}
@@ -120,8 +187,8 @@ class Clickhouse extends SparkBatchSink {
}
private def getClickHouseSchema(
- conn: ClickHouseConnectionImpl,
- table: String): Map[String, String] = {
+ conn: ClickHouseConnectionImpl,
+ table: String): Map[String, String] = {
val sql = String.format("desc %s", table)
val resultSet = conn.createStatement.executeQuery(sql)
var schema = new HashMap[String, String]()
@@ -131,12 +198,77 @@ class Clickhouse extends SparkBatchSink {
schema
}
+ private def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
+ database: String, port: String): ListBuffer[Shard] = {
+ val rs = conn.createStatement().executeQuery(
+ s"select shard_num,shard_weight,replica_num,host_name,host_address," +
+ s"port from system.clusters where cluster = '$clusterName'")
+    // TODO: use the TCP port for data transfer in the future
+    // system.clusters reports the TCP port, but the HTTP port is needed for now, so the port from `host` is reused
+ val nodeList = mutable.ListBuffer[Shard]()
+ while (rs.next()) {
+ nodeList += new Shard(rs.getInt(1), rs.getInt(2),
+ rs.getInt(3), rs.getString(4), rs.getString(5),
+ port, database)
+ }
+ nodeList
+ }
+
+ private def getClickHouseDistributedTable(conn: ClickHouseConnectionImpl, database: String, table: String):
+ DistributedEngine = {
+ val rs = conn.createStatement().executeQuery(
+ s"select engine_full from system.tables where " +
+ s"database = '$database' and name = '$table' and engine = 'Distributed'")
+ if (rs.next()) {
+      // engineFull will look like: Distributed(cluster, database, table[, sharding_key[, policy_name]])
+ val engineFull = rs.getString(1)
+ val infos = engineFull.substring(12).split(",").map(s => s.replaceAll("'", ""))
+ new DistributedEngine(infos(0), infos(1).trim, infos(2).replaceAll("\\)", "").trim)
+ } else {
+ null
+ }
+ }
+
+ private def getRowShard(hashInstance: XXHash64, row: Row): Shard = {
+ if (splitMode) {
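+      // no usable sharding key: fall back to a weight-proportional random shard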
+ if (StringUtils.isEmpty(this.shardKey) || row.schema.fieldNames.indexOf(this.shardKey) == -1) {
+ this.shards.lowerEntry(this.random.nextInt(this.shardWeightCount) + 1).getValue
+ } else {
+ val fieldIndex = row.fieldIndex(this.shardKey)
+ if (row.isNullAt(fieldIndex)) {
+ this.shards.lowerEntry(this.random.nextInt(this.shardWeightCount) + 1).getValue
+ } else {
+ var offset = 0
+ this.shardKeyType match {
+ case Clickhouse.floatPattern(_) =>
+ offset = row.getFloat(fieldIndex).toInt % this.shardWeightCount
+ case Clickhouse.intPattern(_) | Clickhouse.uintPattern(_) =>
+ offset = row.getInt(fieldIndex) % this.shardWeightCount
+ case Clickhouse.decimalPattern(_) =>
+ offset = row.getDecimal(fieldIndex).toBigInteger.mod(BigInteger.valueOf(this
+ .shardWeightCount)).intValue()
+ case _ =>
+              // mask the sign bit before taking the modulo so the offset is always non-negative
+              offset = ((hashInstance.hash(ByteBuffer.wrap(row.getString(fieldIndex).getBytes), 0) & Long.MaxValue) %
+                this.shardWeightCount).toInt
+ }
+ this.shards.lowerEntry(offset + 1).getValue
+ }
+ }
+ } else {
+ this.shards.head._2
+ }
+ }
+
private def initPrepareSQL(): String = {
val prepare = List.fill(fields.size)("?")
+ var table = this.table
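+    // in split mode, write to each shard's local table instead of the Distributed table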
+ if (splitMode) {
+ table = this.shardTable
+ }
val sql = String.format(
"insert into %s (%s) values (%s)",
- this.table,
+ table,
this.fields.map(a => a).mkString(","),
prepare.mkString(","))
@@ -170,9 +302,9 @@ class Clickhouse extends SparkBatchSink {
@tailrec
private def renderDefaultStatement(
- index: Int,
- fieldType: String,
- statement: PreparedStatement): Unit = {
+ index: Int,
+ fieldType: String,
+ statement: PreparedStatement): Unit = {
fieldType match {
case "DateTime" | "Date" | "String" =>
statement.setString(index + 1, Clickhouse.renderStringDefault(fieldType))
@@ -192,9 +324,9 @@ class Clickhouse extends SparkBatchSink {
}
private def renderNullStatement(
- index: Int,
- fieldType: String,
- statement: PreparedStatement): Unit = {
+ index: Int,
+ fieldType: String,
+ statement: PreparedStatement): Unit = {
fieldType match {
case "String" =>
statement.setNull(index + 1, java.sql.Types.VARCHAR)
@@ -210,11 +342,11 @@ class Clickhouse extends SparkBatchSink {
}
private def renderBaseTypeStatement(
- index: Int,
- fieldIndex: Int,
- fieldType: String,
- item: Row,
- statement: PreparedStatement): Unit = {
+ index: Int,
+ fieldIndex: Int,
+ fieldType: String,
+ item: Row,
+ statement: PreparedStatement): Unit = {
fieldType match {
case "DateTime" | "Date" | "String" =>
statement.setString(index + 1, item.getAs[String](fieldIndex))
@@ -232,10 +364,10 @@ class Clickhouse extends SparkBatchSink {
}
private def renderStatement(
- fields: util.List[String],
- item: Row,
- dsFields: Array[String],
- statement: PreparedStatement): Unit = {
+ fields: util.List[String],
+ item: Row,
+ dsFields: Array[String],
+ statement: PreparedStatement): Unit = {
for (i <- 0 until fields.size()) {
val field = fields.get(i)
val fieldType = tableSchema(field)
@@ -251,8 +383,7 @@ class Clickhouse extends SparkBatchSink {
fieldType match {
case "String" | "DateTime" | "Date" | Clickhouse.arrayPattern(_) =>
renderBaseTypeStatement(i, fieldIndex, fieldType, item, statement)
- case Clickhouse.floatPattern(_) | Clickhouse.intPattern(_) | Clickhouse.uintPattern(
- _) =>
+ case Clickhouse.floatPattern(_) | Clickhouse.intPattern(_) | Clickhouse.uintPattern(_) =>
renderBaseTypeStatement(i, fieldIndex, fieldType, item, statement)
case Clickhouse.nullablePattern(dataType) =>
renderBaseTypeStatement(i, fieldIndex, dataType, item, statement)
@@ -272,7 +403,6 @@ class Clickhouse extends SparkBatchSink {
val res = Try(statement.executeBatch())
res match {
case Success(_) =>
- statement.close()
case Failure(e: ClickHouseException) =>
val errorCode = e.getErrorCode
if (retryCodes.contains(errorCode)) {
@@ -280,6 +410,7 @@ class Clickhouse extends SparkBatchSink {
execute(statement, retry - 1)
} else {
statement.close()
+ throw e
}
} else {
throw e
@@ -338,4 +469,14 @@ object Clickhouse {
""
}
}
+
+ class DistributedEngine(val clusterName: String, val database: String,
+ val table: String) {
+ }
+
+ class Shard(val shardNum: Int, val shardWeight: Int, val replicaNum: Int,
+ val hostname: String, val hostAddress: String, val port: String,
+ val database: String) extends Serializable {
+ val jdbc = s"jdbc:clickhouse://$hostAddress:$port/$database"
+ }
}
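For readers skimming the diff, here is a minimal standalone sketch of the weighted shard selection that
getRowShard implements above (shard names and weights are illustrative, not taken from the commit): shards
are keyed by cumulative weight in a TreeMap, and lowerEntry(pick + 1) returns the shard whose weight range
contains the random pick.

    import java.util.TreeMap
    import scala.util.Random

    object WeightedShardPick {
      def main(args: Array[String]): Unit = {
        val shards = new TreeMap[Int, String]()
        shards.put(0, "shard-A") // weight 2: owns picks 0 and 1
        shards.put(2, "shard-B") // weight 1: owns pick 2
        val totalWeight = 3
        val pick = new Random().nextInt(totalWeight)
        // greatest key strictly less than pick + 1, i.e. the shard covering pick
        val shard = shards.lowerEntry(pick + 1).getValue
        println(s"row with pick $pick goes to $shard")
      }
    }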