Posted to reviews@bahir.apache.org by GitBox <gi...@apache.org> on 2020/05/28 02:51:19 UTC

[GitHub] [bahir-flink] pyscala opened a new pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

pyscala opened a new pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85


   Implement a streaming ClickHouseSink and support the Flink Table API and Flink SQL
   for the ClickHouse connector.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [bahir-flink] pyscala commented on pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
pyscala commented on pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#issuecomment-807901922


   @eskabetxe The optimizations are complete; looking forward to your reply.





[GitHub] [bahir-flink] pyscala removed a comment on pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
pyscala removed a comment on pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#issuecomment-635063992


   Add ClickHouse Connector for Flink #85
   Implement a streaming ClickHouseSink and support the Flink Table API and Flink SQL
   for the ClickHouse connector.





[GitHub] [bahir-flink] bakey commented on pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
bakey commented on pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#issuecomment-1010733192


   Is this PR still alive?





[GitHub] [bahir-flink] pyscala commented on a change in pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
pyscala commented on a change in pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#discussion_r594007169



##########
File path: flink-connector-clickhouse/pom.xml
##########
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <!--<groupId>com.apache.flink</groupId>-->

Review comment:
       Thanks for your reply. Done.







[GitHub] [bahir-flink] pyscala commented on a change in pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
pyscala commented on a change in pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#discussion_r598497998



##########
File path: flink-connector-clickhouse/pom.xml
##########
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-clickhouse_${scala.binary.version}</artifactId>
+    <name>flink-connector-clickhouse</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.1.50</version>

Review comment:
       Thanks for your reply, I will fix it later.







[GitHub] [bahir-flink] gj-zhang commented on pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
gj-zhang commented on pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#issuecomment-872690757


   How is it going?
   
   How can I contribute code?





[GitHub] [bahir-flink] pyscala commented on pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
pyscala commented on pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#issuecomment-635074588


   @lresende @mbalassi I would appreciate your time reviewing this PR. Thanks!





[GitHub] [bahir-flink] eskabetxe commented on a change in pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
eskabetxe commented on a change in pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#discussion_r592311845



##########
File path: flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSinkTest.java
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.connectors.clickhouse;
+
+import org.junit.Test;
+
+/**
+ * Created by liufangliang on 2020/4/16.
+ */
+public class ClickHouseTableSinkTest {

Review comment:
       All methods are empty.

##########
File path: flink-connector-clickhouse/pom.xml
##########
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <!--<groupId>com.apache.flink</groupId>-->

Review comment:
       Remove the commented-out lines.

##########
File path: flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.flink.streaming.connectors.clickhouse;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Properties;
+
+/**
+ * @author liufangliang
+ * @date 2020/3/20 10:21 AM
+ */
+public class ClickHouseAppendSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
+    private static final String USERNAME = "user";
+private static final String PASSWORD = "password";
+
+
+    private static final Logger log = LoggerFactory.getLogger(ClickHouseAppendSinkFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private  Connection connection;
+    private  BalancedClickhouseDataSource dataSource;
+    private  PreparedStatement pstat;
+
+    private String address;
+    private String username;
+    private String password;
+
+    private String prepareStatement;
+    private Integer batchSize;
+    private Long commitPadding;
+
+    private Integer retries;
+    private Long retryInterval;
+
+    private Boolean ignoreInsertError;
+
+    private Integer currentSize;
+    private Long lastExecuteTime;
+
+
+    public ClickHouseAppendSinkFunction(Properties properties){

Review comment:
       empty constructor?
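The fields declared in the hunk above (batchSize, commitPadding, retries, retryInterval, ignoreInsertError, currentSize, lastExecuteTime) suggest a flush-on-size-or-time policy, but the PR's invoke() body is not part of this thread. The following is a minimal sketch under that assumption, not the PR's implementation: the class BatchFlushSketch and its methods add, flush and pending are hypothetical, a Consumer stands in for the JDBC batch execution, and retries is assumed to mean extra attempts after the first one. The current time is passed in explicitly so the policy can be exercised deterministically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch, not the PR's code: buffers rows and flushes when either
// batchSize rows are pending or commitPaddingMs has elapsed since the last flush.
public class BatchFlushSketch<T> {
    private final int batchSize;
    private final long commitPaddingMs;
    private final int retries;              // assumption: extra attempts after the first one
    private final long retryIntervalMs;
    private final boolean ignoreInsertError;
    private final Consumer<List<T>> writer; // stands in for executing the JDBC batch

    private final List<T> buffer = new ArrayList<>();
    private long lastExecuteTime;

    public BatchFlushSketch(int batchSize, long commitPaddingMs, int retries, long retryIntervalMs,
                            boolean ignoreInsertError, Consumer<List<T>> writer, long nowMs) {
        this.batchSize = batchSize;
        this.commitPaddingMs = commitPaddingMs;
        this.retries = retries;
        this.retryIntervalMs = retryIntervalMs;
        this.ignoreInsertError = ignoreInsertError;
        this.writer = writer;
        this.lastExecuteTime = nowMs;
    }

    // Counterpart of invoke(): buffer the row, then flush if either threshold is reached.
    public void add(T row, long nowMs) {
        buffer.add(row);
        if (buffer.size() >= batchSize || nowMs - lastExecuteTime >= commitPaddingMs) {
            flush(nowMs);
        }
    }

    // Writes all buffered rows, retrying up to `retries` extra times on failure.
    public void flush(long nowMs) {
        if (buffer.isEmpty()) {
            return;
        }
        RuntimeException lastError = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                writer.accept(new ArrayList<>(buffer));
                lastError = null;
                break;
            } catch (RuntimeException e) {
                lastError = e;
                if (attempt < retries) {
                    sleepQuietly(retryIntervalMs);
                }
            }
        }
        if (lastError != null && !ignoreInsertError) {
            throw lastError; // rows stay buffered and the exception propagates to the caller
        }
        buffer.clear(); // on success, or on failure with ignoreInsertError = true (rows are dropped)
        lastExecuteTime = nowMs;
    }

    public int pending() {
        return buffer.size();
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        List<List<String>> batches = new ArrayList<>();
        BatchFlushSketch<String> sink = new BatchFlushSketch<>(3, 1000L, 3, 10L, false, batches::add, 0L);
        sink.add("a", 0L);
        sink.add("b", 10L);
        sink.add("c", 20L);   // size threshold reached: flushes [a, b, c]
        sink.add("d", 1500L); // 1480 ms since the last flush: flushes [d]
        System.out.println(batches); // [[a, b, c], [d]]
    }
}
```

Note that the time-based check only fires when a new row arrives, so an idle stream could hold rows indefinitely. A common pattern for Flink sinks is to also flush from snapshotState() (available here because the class implements CheckpointedFunction) and from close().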

##########
File path: flink-connector-clickhouse/pom.xml
##########
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <!--<groupId>com.apache.flink</groupId>-->
+    <artifactId>flink-connector-clickhouse_${scala.binary.version}</artifactId>
+    <!--<version>1.0-SNAPSHOT</version>-->
+
+    <name>flink-connector-clickhouse</name>
+
+    <packaging>jar</packaging>
+
+    <!-- FIXME change it to the project's website -->
+    <!--<url>http://www.example.com</url>-->
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_2.11</artifactId>

Review comment:
       Replace 2.11 with ${scala.binary.version}.

##########
File path: flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.flink.streaming.connectors.clickhouse;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Properties;
+
+/**

Review comment:
       Please remove this Javadoc; the author and date are already tracked in git.

##########
File path: flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.table.descriptors;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+/**
+ * Created by liufangliang on 2020/4/11.

Review comment:
       Remove the Javadoc.

##########
File path: flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseValidatorTest.java
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.table.descriptors;
+
+
+import org.junit.Test;
+
+/**
+ * Created by liufangliang on 2020/4/11.

Review comment:
       Remove this.

##########
File path: flink-connector-clickhouse/pom.xml
##########
@@ -0,0 +1,118 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <!--<groupId>com.apache.flink</groupId>-->
+    <artifactId>flink-connector-clickhouse_${scala.binary.version}</artifactId>
+    <!--<version>1.0-SNAPSHOT</version>-->
+
+    <name>flink-connector-clickhouse</name>
+
+    <packaging>jar</packaging>
+
+    <!-- FIXME change it to the project's website -->
+    <!--<url>http://www.example.com</url>-->
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.1.50</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.7</version>
+            <scope>runtime</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>runtime</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>

Review comment:
       Aren't these already in the parent POM?







[GitHub] [bahir-flink] pyscala commented on a change in pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
pyscala commented on a change in pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#discussion_r601276879



##########
File path: flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.table.descriptors;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+public class ClickHouseTest {
+
+    @Test
+    public void toConnectorProperties() throws Exception {
+        ClickHouse clickhouse = new ClickHouse()
+            .version("1")
+            .address("jdbc:clickhouse://localhost:8123/default")
+            .batchSize(1)
+            .database("qtt")
+            .ignoreError(false)
+            .padding(2000L)
+            .table("insert_test")
+            .username("admin")
+            .password("admin")
+            .retryAttempts(3)
+            .retryInterval(3000L);
+        Map<String, String> connectorProperties = clickhouse.toConnectorProperties();
+        for (Map.Entry<String, String> entry : connectorProperties.entrySet()) {
+            System.out.println(entry.getKey() + ":" + entry.getValue());

Review comment:
       @eskabetxe Thanks for your reply. The related optimizations have been completed; please review again.







[GitHub] [bahir-flink] eskabetxe commented on a change in pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
eskabetxe commented on a change in pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#discussion_r597075880



##########
File path: flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.flink.streaming.connectors.clickhouse;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Properties;
+
+public class ClickHouseAppendSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
+    private static final String USERNAME = "user";
+private static final String PASSWORD = "password";

Review comment:
       Indentation.

##########
File path: flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunction.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.flink.streaming.connectors.clickhouse;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Properties;
+
+public class ClickHouseAppendSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
+    private static final String USERNAME = "user";
+private static final String PASSWORD = "password";
+
+    private static final Logger log = LoggerFactory.getLogger(ClickHouseAppendSinkFunction.class);
+    private static final long serialVersionUID = 1L;
+
+    private  Connection connection;
+    private  BalancedClickhouseDataSource dataSource;
+    private  PreparedStatement pstat;
+
+    private String address;
+    private String username;
+    private String password;
+
+    private String prepareStatement;
+    private Integer batchSize;
+    private Long commitPadding;
+
+    private Integer retries;
+    private Long retryInterval;
+
+    private Boolean ignoreInsertError;
+
+    private Integer currentSize;
+    private Long lastExecuteTime;
+
+    public ClickHouseAppendSinkFunction(String address, String username, String password, String prepareStatement, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) {
+        this.address = address;
+        this.username = username;
+        this.password = password;
+        this.prepareStatement = prepareStatement;
+        this.batchSize = batchSize;
+        this.commitPadding = commitPadding;
+        this.retries = retries;
+        this.retryInterval = retryInterval;
+        this.ignoreInsertError = ignoreInsertError;
+    }
+
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        Properties properties = new Properties();
+        properties.setProperty(USERNAME, username);
+        properties.setProperty(PASSWORD, password);
+        ClickHouseProperties clickHouseProperties = new ClickHouseProperties(properties);
+        dataSource = new BalancedClickhouseDataSource(address, clickHouseProperties);
+        connection = dataSource.getConnection();
+        pstat = connection.prepareStatement(prepareStatement);
+        lastExecuteTime = System.currentTimeMillis();
+        currentSize = 0;
+
+    }
+
+    @Override
+    public void invoke(Row value, Context context) throws Exception {
+        for (int i = 0; i < value.getArity(); i++) {
+            pstat.setObject(i + 1, value.getField(i));
+        }
+        pstat.addBatch();
+        currentSize++;
+        if (currentSize >= batchSize || (System.currentTimeMillis() - lastExecuteTime) > commitPadding) {
+            try {
+                doExecuteRetries(retries, retryInterval);
+            } catch (Exception e) {
+                log.error("clickhouse-insert-error ( maxRetries:" + retries + " , retryInterval : " + retryInterval + " millisecond )" + e.getMessage());
+            } finally {
+                pstat.clearBatch();
+                currentSize = 0;
+                lastExecuteTime = System.currentTimeMillis();
+            }
+        }
+    }
+
+    public void doExecuteRetries(int count, long retryInterval) throws Exception {
+
+        int retrySize = 0;
+        Exception resultException = null;
+        for (int i = 0; i < count; i++) {
+            try {
+                pstat.executeBatch();
+                break;
+            } catch (Exception e) {
+                retrySize++;
+                resultException = e;
+            }
+            try {
+                Thread.sleep(retryInterval);
+            } catch (InterruptedException e) {
+                e.printStackTrace();

Review comment:
       If needed, log the exception instead of printing the stack trace.
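A minimal sketch of what the retry loop could look like with logging, assuming the batch execution is passed in as an action. RetrySketch and BatchAction are hypothetical names, and the sketch uses java.util.logging only to stay dependency-free (the PR itself logs through slf4j). On interruption it logs, restores the thread's interrupt flag and stops retrying. It also skips the sleep after the final failed attempt.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

final class RetrySketch {

    // java.util.logging keeps the sketch dependency-free; the PR itself uses slf4j.
    private static final Logger LOG = Logger.getLogger(RetrySketch.class.getName());

    /** Hypothetical stand-in for pstat.executeBatch(): any action that may throw. */
    @FunctionalInterface
    interface BatchAction {
        void execute() throws Exception;
    }

    /**
     * Runs the action up to maxRetries times, sleeping retryIntervalMs between failed
     * attempts (but not after the last one). Returns the number of attempts used and
     * rethrows the last failure if every attempt fails.
     */
    static int executeWithRetries(BatchAction action, int maxRetries, long retryIntervalMs)
            throws Exception {
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be >= 1, got " + maxRetries);
        }
        Exception lastFailure = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                action.execute();
                return attempt;
            } catch (Exception e) {
                lastFailure = e;
                LOG.log(Level.WARNING, "Batch insert attempt " + attempt + "/" + maxRetries + " failed", e);
            }
            if (attempt < maxRetries) {
                try {
                    Thread.sleep(retryIntervalMs);
                } catch (InterruptedException ie) {
                    // Log it, restore the interrupt flag and stop retrying,
                    // rather than calling printStackTrace() and carrying on.
                    LOG.log(Level.WARNING, "Interrupted while waiting to retry the batch insert", ie);
                    Thread.currentThread().interrupt();
                    ie.addSuppressed(lastFailure);
                    throw ie;
                }
            }
        }
        // Every attempt failed: surface the last failure to the caller.
        throw lastFailure;
    }
}
```

The caller (invoke in the sink) then decides whether a final failure is logged and dropped or propagated.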

##########
File path: flink-connector-clickhouse/pom.xml
##########
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-clickhouse_${scala.binary.version}</artifactId>
+    <name>flink-connector-clickhouse</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.1.50</version>

Review comment:
       This should be defined in <properties> so users can override this version.

##########
File path: flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunctionTest.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.connectors.clickhouse;
+
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Properties;
+
+public class ClickHouseAppendSinkFunctionTest {
+
+
+    private static final String USERNAME = "user";
+    private static final String PASSWORD = "password";
+    private Connection connection;
+    private BalancedClickhouseDataSource dataSource;
+    private  PreparedStatement pstat;
+
+    @Test
+    public void open() throws Exception {
+    }
+
+    @Test
+    public void invoke() throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty(USERNAME, "admin");
+        properties.setProperty(PASSWORD, "admin");
+        ClickHouseProperties clickHouseProperties = new ClickHouseProperties(properties);
+        dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://localhost:8123/default", clickHouseProperties);
+        connection = dataSource.getConnection();
+        pstat = connection.prepareStatement("");
+        Row value = new Row(2);
+        for (int i = 0; i < value.getArity(); i++) {
+            pstat.setObject(i + 1, value.getField(i));
+        }
+

Review comment:
       What is this test actually testing?
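One way to give the test something concrete to assert, sketched under the assumption that the flush condition from invoke (currentSize >= batchSize, or elapsed time > commitPadding) is pulled out into a pure helper. FlushPolicy is a hypothetical name. Once extracted, the condition can be checked without a running ClickHouse server.

```java
/**
 * Hypothetical helper: the flush condition from ClickHouseAppendSinkFunction#invoke,
 * extracted into a pure function so it can be unit-tested without a database.
 */
final class FlushPolicy {

    private final int batchSize;
    private final long commitPaddingMs;

    FlushPolicy(int batchSize, long commitPaddingMs) {
        this.batchSize = batchSize;
        this.commitPaddingMs = commitPaddingMs;
    }

    /** Same predicate as invoke(): flush when the batch is full or the padding interval has elapsed. */
    boolean shouldFlush(int currentSize, long lastExecuteTimeMs, long nowMs) {
        return currentSize >= batchSize || (nowMs - lastExecuteTimeMs) > commitPaddingMs;
    }
}
```

In a JUnit test these checks would be assertTrue/assertFalse calls, for example that a partially filled batch does not flush before the padding interval has passed.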

##########
File path: flink-connector-clickhouse/src/test/java/com/apache/flink/table/descriptors/ClickHouseTest.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.table.descriptors;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+public class ClickHouseTest {
+
+    @Test
+    public void toConnectorProperties() throws Exception {
+        ClickHouse clickhouse = new ClickHouse()
+            .version("1")
+            .address("jdbc:clickhouse://localhost:8123/default")
+            .batchSize(1)
+            .database("qtt")
+            .ignoreError(false)
+            .padding(2000L)
+            .table("insert_test")
+            .username("admin")
+            .password("admin")
+            .retryAttempts(3)
+            .retryInterval(3000L);
+        Map<String, String> connectorProperties = clickhouse.toConnectorProperties();
+        for (Map.Entry<String, String> entry : connectorProperties.entrySet()) {
+            System.out.println(entry.getKey() + ":" + entry.getValue());

Review comment:
       We should assert that the map contains what you expect rather than printing it to the console.
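With JUnit available, assertEquals(expectedMap, connectorProperties) already does this. The sketch below is a dependency-free alternative that also reports which keys differ. MapAssertions is a hypothetical name, and the property keys a real test should list are whatever ClickHouse#toConnectorProperties actually emits. Those keys are not shown in this excerpt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.TreeSet;

final class MapAssertions {

    /** Lists the differences between the expected and actual maps, in key order (empty if equal). */
    static List<String> diff(Map<String, String> expected, Map<String, String> actual) {
        List<String> problems = new ArrayList<>();
        TreeSet<String> keys = new TreeSet<>(expected.keySet());
        keys.addAll(actual.keySet());
        for (String key : keys) {
            if (!actual.containsKey(key)) {
                problems.add("missing key: " + key);
            } else if (!expected.containsKey(key)) {
                problems.add("unexpected key: " + key);
            } else if (!Objects.equals(expected.get(key), actual.get(key))) {
                problems.add("key " + key + ": expected <" + expected.get(key)
                        + "> but was <" + actual.get(key) + ">");
            }
        }
        return problems;
    }

    /** Fails with every difference listed, instead of printing the map to the console. */
    static void assertMapEquals(Map<String, String> expected, Map<String, String> actual) {
        List<String> problems = diff(expected, actual);
        if (!problems.isEmpty()) {
            throw new AssertionError(String.join("; ", problems));
        }
    }
}
```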

##########
File path: flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSink.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.flink.streaming.connectors.clickhouse;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An at-least-once Table sink for ClickHouse.
+ *
+ * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if
+ * checkpointing is enabled). However, one common use case is to run idempotent queries
+ * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and
+ * achieve exactly-once semantic.</p>
+ */
+public class ClickHouseTableSink implements AppendStreamTableSink<Row> {
+
+    private static final Logger log = LoggerFactory.getLogger(ClickHouseTableSink.class);
+    private static final Integer BATCH_SIZE_DEFAULT = 5000;
+    private static final Long COMMIT_PADDING_DEFAULT = 5000L;
+    private static final Integer RETRIES_DEFAULT = 3;
+    private static final Long RETRY_INTERVAL_DEFAULT = 3000L;
+    private static final Boolean IGNORE_INSERT_ERROR = false;
+    private String address;
+    private String username;
+    private String password;
+    private String database;
+    private String table;
+    private TableSchema schema;
+    private Integer batchSize;
+    private Long commitPadding;
+    private Integer retries;
+    private Long retryInterval;
+    private Boolean ignoreInsertError;
+
+
+    public ClickHouseTableSink(String address, String username, String password, String database, String table, TableSchema schema, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) {
+        this.address = address;
+        this.username = username;
+        this.password = password;
+        this.database = database;
+        this.table = table;
+        this.schema = schema;
+        this.batchSize = batchSize;
+        this.commitPadding = commitPadding;
+        this.retries = retries;
+        this.retryInterval = retryInterval;
+        this.ignoreInsertError = ignoreInsertError;
+
+    }
+
+    /**
+     * @param dataStream
+     * @deprecated
+     */
+    @Override
+    public void emitDataStream(DataStream<Row> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    /**
+     *
+     * @return ClickHouseAppendSinkFunction
+     */
+    private ClickHouseAppendSinkFunction initSink() {
+        String prepareStatement = createPrepareStatement(schema, database, table);
+        return new ClickHouseAppendSinkFunction(address, username, password, prepareStatement, batchSize, commitPadding, retries, retryInterval, ignoreInsertError);
+    }
+
+    @Override
+    public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) {
+
+        ClickHouseTableSink copy;
+        try {
+            copy = new ClickHouseTableSink(address, username, password, database, table, schema, batchSize, commitPadding, retries, retryInterval, ignoreInsertError);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+        return copy;
+    }
+
+    @Override
+    public String[] getFieldNames() {
+        return schema.getFieldNames();
+    }
+
+    @Override
+    public TypeInformation<?>[] getFieldTypes() {
+        return schema.getFieldTypes();
+    }
+
+    @Override
+    public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
+        return dataStream.addSink(initSink())
+                .setParallelism(dataStream.getParallelism())
+                .name(TableConnectorUtils.generateRuntimeName(this.getClass(), schema.getFieldNames()));
+    }
+
+    @Override
+    public DataType getConsumedDataType() {
+        return schema.toRowDataType();
+    }
+
+
+    @Override
+    public TypeInformation<Row> getOutputType() {
+        return new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames());
+    }
+
+
+    @Override
+    public TableSchema getTableSchema() {
+        return schema;
+    }
+
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+
+    public String createPrepareStatement(TableSchema tableSchema, String database, String table) {
+        String[] fieldNames = tableSchema.getFieldNames();
+        String columns = String.join(",", fieldNames);
+        String[] questionMark = new String[fieldNames.length];
+        for (int i = 0; i < questionMark.length; i++) {

Review comment:
       Couldn't this be changed to a stream map?
   
   String questionMark = Arrays.stream(fieldNames)
                   .map(field -> "?")
                   .reduce((left,right) -> left+","+right)
                   .get();

##########
File path: flink-connector-clickhouse/src/main/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSink.java
##########
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.apache.flink.streaming.connectors.clickhouse;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.sinks.AppendStreamTableSink;
+import org.apache.flink.table.sinks.TableSink;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.utils.TableConnectorUtils;
+import org.apache.flink.types.Row;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An at-least-once Table sink for ClickHouse.
+ *
+ * <p>The mechanisms of Flink guarantees delivering messages at-least-once to this sink (if
+ * checkpointing is enabled). However, one common use case is to run idempotent queries
+ * (e.g., <code>REPLACE</code> or <code>INSERT OVERWRITE</code>) to upsert into the database and
+ * achieve exactly-once semantic.</p>
+ */
+public class ClickHouseTableSink implements AppendStreamTableSink<Row> {
+
+    private static final Logger log = LoggerFactory.getLogger(ClickHouseTableSink.class);
+    private static final Integer BATCH_SIZE_DEFAULT = 5000;
+    private static final Long COMMIT_PADDING_DEFAULT = 5000L;
+    private static final Integer RETRIES_DEFAULT = 3;
+    private static final Long RETRY_INTERVAL_DEFAULT = 3000L;
+    private static final Boolean IGNORE_INSERT_ERROR = false;
+    private String address;
+    private String username;
+    private String password;
+    private String database;
+    private String table;
+    private TableSchema schema;
+    private Integer batchSize;
+    private Long commitPadding;
+    private Integer retries;
+    private Long retryInterval;
+    private Boolean ignoreInsertError;
+
+
+    public ClickHouseTableSink(String address, String username, String password, String database, String table, TableSchema schema, Integer batchSize, Long commitPadding, Integer retries, Long retryInterval, Boolean ignoreInsertError) {
+        this.address = address;
+        this.username = username;
+        this.password = password;
+        this.database = database;
+        this.table = table;
+        this.schema = schema;
+        this.batchSize = batchSize;
+        this.commitPadding = commitPadding;
+        this.retries = retries;
+        this.retryInterval = retryInterval;
+        this.ignoreInsertError = ignoreInsertError;
+
+    }
+
+    /**
+     * @param dataStream
+     * @deprecated
+     */
+    @Override
+    public void emitDataStream(DataStream<Row> dataStream) {
+        consumeDataStream(dataStream);
+    }
+
+    /**
+     *
+     * @return ClickHouseAppendSinkFunction
+     */
+    private ClickHouseAppendSinkFunction initSink() {
+        String prepareStatement = createPrepareStatement(schema, database, table);
+        return new ClickHouseAppendSinkFunction(address, username, password, prepareStatement, batchSize, commitPadding, retries, retryInterval, ignoreInsertError);
+    }
+
+    @Override
+    public TableSink<Row> configure(String[] strings, TypeInformation<?>[] typeInformations) {
+
+        ClickHouseTableSink copy;
+        try {
+            copy = new ClickHouseTableSink(address, username, password, database, table, schema, batchSize, commitPadding, retries, retryInterval, ignoreInsertError);
+        } catch (Exception e) {
+            throw new RuntimeException(e);

Review comment:
       Add an error message.
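A minimal sketch of wrapping the failure with context while keeping the original exception as the cause. SinkCopyErrors and copyFailure are hypothetical names. The message deliberately names only the database and table, so the address, username and password do not end up in logs.

```java
final class SinkCopyErrors {

    /** Wraps a failure from configure() with context; the original exception is kept as the cause. */
    static RuntimeException copyFailure(String database, String table, Exception cause) {
        return new RuntimeException(
                "Failed to create a configured copy of ClickHouseTableSink for table '"
                        + database + "." + table + "'",
                cause);
    }
}
```

In configure() the catch block would then read throw SinkCopyErrors.copyFailure(database, table, e). The constructor shown above only assigns fields, so the try/catch might not be needed at all.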

##########
File path: flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseAppendSinkFunctionTest.java
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.connectors.clickhouse;
+
+import org.apache.flink.types.Row;
+import org.junit.Test;
+import ru.yandex.clickhouse.BalancedClickhouseDataSource;
+import ru.yandex.clickhouse.settings.ClickHouseProperties;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.util.Properties;
+
+public class ClickHouseAppendSinkFunctionTest {
+
+
+    private static final String USERNAME = "user";
+    private static final String PASSWORD = "password";
+    private Connection connection;
+    private BalancedClickhouseDataSource dataSource;
+    private  PreparedStatement pstat;
+
+    @Test
+    public void open() throws Exception {

Review comment:
       This test is empty.

##########
File path: flink-connector-clickhouse/src/test/java/com/apache/flink/streaming/connectors/clickhouse/ClickHouseTableSourceSinkFactoryTest.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.apache.flink.streaming.connectors.clickhouse;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.descriptors.DescriptorProperties;
+import org.apache.flink.table.descriptors.Schema;
+import org.apache.flink.table.sinks.StreamTableSink;
+import org.apache.flink.types.Row;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static com.apache.flink.table.descriptors.ClickHouseValidator.*;
+
+public class ClickHouseTableSourceSinkFactoryTest {
+
+
+    @Test
+    public void createStreamTableSink() throws Exception {
+

Review comment:
       Are you testing against a local ClickHouse instance?
   We should have a Testcontainers-based setup that starts a ClickHouse instance for the tests.







[GitHub] [bahir-flink] eskabetxe commented on a change in pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
eskabetxe commented on a change in pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#discussion_r597075569



##########
File path: flink-connector-clickhouse/pom.xml
##########
@@ -0,0 +1,111 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.bahir</groupId>
+        <artifactId>bahir-flink-parent_2.11</artifactId>
+        <version>1.1-SNAPSHOT</version>
+        <relativePath>../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>flink-connector-clickhouse_${scala.binary.version}</artifactId>
+    <name>flink-connector-clickhouse</name>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>ru.yandex.clickhouse</groupId>
+            <artifactId>clickhouse-jdbc</artifactId>
+            <version>0.1.50</version>

Review comment:
       This should be defined in <properties> so users can override this version.







[GitHub] [bahir-flink] pyscala commented on pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
pyscala commented on pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#issuecomment-635063992


   Add ClickHouse Connector for Flink #85
   Implement Streaming ClickHouseSink,support Flink Table API & Flink SQL
   for ClickHouse connector





[GitHub] [bahir-flink] YikSanChan commented on pull request #85: [BAHIR-234] Add ClickHouse Connector for Flink

Posted by GitBox <gi...@apache.org>.
YikSanChan commented on pull request #85:
URL: https://github.com/apache/bahir-flink/pull/85#issuecomment-801860310


   @eskabetxe Are there any remaining concerns about this PR? I'd love to see it merged so that I can use it at work :)

