You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/06 15:20:04 UTC

[GitHub] [pulsar] tisonkun commented on pull request #12999: [Issue 12998][io] enhance clickhouse cluster sink support

tisonkun commented on PR #12999:
URL: https://github.com/apache/pulsar/pull/12999#issuecomment-1339542460

   This will be supported directly in clickhouse-jdbc 0.3.3 (see https://github.com/ClickHouse/clickhouse-jdbc/issues/894) by:
   
   ```java
   String connString = "jdbc:ch://server1,server2,server3/database"
       + "?load_balancing_policy=random&health_check_interval=5000&failover=2";
   ```
   
   Closing and waiting for a version bump...
   
   If you do want to continue this patch, I suggest:
   
   ```diff
   diff --git a/pulsar-io/jdbc/clickhouse/pom.xml b/pulsar-io/jdbc/clickhouse/pom.xml
   index 82c5983bb2..50092e749d 100644
   --- a/pulsar-io/jdbc/clickhouse/pom.xml
   +++ b/pulsar-io/jdbc/clickhouse/pom.xml
   @@ -41,7 +41,6 @@
          <groupId>ru.yandex.clickhouse</groupId>
          <artifactId>clickhouse-jdbc</artifactId>
          <version>${clickhouse-jdbc.version}</version>
   -      <scope>runtime</scope>
        </dependency>
      </dependencies>
    
   diff --git a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
   index 1dde785292..2437ed108c 100644
   --- a/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
   +++ b/pulsar-io/jdbc/clickhouse/src/main/java/org/apache/pulsar/io/jdbc/ClickHouseJdbcAutoSchemaSink.java
   @@ -18,8 +18,13 @@
     */
    package org.apache.pulsar.io.jdbc;
    
   +import java.sql.Connection;
   +import java.sql.SQLException;
   +import java.util.Properties;
    import org.apache.pulsar.io.core.annotations.Connector;
    import org.apache.pulsar.io.core.annotations.IOType;
   +import ru.yandex.clickhouse.BalancedClickhouseDataSource;
   +import ru.yandex.clickhouse.ClickHouseDriver;
    
    @Connector(
        name = "jdbc-clickhouse",
   @@ -28,5 +33,9 @@ import org.apache.pulsar.io.core.annotations.IOType;
        configClass = JdbcSinkConfig.class
    )
    public class ClickHouseJdbcAutoSchemaSink extends BaseJdbcAutoSchemaSink {
   -
   +    @Override
   +    protected Connection createConnection(String jdbcUrl, Properties properties) throws SQLException {
   +        final BalancedClickhouseDataSource ds = new BalancedClickhouseDataSource(jdbcUrl, properties);
   +        return ds.getConnection();
   +    }
    }
   diff --git a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
   index 4586fcebcf..cbfa9b82c9 100644
   --- a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
   +++ b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
   @@ -94,7 +94,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
                properties.setProperty("password", password);
            }
    
   -        connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), properties);
   +        connection = createConnection(jdbcSinkConfig.getJdbcUrl(), properties);
            connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
            log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit());
    
   @@ -114,6 +114,10 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
            }
        }
    
   +    protected Connection createConnection(String jdbcUrl, Properties properties) throws SQLException {
   +        return DriverManager.getConnection(jdbcUrl, properties);
   +    }
   +
        private void initStatement()  throws Exception {
            List<String> keyList = getListFromConfig(jdbcSinkConfig.getKey());
            List<String> nonKeyList = getListFromConfig(jdbcSinkConfig.getNonKey());
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org