Posted to commits@seatunnel.apache.org by GitBox <gi...@apache.org> on 2022/03/02 04:06:53 UTC

[GitHub] [incubator-seatunnel] BenJFan opened a new pull request #1369: [Feature][Connector] Clickhouse support split mode (#1358)

BenJFan opened a new pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369


   
   ## Purpose of this pull request
   Add split mode support to the ClickHouse connector and update the corresponding documentation.
   
   ## Check list
   
   * [ ] Code changes are covered with tests, or tests are not needed because:
   * [ ] If your PR adds any new Jar binary package, please add a License Notice according to the
     [New License Guide](https://github.com/apache/incubator-seatunnel/blob/dev/docs/en/developement/NewLicenseGuide.md)
   * [ ] If necessary, please update the documentation to describe the new feature. https://github.com/apache/incubator-seatunnel/tree/dev/docs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@seatunnel.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-seatunnel] BenJFan commented on a change in pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
BenJFan commented on a change in pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#discussion_r819198857



##########
File path: seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
##########
@@ -16,37 +16,56 @@
  */
 package org.apache.seatunnel.spark.sink
 
-import java.math.BigDecimal
+import net.jpountz.xxhash.{XXHash64, XXHashFactory}

Review comment:
       When the sharding key isn't a number, we need to hash it to determine which shard to write to.
   ``` scala
   // Mask the sign bit before taking the modulus; in Scala `%` binds tighter than `&`.
   val offset = ((hashInstance.hash(ByteBuffer.wrap(row.getString(fieldIndex).getBytes), 0) & Long.MaxValue) %
                   this.shardWeightCount).toInt
   ```
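
A minimal sketch of the hash-to-shard step described in the comment above, assuming a weighted layout in which a shard with weight w owns w consecutive offsets. `WeightedShard`, `offsetFor`, and `pick` are hypothetical names, and CRC32 from the JDK stands in for `XXHash64` so the sketch runs without the lz4-java dependency; the PR's actual selection logic may differ.

```scala
import java.nio.charset.StandardCharsets
import java.util.zip.CRC32

object ShardRouting {
  // Hypothetical shard descriptor; the PR's Shard class carries more fields.
  final case class WeightedShard(shardNum: Int, weight: Int)

  // Stand-in hash (assumption): CRC32 replaces XXHash64 to avoid an external dependency.
  def crc32Hash(bytes: Array[Byte]): Long = {
    val crc = new CRC32()
    crc.update(bytes)
    crc.getValue
  }

  // Map a string key to an offset in [0, totalWeight).
  def offsetFor(key: String, totalWeight: Int,
                hash: Array[Byte] => Long = crc32Hash): Int = {
    require(totalWeight > 0, "totalWeight must be positive")
    val h = hash(key.getBytes(StandardCharsets.UTF_8))
    // Clear the sign bit first, then take the modulus.
    ((h & Long.MaxValue) % totalWeight).toInt
  }

  // Pick the shard whose cumulative weight range contains the offset.
  def pick(key: String, shards: Seq[WeightedShard]): WeightedShard = {
    require(shards.nonEmpty && shards.forall(_.weight > 0), "shards need positive weights")
    val upperBounds = shards.scanLeft(0)(_ + _.weight).tail
    val offset = offsetFor(key, upperBounds.last)
    shards(upperBounds.indexWhere(offset < _))
  }
}
```

Masking with `Long.MaxValue` clears the sign bit so the modulus is never negative. The parentheses around the mask matter because `%` binds tighter than `&` in Scala.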







[GitHub] [incubator-seatunnel] wuchunfu commented on a change in pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
wuchunfu commented on a change in pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#discussion_r819233566



##########
File path: seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
##########
@@ -16,37 +16,56 @@
  */
 package org.apache.seatunnel.spark.sink
 
-import java.math.BigDecimal
+import net.jpountz.xxhash.{XXHash64, XXHashFactory}

Review comment:
       @BenJFan I don't know much about `XXHash64`. Does it offer a performance improvement? Why not use the hash functions in the JDK?







[GitHub] [incubator-seatunnel] BenJFan commented on pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
BenJFan commented on pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#issuecomment-1058850036


   > @BenJFan Has this been tested?
   
   Yes.





[GitHub] [incubator-seatunnel] wuchunfu commented on a change in pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
wuchunfu commented on a change in pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#discussion_r818782916



##########
File path: seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
##########
@@ -131,12 +198,77 @@ class Clickhouse extends SparkBatchSink {
     schema
   }
 
+  private def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
+                                  database: String, port: String): ListBuffer[Shard] = {
+    val rs = conn.createStatement().executeQuery(
+      s"select shard_num,shard_weight,replica_num,host_name,host_address," +
+        s"port from system.clusters where cluster = '$clusterName'")
+    // TODO The port will be used for data communication of the tcp protocol in the future
+    // port is tcp protocol, need http protocol port at now
+    val nodeList = mutable.ListBuffer[Shard]()
+    while (rs.next()) {
+      nodeList += new Shard(rs.getInt(1), rs.getInt(2),
+        rs.getInt(3), rs.getString(4), rs.getString(5),
+        port, database)

Review comment:
       @BenJFan It would be better to read these values by field name rather than by index here.







[GitHub] [incubator-seatunnel] wuchunfu commented on a change in pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
wuchunfu commented on a change in pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#discussion_r818778479



##########
File path: seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
##########
@@ -16,37 +16,56 @@
  */
 package org.apache.seatunnel.spark.sink
 
-import java.math.BigDecimal
+import net.jpountz.xxhash.{XXHash64, XXHashFactory}

Review comment:
       @BenJFan Could you please explain the purpose of using `XXHash64`?







[GitHub] [incubator-seatunnel] wuchunfu commented on pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
wuchunfu commented on pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#issuecomment-1058849267


   @BenJFan Has this been tested?





[GitHub] [incubator-seatunnel] wuchunfu commented on pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
wuchunfu commented on pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#issuecomment-1058865863


   Well done, looking forward to your next contribution





[GitHub] [incubator-seatunnel] wuchunfu commented on pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
wuchunfu commented on pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#issuecomment-1056480613


   @RickyHuo @leo65535 PTAL thanks.





[GitHub] [incubator-seatunnel] BenJFan commented on a change in pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
BenJFan commented on a change in pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#discussion_r819280258



##########
File path: seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
##########
@@ -131,12 +198,77 @@ class Clickhouse extends SparkBatchSink {
     schema
   }
 
+  private def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
+                                  database: String, port: String): ListBuffer[Shard] = {
+    val rs = conn.createStatement().executeQuery(
+      s"select shard_num,shard_weight,replica_num,host_name,host_address," +
+        s"port from system.clusters where cluster = '$clusterName'")
+    // TODO The port will be used for data communication of the tcp protocol in the future
+    // port is tcp protocol, need http protocol port at now
+    val nodeList = mutable.ListBuffer[Shard]()
+    while (rs.next()) {
+      nodeList += new Shard(rs.getInt(1), rs.getInt(2),
+        rs.getInt(3), rs.getString(4), rs.getString(5),
+        port, database)

Review comment:
       Does this code need to change? If so, I will do that.







[GitHub] [incubator-seatunnel] BenJFan commented on a change in pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
BenJFan commented on a change in pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#discussion_r819202361



##########
File path: seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
##########
@@ -131,12 +198,77 @@ class Clickhouse extends SparkBatchSink {
     schema
   }
 
+  private def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
+                                  database: String, port: String): ListBuffer[Shard] = {
+    val rs = conn.createStatement().executeQuery(
+      s"select shard_num,shard_weight,replica_num,host_name,host_address," +
+        s"port from system.clusters where cluster = '$clusterName'")
+    // TODO The port will be used for data communication of the tcp protocol in the future
+    // port is tcp protocol, need http protocol port at now
+    val nodeList = mutable.ListBuffer[Shard]()
+    while (rs.next()) {
+      nodeList += new Shard(rs.getInt(1), rs.getInt(2),
+        rs.getInt(3), rs.getString(4), rs.getString(5),
+        port, database)

Review comment:
       Using the field name is OK, but using the offset could be faster.
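
One way to keep named columns while still reading by index, sketched under assumptions: resolve each column label to its index once with `ResultSet.findColumn`, then read by index inside the loop. `ClusterRows`, `ShardRow`, and `readShardRows` are hypothetical names for illustration and are not part of the PR. Whether this is measurably faster than reading by label depends on the JDBC driver.

```scala
import java.sql.ResultSet
import scala.collection.mutable.ListBuffer

object ClusterRows {
  // Hypothetical, trimmed-down row type; the PR's Shard class also carries port and database.
  final case class ShardRow(shardNum: Int, shardWeight: Int, replicaNum: Int,
                            hostName: String, hostAddress: String)

  def readShardRows(rs: ResultSet): List[ShardRow] = {
    // Resolve each column label to its 1-based index once, outside the loop.
    val shardNumIdx = rs.findColumn("shard_num")
    val shardWeightIdx = rs.findColumn("shard_weight")
    val replicaNumIdx = rs.findColumn("replica_num")
    val hostNameIdx = rs.findColumn("host_name")
    val hostAddressIdx = rs.findColumn("host_address")

    val rows = ListBuffer[ShardRow]()
    while (rs.next()) {
      // Index-based reads; the names above document which column each index refers to.
      rows += ShardRow(
        rs.getInt(shardNumIdx), rs.getInt(shardWeightIdx), rs.getInt(replicaNumIdx),
        rs.getString(hostNameIdx), rs.getString(hostAddressIdx))
    }
    rows.toList
  }
}
```

With this shape, reordering the columns in the `select` list does not silently shift values into the wrong fields.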







[GitHub] [incubator-seatunnel] BenJFan commented on a change in pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
BenJFan commented on a change in pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#discussion_r819199528



##########
File path: seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
##########
@@ -131,12 +198,77 @@ class Clickhouse extends SparkBatchSink {
     schema
   }
 
+  private def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
+                                  database: String, port: String): ListBuffer[Shard] = {
+    val rs = conn.createStatement().executeQuery(
+      s"select shard_num,shard_weight,replica_num,host_name,host_address," +
+        s"port from system.clusters where cluster = '$clusterName'")

Review comment:
       Yes. `system.clusters` is a ClickHouse system table. It stores the cluster information that the user has configured.







[GitHub] [incubator-seatunnel] BenJFan commented on a change in pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
BenJFan commented on a change in pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#discussion_r819236573



##########
File path: seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
##########
@@ -16,37 +16,56 @@
  */
 package org.apache.seatunnel.spark.sink
 
-import java.math.BigDecimal
+import net.jpountz.xxhash.{XXHash64, XXHashFactory}

Review comment:
       Here are the benchmarks: https://cyan4973.github.io/xxHash/. It is faster than MD5 and others.







[GitHub] [incubator-seatunnel] wuchunfu commented on a change in pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
wuchunfu commented on a change in pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369#discussion_r818785042



##########
File path: seatunnel-connectors/seatunnel-connector-spark-clickhouse/src/main/scala/org/apache/seatunnel/spark/sink/Clickhouse.scala
##########
@@ -131,12 +198,77 @@ class Clickhouse extends SparkBatchSink {
     schema
   }
 
+  private def getClusterShardList(conn: ClickHouseConnectionImpl, clusterName: String,
+                                  database: String, port: String): ListBuffer[Shard] = {
+    val rs = conn.createStatement().executeQuery(
+      s"select shard_num,shard_weight,replica_num,host_name,host_address," +
+        s"port from system.clusters where cluster = '$clusterName'")

Review comment:
       @BenJFan Is `system.clusters` a fixed value? Is this the only way to get it?







[GitHub] [incubator-seatunnel] wuchunfu merged pull request #1369: [Feature-1358][Connector] Clickhouse support split mode (#1358)

Posted by GitBox <gi...@apache.org>.
wuchunfu merged pull request #1369:
URL: https://github.com/apache/incubator-seatunnel/pull/1369


   

