Posted to commits@seatunnel.apache.org by wu...@apache.org on 2022/03/04 06:11:05 UTC

[incubator-seatunnel] branch dev updated: [Feature-1358][Connector] Clickhouse add support split mode (#1358) (#1369)

This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cb6cc1a  [Feature-1358][Connector] Clickhouse  add support split mode (#1358) (#1369)
cb6cc1a is described below

commit cb6cc1a0c6759132c704f72ace149cad7fe51d51
Author: TrickyZerg <32...@users.noreply.github.com>
AuthorDate: Fri Mar 4 14:11:00 2022 +0800

    [Feature-1358][Connector] Clickhouse  add support split mode (#1358) (#1369)
    
    * [ST-1358][feat] Clickhouse sink support split mode
    Support split mode to write data to clickhouse.
    This closes #1358
    
    * [ST-1358][feat] Clickhouse sink support split mode
    Update clickhouse sink split_mode doc
    This closes #1358
    
    * [ST-1358][feat] Clickhouse sink support split mode
    fix fields check problem
    This closes #1358
---
 .../spark/configuration/sink-plugins/Clickhouse.md |  40 ++--
 .../apache/seatunnel/spark/sink/Clickhouse.scala   | 243 ++++++++++++++++-----
 2 files changed, 219 insertions(+), 64 deletions(-)

diff --git a/docs/en/spark/configuration/sink-plugins/Clickhouse.md b/docs/en/spark/configuration/sink-plugins/Clickhouse.md
index 9dd8666..d561de2 100644
--- a/docs/en/spark/configuration/sink-plugins/Clickhouse.md
+++ b/docs/en/spark/configuration/sink-plugins/Clickhouse.md
@@ -8,19 +8,21 @@ Use [Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) to correspo
 
 ## Options
 
-| name           | type   | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| bulk_size      | number | no       | 20000         |
-| clickhouse.*   | string | no       |               |
-| database       | string | yes      | -             |
-| fields         | array  | no       | -             |
-| host           | string | yes      | -             |
-| password       | string | no       | -             |
-| retry          | number | no       | 1             |
-| retry_codes    | array  | no       | [ ]           |
-| table          | string | yes      | -             |
-| username       | string | no       | -             |
-| common-options | string | no       | -             |
+| name           | type    | required | default value |
+|----------------|---------| -------- |---------------|
+| bulk_size      | number  | no       | 20000         |
+| clickhouse.*   | string  | no       |               |
+| database       | string  | yes      | -             |
+| fields         | array   | no       | -             |
+| host           | string  | yes      | -             |
+| password       | string  | no       | -             |
+| retry          | number  | no       | 1             |
+| retry_codes    | array   | no       | [ ]           |
+| table          | string  | yes      | -             |
+| username       | string  | no       | -             |
+| split_mode     | boolean | no       | false         |
+| sharding_key   | string  | no       | -             |
+| common-options | string  | no       | -             |
 
 ### bulk_size [number]
 
@@ -66,6 +68,18 @@ In addition to the above mandatory parameters that must be specified by `clickho
 
 The way to specify the parameter is to add the prefix `clickhouse.` to the original parameter name. For example, the way to specify `socket_timeout` is: `clickhouse.socket_timeout = 50000` . If these non-essential parameters are not specified, they will use the default values given by `clickhouse-jdbc`.
 
+### split_mode [boolean]
+
+This mode only supports ClickHouse tables whose engine is 'Distributed'. When it is enabled, SeaTunnel splits
+the data of the distributed table itself and writes it directly to each shard. The shard weights defined in
+ClickHouse are taken into account.
+
+### sharding_key [string]
+
+When split_mode is enabled, a target shard has to be chosen for every row. By default a shard is selected at
+random, but the 'sharding_key' parameter can be used to specify the field that drives the sharding
+algorithm. This option only takes effect when 'split_mode' is true.
+
 ### common options [string]
 
 Sink plugin common parameters, please refer to [Sink Plugin](./sink-plugin.md) for details
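
Taken together with the documentation above, a minimal sink configuration exercising the two new options might look like the sketch below; the block name and all option values are illustrative assumptions, not taken from this commit:

```
clickhouse {
    host = "localhost:8123"
    database = "default"
    # the table must use the 'Distributed' engine for split_mode to take effect
    table = "access_log_all"
    fields = ["date", "hostname", "http_code"]
    username = "username"
    password = "password"
    bulk_size = 20000
    split_mode = true
    sharding_key = "hostname"
}
```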
diff --git a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
index f786cd1..338778c 100644
--- a/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
+++ b/seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
@@ -16,14 +16,17 @@
  */
 package org.apache.seatunnel.spark.sink
 
-import java.math.BigDecimal
+import net.jpountz.xxhash.{XXHash64, XXHashFactory}
+import org.apache.commons.lang3.StringUtils
+
+import java.math.{BigDecimal, BigInteger}
 import java.sql.PreparedStatement
 import java.text.SimpleDateFormat
 import java.util
-import java.util.Properties
+import java.util.{Objects, Properties}
 import scala.collection.JavaConversions._
 import scala.collection.immutable.HashMap
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Random, Success, Try}
 import scala.util.matching.Regex
 import org.apache.seatunnel.common.config.CheckConfigUtil.checkAllExists
 import org.apache.seatunnel.common.config.CheckResult
@@ -31,22 +34,38 @@ import org.apache.seatunnel.common.config.TypesafeConfigUtils.{extractSubConfig,
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory
 import org.apache.seatunnel.spark.SparkEnvironment
 import org.apache.seatunnel.spark.batch.SparkBatchSink
+import org.apache.seatunnel.spark.sink.Clickhouse.{DistributedEngine, Shard}
 import org.apache.spark.sql.{Dataset, Row}
 import ru.yandex.clickhouse.{BalancedClickhouseDataSource, ClickHouseConnectionImpl}
 import ru.yandex.clickhouse.except.{ClickHouseException, ClickHouseUnknownException}
 
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
 import scala.annotation.tailrec
 
 class Clickhouse extends SparkBatchSink {
 
-  var tableSchema: Map[String, String] = new HashMap[String, String]()
-  var jdbcLink: String = _
-  var initSQL: String = _
-  var table: String = _
-  var fields: java.util.List[String] = _
-  var retryCodes: java.util.List[Integer] = _
-  val clickhousePrefix = "clickhouse."
-  val properties: Properties = new Properties()
+  private var tableSchema: Map[String, String] = new HashMap[String, String]()
+  private var initSQL: String = _
+  private var table: String = _
+  private var fields: java.util.List[String] = _
+  private var retryCodes: java.util.List[Integer] = _
+  private val shards = new util.TreeMap[Int, Shard]()
+  private val clickhousePrefix = "clickhouse."
+  private val properties: Properties = new Properties()
+
+  // used for split mode
+  private var splitMode: Boolean = false
+  private val random = new Random()
+  private var shardKey: String = ""
+  private var shardKeyType: String = _
+  private var shardTable: String = _
+  private var shardWeightCount = 0
+  // used for split mode end
+
 
   override def output(data: Dataset[Row], environment: SparkEnvironment): Unit = {
     val dfFields = data.schema.fieldNames
@@ -58,24 +77,33 @@ class Clickhouse extends SparkBatchSink {
       initSQL = initPrepareSQL()
     }
     data.foreachPartition { iter: Iterator[Row] =>
-      val executorBalanced = new BalancedClickhouseDataSource(this.jdbcLink, this.properties)
-      val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
-      val statement = executorConn.prepareStatement(this.initSQL)
 
-      var length = 0
+      val statementMap = this.shards.map(s => {
+        val executorBalanced = new BalancedClickhouseDataSource(s._2.jdbc, this.properties)
+        val executorConn = executorBalanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
+        (s._2, executorConn.prepareStatement(this.initSQL))
+      }).toMap
+
+      // hashInstance cannot be serialized, can only be created in a partition
+      val hashInstance = XXHashFactory.fastestInstance().hash64()
 
+      val lengthMap = statementMap.map(s => (s._1, new AtomicLong(0)))
       for (item <- iter) {
-        length += 1
+        val shard = getRowShard(hashInstance, item)
+        val statement = statementMap(shard)
         renderStatement(fields, item, dfFields, statement)
         statement.addBatch()
-
-        if (length >= bulkSize) {
+        val length = lengthMap(shard)
+        if (length.addAndGet(1) >= bulkSize) {
           execute(statement, retry)
-          length = 0
+          length.set(0)
         }
       }
+      statementMap.foreach(s => {
+        execute(s._2, retry)
+        s._2.close()
+      })
 
-      execute(statement, retry)
     }
   }
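
The rewritten foreachPartition above keeps one prepared statement and one row counter per shard, flushes whichever batch first reaches bulk_size, and finally flushes and closes every statement at the end of the partition. A minimal, self-contained sketch of that bookkeeping, with string buffers standing in for JDBC statements and a made-up routing rule so it runs without a ClickHouse instance:

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ListBuffer

object BatchPerShardSketch extends App {
  val bulkSize = 3
  // stand-ins for one PreparedStatement per shard
  val batches  = Map("shard-1" -> ListBuffer[String](), "shard-2" -> ListBuffer[String]())
  val counters = batches.map { case (shard, _) => shard -> new AtomicLong(0) }

  def flush(shard: String): Unit = {
    println(s"$shard: executeBatch(${batches(shard).size} rows)")
    batches(shard).clear()
  }

  for (row <- 1 to 10) {
    val shard = if (row % 2 == 0) "shard-1" else "shard-2" // toy routing, stand-in for getRowShard
    batches(shard) += s"row-$row"
    if (counters(shard).addAndGet(1) >= bulkSize) {        // same threshold check as the sink
      flush(shard)
      counters(shard).set(0)
    }
  }
  batches.keys.foreach(flush)                              // flush the remainder, as the commit now does
}
```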
 
@@ -87,17 +115,56 @@ class Clickhouse extends SparkBatchSink {
           properties.put(e.getKey, String.valueOf(e.getValue.unwrapped()))
         })
       }
-      if (config.hasPath("username")) {
-        properties.put("user", config.getString("username"))
-        properties.put("password", config.getString("password"))
+
+      properties.put("user", config.getString("username"))
+      properties.put("password", config.getString("password"))
+
+      if (config.hasPath("split_mode")) {
+        splitMode = config.getBoolean("split_mode")
       }
-      jdbcLink = s"jdbc:clickhouse://${config.getString("host")}/${config.getString("database")}"
-      val conn = new BalancedClickhouseDataSource(jdbcLink, properties).getConnection
-        .asInstanceOf[ClickHouseConnectionImpl]
-      table = config.getString("table")
-      tableSchema = getClickHouseSchema(conn, table)
-      if (config.hasPath("fields")) {
-        fields = config.getStringList("fields")
+      val database = config.getString("database")
+      val host = config.getString("host")
+      val globalJdbc = String.format("jdbc:clickhouse://%s/%s", host, database)
+      val balanced: BalancedClickhouseDataSource =
+        new BalancedClickhouseDataSource(globalJdbc, properties)
+      val conn = balanced.getConnection.asInstanceOf[ClickHouseConnectionImpl]
+      val hostAndPort = host.split(":")
+      this.table = config.getString("table")
+      this.tableSchema = getClickHouseSchema(conn, this.table)
+      if (splitMode) {
+        val tableName = config.getString("table")
+        val localTable = getClickHouseDistributedTable(conn, database, tableName)
+        if (Objects.isNull(localTable)) {
+          CheckResult.error(s"split mode only support table which engine is 'Distributed' at now")
+        } else {
+          this.shardTable = localTable.table
+          val shardList = getClusterShardList(conn, localTable.clusterName, localTable.database, hostAndPort(1))
+          var weight = 0
+          for (elem <- shardList) {
+            this.shards(weight) = elem
+            weight += elem.shardWeight
+          }
+          this.shardWeightCount = weight
+          if (config.hasPath("sharding_key") && StringUtils.isNotEmpty(config.getString("sharding_key"))) {
+            this.shardKey = config.getString("sharding_key")
+          }
+        }
+      } else {
+        // only one connection
+        this.shards(0) = new Shard(1, 1, 1, hostAndPort(0),
+          hostAndPort(0), hostAndPort(1), database)
+      }
+
+      if (StringUtils.isNotEmpty(this.shardKey)) {
+        if (!this.tableSchema.containsKey(this.shardKey)) {
+          checkResult = CheckResult.error(
+            s"not find field '${this.shardKey}' in table '${this.table}' as sharding key")
+        } else {
+          this.shardKeyType = this.tableSchema(this.shardKey)
+        }
+      }
+      if (this.config.hasPath("fields")) {
+        this.fields = config.getStringList("fields")
         checkResult = acceptedClickHouseSchema()
       }
     }
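
In the split-mode branch above, each shard is inserted into a TreeMap keyed by the running total of shard weights, and shardWeightCount becomes the sum of all weights, so shards with a higher weight own more offsets. A small sketch with made-up host names and weights shows how the map and a later lowerEntry lookup behave:

```scala
import java.util.TreeMap

object WeightMapSketch extends App {
  // (hostname, shard_weight) pairs as they might come back from system.clusters
  val shardList = Seq(("ch-node-1", 2), ("ch-node-2", 1), ("ch-node-3", 1))

  val shards = new TreeMap[Int, String]()
  var weight = 0
  for ((host, shardWeight) <- shardList) {
    shards.put(weight, host) // key is the cumulative weight *before* this shard
    weight += shardWeight
  }
  val shardWeightCount = weight // 4 in this example

  // offsets 0..3 resolve to: 0,1 -> ch-node-1, 2 -> ch-node-2, 3 -> ch-node-3
  for (offset <- 0 until shardWeightCount) {
    println(s"offset $offset -> ${shards.lowerEntry(offset + 1).getValue}")
  }
}
```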
@@ -120,8 +187,8 @@ class Clickhouse extends SparkBatchSink {
   }
 
   private def getClickHouseSchema(
-      conn: ClickHouseConnectionImpl,
-      table: String): Map[String, String] = {
+                                   conn: ClickHouseConnectionImpl,
+                                   table: String): Map[String, String] = {
     val sql = String.format("desc %s", table)
     val resultSet = conn.createStatement.executeQuery(sql)
     var schema = new HashMap[String, String]()
@@ -131,12 +198,77 @@ class Clickhouse extends SparkBatchSink {
     schema
   }
 
+  private def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
+                                  database: String, port: String): ListBuffer[Shard] = {
+    val rs = conn.createStatement().executeQuery(
+      s"select shard_num,shard_weight,replica_num,host_name,host_address," +
+        s"port from system.clusters where cluster = '$clusterName'")
+    // TODO The port will be used for data communication of the tcp protocol in the future
+    // port is tcp protocol, need http protocol port at now
+    val nodeList = mutable.ListBuffer[Shard]()
+    while (rs.next()) {
+      nodeList += new Shard(rs.getInt(1), rs.getInt(2),
+        rs.getInt(3), rs.getString(4), rs.getString(5),
+        port, database)
+    }
+    nodeList
+  }
+
+  private def getClickHouseDistributedTable(conn: ClickHouseConnectionImpl, database: String, table: String):
+  DistributedEngine = {
+    val rs = conn.createStatement().executeQuery(
+      s"select engine_full from system.tables where " +
+        s"database = '$database' and name = '$table' and engine = 'Distributed'")
+    if (rs.next()) {
+      // engineFull field will be like : Distributed(cluster, database, table[, sharding_key[, policy_name]])
+      val engineFull = rs.getString(1)
+      val infos = engineFull.substring(12).split(",").map(s => s.replaceAll("'", ""))
+      new DistributedEngine(infos(0), infos(1).trim, infos(2).replaceAll("\\)", "").trim)
+    } else {
+      null
+    }
+  }
+
+  private def getRowShard(hashInstance: XXHash64, row: Row): Shard = {
+    if (splitMode) {
+      if (StringUtils.isEmpty(this.shardKey) || row.schema.fieldNames.indexOf(this.shardKey) == -1) {
+        this.shards.lowerEntry(this.random.nextInt(this.shardWeightCount) + 1).getValue
+      } else {
+        val fieldIndex = row.fieldIndex(this.shardKey)
+        if (row.isNullAt(fieldIndex)) {
+          this.shards.lowerEntry(this.random.nextInt(this.shardWeightCount) + 1).getValue
+        } else {
+          var offset = 0
+          this.shardKeyType match {
+            case Clickhouse.floatPattern(_) =>
+              offset = row.getFloat(fieldIndex).toInt % this.shardWeightCount
+            case Clickhouse.intPattern(_) | Clickhouse.uintPattern(_) =>
+              offset = row.getInt(fieldIndex) % this.shardWeightCount
+            case Clickhouse.decimalPattern(_) =>
+              offset = row.getDecimal(fieldIndex).toBigInteger.mod(BigInteger.valueOf(this
+                .shardWeightCount)).intValue()
+            case _ =>
+              offset = (hashInstance.hash(ByteBuffer.wrap(row.getString(fieldIndex).getBytes), 0) & Long.MaxValue %
+                this.shardWeightCount).toInt
+          }
+          this.shards.lowerEntry(offset + 1).getValue
+        }
+      }
+    } else {
+      this.shards.head._2
+    }
+  }
+
   private def initPrepareSQL(): String = {
 
     val prepare = List.fill(fields.size)("?")
+    var table = this.table
+    if (splitMode) {
+      table = this.shardTable
+    }
     val sql = String.format(
       "insert into %s (%s) values (%s)",
-      this.table,
+      table,
       this.fields.map(a => a).mkString(","),
       prepare.mkString(","))
 
@@ -170,9 +302,9 @@ class Clickhouse extends SparkBatchSink {
 
   @tailrec
   private def renderDefaultStatement(
-      index: Int,
-      fieldType: String,
-      statement: PreparedStatement): Unit = {
+                                      index: Int,
+                                      fieldType: String,
+                                      statement: PreparedStatement): Unit = {
     fieldType match {
       case "DateTime" | "Date" | "String" =>
         statement.setString(index + 1, Clickhouse.renderStringDefault(fieldType))
@@ -192,9 +324,9 @@ class Clickhouse extends SparkBatchSink {
   }
 
   private def renderNullStatement(
-      index: Int,
-      fieldType: String,
-      statement: PreparedStatement): Unit = {
+                                   index: Int,
+                                   fieldType: String,
+                                   statement: PreparedStatement): Unit = {
     fieldType match {
       case "String" =>
         statement.setNull(index + 1, java.sql.Types.VARCHAR)
@@ -210,11 +342,11 @@ class Clickhouse extends SparkBatchSink {
   }
 
   private def renderBaseTypeStatement(
-      index: Int,
-      fieldIndex: Int,
-      fieldType: String,
-      item: Row,
-      statement: PreparedStatement): Unit = {
+                                       index: Int,
+                                       fieldIndex: Int,
+                                       fieldType: String,
+                                       item: Row,
+                                       statement: PreparedStatement): Unit = {
     fieldType match {
       case "DateTime" | "Date" | "String" =>
         statement.setString(index + 1, item.getAs[String](fieldIndex))
@@ -232,10 +364,10 @@ class Clickhouse extends SparkBatchSink {
   }
 
   private def renderStatement(
-      fields: util.List[String],
-      item: Row,
-      dsFields: Array[String],
-      statement: PreparedStatement): Unit = {
+                               fields: util.List[String],
+                               item: Row,
+                               dsFields: Array[String],
+                               statement: PreparedStatement): Unit = {
     for (i <- 0 until fields.size()) {
       val field = fields.get(i)
       val fieldType = tableSchema(field)
@@ -251,8 +383,7 @@ class Clickhouse extends SparkBatchSink {
           fieldType match {
             case "String" | "DateTime" | "Date" | Clickhouse.arrayPattern(_) =>
               renderBaseTypeStatement(i, fieldIndex, fieldType, item, statement)
-            case Clickhouse.floatPattern(_) | Clickhouse.intPattern(_) | Clickhouse.uintPattern(
-                  _) =>
+            case Clickhouse.floatPattern(_) | Clickhouse.intPattern(_) | Clickhouse.uintPattern(_) =>
               renderBaseTypeStatement(i, fieldIndex, fieldType, item, statement)
             case Clickhouse.nullablePattern(dataType) =>
               renderBaseTypeStatement(i, fieldIndex, dataType, item, statement)
@@ -272,7 +403,6 @@ class Clickhouse extends SparkBatchSink {
     val res = Try(statement.executeBatch())
     res match {
       case Success(_) =>
-        statement.close()
       case Failure(e: ClickHouseException) =>
         val errorCode = e.getErrorCode
         if (retryCodes.contains(errorCode)) {
@@ -280,6 +410,7 @@ class Clickhouse extends SparkBatchSink {
             execute(statement, retry - 1)
           } else {
             statement.close()
+            throw e
           }
         } else {
           throw e
@@ -338,4 +469,14 @@ object Clickhouse {
         ""
     }
   }
+
+  class DistributedEngine(val clusterName: String, val database: String,
+                          val table: String) {
+  }
+
+  class Shard(val shardNum: Int, val shardWeight: Int, val replicaNum: Int,
+              val hostname: String, val hostAddress: String, val port: String,
+              val database: String) extends Serializable {
+    val jdbc = s"jdbc:clickhouse://$hostAddress:$port/$database"
+  }
 }
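
For a string sharding key, getRowShard hashes the value with xxHash64 and reduces it to an offset below shardWeightCount; numeric keys use a plain modulo, and a missing or null key falls back to a random offset. A hedged sketch of the three offset computations (the sample key values are invented; it reuses the net.jpountz.xxhash dependency the connector already imports and applies the Long.MaxValue mask before the modulo so the offset stays in range):

```scala
import java.nio.ByteBuffer
import net.jpountz.xxhash.XXHashFactory
import scala.util.Random

object ShardOffsetSketch extends App {
  val shardWeightCount = 4 // total shard weight, as in the sketch above
  val random = new Random()
  val hashInstance = XXHashFactory.fastestInstance().hash64()

  // no sharding key (or a null value): pick an offset uniformly at random
  val randomOffset = random.nextInt(shardWeightCount)

  // numeric sharding key: plain modulo over the total weight
  val userId = 12345
  val numericOffset = userId % shardWeightCount

  // string sharding key: xxHash64 of the bytes, masked non-negative, then modulo
  val hostname = "web-01"
  val hash = hashInstance.hash(ByteBuffer.wrap(hostname.getBytes), 0)
  val hashedOffset = ((hash & Long.MaxValue) % shardWeightCount).toInt

  // each offset is then resolved with shards.lowerEntry(offset + 1), as in getRowShard
  println(s"random=$randomOffset numeric=$numericOffset hashed=$hashedOffset")
}
```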