Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/27 08:56:40 UTC

[GitHub] [pulsar] huzk8 opened a new pull request #12999: [Issue 12998][io] enhance clickhouse cluster sink support

huzk8 opened a new pull request #12999:
URL: https://github.com/apache/pulsar/pull/12999


   
   *(If this PR fixes a github issue, please add `Fixes #<xyz>`.)*
   
   Fixes #12998 
   
   *(or if this PR is one task of a github issue, please add `Master Issue: #<xyz>` to link to the master issue.)*
   
   Master Issue: #<xyz>
   
   ### Motivation
   The IO module should support load-balanced connections to a ClickHouse cluster.
   
   
   ### Modifications
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
     - *Added integration tests for end-to-end deployment with large payloads (10MB)*
     - *Extended integration test for recovery after broker failure*
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - Dependencies (does it add or upgrade a dependency): (yes)
     - The public API: (yes)
     - The schema: (no)
     - The default values of configurations: (no)
     - The wire protocol: (no)
     - The rest endpoints: (no)
     - The admin cli options: (no)
     - Anything that affects deployment: (no)
   
   ### Documentation
   
   Check the box below and label this PR (if you have committer privilege).
   
   Need to update docs? 
   
   - [ ] `doc-required` 
     
     (If you need help on updating docs, create a doc issue)
     
   - [x] `no-need-doc` 
     
     (Please explain why)
     
   - [ ] `doc` 
     
     (If this PR contains doc changes)
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] nlu90 commented on a change in pull request #12999: [Issue 12998][io] enhance clickhouse cluster sink support

Posted by GitBox <gi...@apache.org>.
nlu90 commented on a change in pull request #12999:
URL: https://github.com/apache/pulsar/pull/12999#discussion_r761586418



##########
File path: pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
##########
@@ -18,9 +18,23 @@
  */
 package org.apache.pulsar.io.jdbc;
 
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
 
+import java.sql.DriverManager;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+@Slf4j

Review comment:
       Please fix the formatting here.

##########
File path: pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
##########
@@ -29,4 +43,48 @@
 )
 public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
 
+	@Override
+	public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+		ClickHouseProperties properties = new ClickHouseProperties();
+		String username = jdbcSinkConfig.getUserName();
+		String password = jdbcSinkConfig.getPassword();
+
+		if (username != null) {
+			properties.setUser(jdbcSinkConfig.getUserName());
+		}
+		if (password != null) {
+			properties.setPassword(jdbcSinkConfig.getPassword());
+		}
+
+		jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+		if (jdbcUrl == null) {
+			throw new IllegalArgumentException("Required jdbc Url not set.");
+		}
+		// keep connection stable
+		properties.setConnectionTimeout(300000);
+		properties.setSocketTimeout(300000);
+		// load balance strategy
+		BalancedClickhouseDataSource ckDataSource = new BalancedClickhouseDataSource(jdbcUrl, properties);
+		// to check  clickhouse node  health
+		ckDataSource.scheduleActualization(60, TimeUnit.SECONDS);
+		final ClickHouseConnection ckConnection = ckDataSource.getConnection();
+		super.connection = (ckConnection);
+		ckConnection.setAutoCommit(false);
+
+		log.info("Opened jdbc ckConnection: {}, autoCommit: {}", jdbcUrl, ckConnection.getAutoCommit());
+
+		tableName = jdbcSinkConfig.getTableName();
+		tableId = JdbcUtils.getTableId(ckConnection, tableName);
+		// Init PreparedStatement include insert, delete, update
+		initStatement();
+
+		int timeoutMs = jdbcSinkConfig.getTimeoutMs();
+		batchSize = jdbcSinkConfig.getBatchSize();
+		incomingList = Lists.newArrayList();
+		swapList = Lists.newArrayList();
+		isFlushing = new AtomicBoolean(false);
+
+		flushExecutor = Executors.newScheduledThreadPool(1);
+		flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);

Review comment:
       This code duplicates what is already in `JdbcAbstractSink`; try to reuse it instead of copying it.
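
One way to follow this suggestion is a template-method hook: the base class keeps the whole `open()` sequence and exposes a single overridable step for creating the connection. The sketch below is only an illustration under that assumption. `SketchAbstractSink`, `SketchClickHouseSink`, `SketchConnection`, and `createConnection` are hypothetical names, not Pulsar's actual API, and the connection is a plain stand-in so the example runs without a JDBC driver.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for java.sql.Connection so the sketch needs no driver.
interface SketchConnection {
    String describe();
}

// Hypothetical base class: open() owns the shared setup sequence.
abstract class SketchAbstractSink {
    private SketchConnection connection;                 // can stay private
    private final List<String> steps = new ArrayList<>(); // records what open() did

    public final void open(String jdbcUrl) {
        // Validate required settings before doing any other work.
        if (jdbcUrl == null) {
            throw new IllegalArgumentException("Required jdbc Url not set.");
        }
        connection = createConnection(jdbcUrl);           // the only overridable step
        steps.add("connected:" + connection.describe());
        steps.add("statements-initialised");              // shared, not copied
        steps.add("flush-scheduled");                     // shared, not copied
    }

    // Default behaviour: a plain single-node connection.
    protected SketchConnection createConnection(String jdbcUrl) {
        return () -> "single(" + jdbcUrl + ")";
    }

    public List<String> steps() {
        return steps;
    }
}

// Hypothetical subclass: overrides only how the connection is obtained.
class SketchClickHouseSink extends SketchAbstractSink {
    @Override
    protected SketchConnection createConnection(String jdbcUrl) {
        // A real sink would build its load-balanced data source here.
        return () -> "balanced(" + jdbcUrl + ")";
    }
}
```

With this shape the subclass never touches the base class's fields, so nothing has to be widened from `private` to `protected`.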
   
   

##########
File path: pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
##########
@@ -43,13 +44,13 @@
 @Slf4j
 public abstract class JdbcAbstractSink<T> implements Sink<T> {
     // ----- Runtime fields
-    private JdbcSinkConfig jdbcSinkConfig;
+    protected JdbcSinkConfig jdbcSinkConfig;

Review comment:
       Most of the fields can stay `private` if the code is reused properly.

##########
File path: pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
##########
@@ -29,4 +43,48 @@
 )
 public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
 
+	@Override
+	public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+		ClickHouseProperties properties = new ClickHouseProperties();
+		String username = jdbcSinkConfig.getUserName();
+		String password = jdbcSinkConfig.getPassword();
+
+		if (username != null) {
+			properties.setUser(jdbcSinkConfig.getUserName());
+		}
+		if (password != null) {
+			properties.setPassword(jdbcSinkConfig.getPassword());
+		}
+
+		jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+		if (jdbcUrl == null) {
+			throw new IllegalArgumentException("Required jdbc Url not set.");
+		}

Review comment:
       Move this check to the beginning of the method.







[GitHub] [pulsar] codelipenghui commented on a change in pull request #12999: [Issue 12998][io] enhance clickhouse cluster sink support

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #12999:
URL: https://github.com/apache/pulsar/pull/12999#discussion_r758032106



##########
File path: pulsar-io/jdbc/clickhouse/pom.xml
##########
@@ -41,7 +41,6 @@
       <groupId>ru.yandex.clickhouse</groupId>
       <artifactId>clickhouse-jdbc</artifactId>
       <version>${clickhouse-jdbc.version}</version>
-      <scope>runtime</scope>

Review comment:
       Why remove the runtime scope here?







[GitHub] [pulsar] github-actions[bot] commented on pull request #12999: [Issue 12998][io] enhance clickhouse cluster sink support

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #12999:
URL: https://github.com/apache/pulsar/pull/12999#issuecomment-1051440821


   The PR had no activity for 30 days; marking it with the Stale label.





[GitHub] [pulsar] huzk8 commented on a change in pull request #12999: [Issue 12998][io] enhance clickhouse cluster sink support

Posted by GitBox <gi...@apache.org>.
huzk8 commented on a change in pull request #12999:
URL: https://github.com/apache/pulsar/pull/12999#discussion_r758968343



##########
File path: pulsar-io/jdbc/clickhouse/pom.xml
##########
@@ -41,7 +41,6 @@
       <groupId>ru.yandex.clickhouse</groupId>
       <artifactId>clickhouse-jdbc</artifactId>
       <version>${clickhouse-jdbc.version}</version>
-      <scope>runtime</scope>

Review comment:
       The sink now needs to use `ru.yandex.clickhouse.BalancedClickhouseDataSource` directly, so the dependency can no longer be runtime-only.
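
For context: a dependency in Maven's `runtime` scope is left off the compile classpath, so source code cannot `import` its classes. It can only reach them by name while running, which is how JDBC drivers are usually located. Referencing `BalancedClickhouseDataSource` as a type therefore requires the default `compile` scope. The sketch below illustrates the by-name lookup that a runtime-only dependency is limited to. `RuntimeScopeSketch` and `isOnClasspath` are hypothetical names used only for this illustration.

```java
// Hypothetical helper: looks a class up by name, the only access pattern
// available to code whose dependency is declared with runtime scope.
final class RuntimeScopeSketch {
    private RuntimeScopeSketch() { }

    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run its static initialisers.
            Class.forName(className, false, RuntimeScopeSketch.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }
}
```

Calling `RuntimeScopeSketch.isOnClasspath("ru.yandex.clickhouse.BalancedClickhouseDataSource")` would report whether the driver jar is present at run time, regardless of the scope it was declared with.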







[GitHub] [pulsar] hangc0276 commented on pull request #12999: [Issue 12998][io] enhance clickhouse cluster sink support

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on pull request #12999:
URL: https://github.com/apache/pulsar/pull/12999#issuecomment-984746123


   @nlu90 Would you please help review this PR?

