You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/12/03 01:46:43 UTC

[GitHub] [pulsar] nlu90 commented on a change in pull request #12999: [Issue 12998][io] enhance clickhouse cluster sink support

nlu90 commented on a change in pull request #12999:
URL: https://github.com/apache/pulsar/pull/12999#discussion_r761586418



##########
File path: pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
##########
@@ -18,9 +18,23 @@
  */
 package org.apache.pulsar.io.jdbc;
 
+import com.google.common.collect.Lists;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.ClickHouseConnection;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
 
+import java.sql.DriverManager;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+@Slf4j

Review comment:
       Please fix the formatting here.

##########
File path: pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
##########
@@ -29,4 +43,48 @@
 )
 public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
 
+	@Override
+	public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+		ClickHouseProperties properties = new ClickHouseProperties();
+		String username = jdbcSinkConfig.getUserName();
+		String password = jdbcSinkConfig.getPassword();
+
+		if (username != null) {
+			properties.setUser(jdbcSinkConfig.getUserName());
+		}
+		if (password != null) {
+			properties.setPassword(jdbcSinkConfig.getPassword());
+		}
+
+		jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+		if (jdbcUrl == null) {
+			throw new IllegalArgumentException("Required jdbc Url not set.");
+		}
+		// keep connection stable
+		properties.setConnectionTimeout(300000);
+		properties.setSocketTimeout(300000);
+		// load balance strategy
+		BalancedClickhouseDataSource ckDataSource = new BalancedClickhouseDataSource(jdbcUrl, properties);
+		// to check  clickhouse node  health
+		ckDataSource.scheduleActualization(60, TimeUnit.SECONDS);
+		final ClickHouseConnection ckConnection = ckDataSource.getConnection();
+		super.connection = (ckConnection);
+		ckConnection.setAutoCommit(false);
+
+		log.info("Opened jdbc ckConnection: {}, autoCommit: {}", jdbcUrl, ckConnection.getAutoCommit());
+
+		tableName = jdbcSinkConfig.getTableName();
+		tableId = JdbcUtils.getTableId(ckConnection, tableName);
+		// Init PreparedStatement include insert, delete, update
+		initStatement();
+
+		int timeoutMs = jdbcSinkConfig.getTimeoutMs();
+		batchSize = jdbcSinkConfig.getBatchSize();
+		incomingList = Lists.newArrayList();
+		swapList = Lists.newArrayList();
+		isFlushing = new AtomicBoolean(false);
+
+		flushExecutor = Executors.newScheduledThreadPool(1);
+		flushExecutor.scheduleAtFixedRate(this::flush, timeoutMs, timeoutMs, TimeUnit.MILLISECONDS);

Review comment:
       This code is duplicated from `JdbcAbstractSink`; try to reuse it instead of copying it.
   
   

##########
File path: pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
##########
@@ -43,13 +44,13 @@
 @Slf4j
 public abstract class JdbcAbstractSink<T> implements Sink<T> {
     // ----- Runtime fields
-    private JdbcSinkConfig jdbcSinkConfig;
+    protected JdbcSinkConfig jdbcSinkConfig;

Review comment:
       Most of the fields can stay `private` if the code is reused properly.

##########
File path: pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
##########
@@ -29,4 +43,48 @@
 )
 public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
 
+	@Override
+	public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+		ClickHouseProperties properties = new ClickHouseProperties();
+		String username = jdbcSinkConfig.getUserName();
+		String password = jdbcSinkConfig.getPassword();
+
+		if (username != null) {
+			properties.setUser(jdbcSinkConfig.getUserName());
+		}
+		if (password != null) {
+			properties.setPassword(jdbcSinkConfig.getPassword());
+		}
+
+		jdbcUrl = jdbcSinkConfig.getJdbcUrl();
+		if (jdbcUrl == null) {
+			throw new IllegalArgumentException("Required jdbc Url not set.");
+		}

Review comment:
       Move this check to the beginning of the method.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org