You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/27 11:15:33 UTC

[incubator-inlong] branch master updated: [INLONG-2333] Support clickhouse sink in sort-single-tenant (#2355)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 0d7826c  [INLONG-2333] Support clickhouse sink in sort-single-tenant (#2355)
0d7826c is described below

commit 0d7826c8105703e3d48ddfa5a03ea78a0e73a48d
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Thu Jan 27 19:15:25 2022 +0800

    [INLONG-2333] Support clickhouse sink in sort-single-tenant (#2355)
    
    Co-authored-by: tianqiwan <ti...@tencent.com>
---
 .../flink/clickhouse/ClickHouseSinkFunction.java   |  2 +-
 inlong-sort/sort-single-tenant/pom.xml             |  6 +++
 .../inlong/sort/singletenant/flink/Entrance.java   | 12 ++++-
 .../clickhouse/ClickhouseRowSinkFunction.java      | 53 ++++++++++++++++++++++
 4 files changed, 70 insertions(+), 3 deletions(-)

diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/ClickHouseSinkFunction.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/ClickHouseSinkFunction.java
index fbedd7b..77996a1 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/ClickHouseSinkFunction.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/ClickHouseSinkFunction.java
@@ -80,7 +80,7 @@ public class ClickHouseSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row
     }
 
     @Override
-    public void invoke(Tuple2<Boolean, Row> value) throws Exception {
+    public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
         outputFormat.writeRecord(value);
     }
 }
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index e08031b..a94a2fd 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -48,6 +48,12 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.inlong</groupId>
+            <artifactId>sort-connectors</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-core</artifactId>
             <scope>provided</scope>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index d6faa16..c4d7364 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -24,7 +24,6 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
 import java.util.Map;
-
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,11 +33,13 @@ import org.apache.iceberg.flink.sink.FlinkSink;
 import org.apache.inlong.sort.configuration.Configuration;
 import org.apache.inlong.sort.configuration.Constants;
 import org.apache.inlong.sort.protocol.DataFlowInfo;
-import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
+import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
 import org.apache.inlong.sort.protocol.sink.IcebergSinkInfo;
+import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
 import org.apache.inlong.sort.protocol.sink.SinkInfo;
 import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
 import org.apache.inlong.sort.protocol.source.SourceInfo;
+import org.apache.inlong.sort.singletenant.flink.clickhouse.ClickhouseRowSinkFunction;
 import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
 import org.apache.inlong.sort.util.ParameterTool;
 
@@ -109,6 +110,13 @@ public class Entrance {
         // TODO : implement sink functions below
         switch (sinkType) {
             case Constants.SINK_TYPE_CLICKHOUSE:
+                Preconditions.checkState(sinkInfo instanceof ClickHouseSinkInfo);
+                ClickHouseSinkInfo clickHouseSinkInfo = (ClickHouseSinkInfo) sinkInfo;
+
+                sourceStream.addSink(new ClickhouseRowSinkFunction(clickHouseSinkInfo))
+                        .uid(Constants.SINK_UID)
+                        .name("Clickhouse Sink")
+                        .setParallelism(sinkParallelism);
                 break;
             case Constants.SINK_TYPE_HIVE:
                 break;
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java
new file mode 100644
index 0000000..da37cda
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.clickhouse;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.flink.clickhouse.ClickHouseSinkFunction;
+import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
+
+/**
+ * Sink for plain {@link Row} records that writes to ClickHouse by delegating to a wrapped
+ * {@link ClickHouseSinkFunction}, which consumes {@code Tuple2<Boolean, Row>} records.
+ *
+ * <p>Flink only drives the lifecycle of this outer function, so the runtime context and the
+ * {@code open}/{@code close} calls are forwarded to the wrapped function explicitly.
+ */
+public class ClickhouseRowSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
+
+    /** Wrapped sink that performs the actual write. */
+    private final ClickHouseSinkFunction clickHouseSinkFunction;
+
+    /**
+     * Creates a sink that writes through a new {@link ClickHouseSinkFunction}.
+     *
+     * @param clickHouseSinkInfo sink description handed to the wrapped function; must not be null
+     * @throws NullPointerException if {@code clickHouseSinkInfo} is null
+     */
+    public ClickhouseRowSinkFunction(ClickHouseSinkInfo clickHouseSinkInfo) {
+        this.clickHouseSinkFunction = new ClickHouseSinkFunction(Preconditions.checkNotNull(clickHouseSinkInfo));
+    }
+
+    /**
+     * Passes this function's runtime context to the wrapped function and opens it.
+     *
+     * @param parameters configuration supplied by Flink, forwarded unchanged
+     * @throws Exception if opening the wrapped function fails
+     */
+    @Override
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+        // The wrapped function is not registered with Flink itself, so it only receives a
+        // runtime context and an open() call when they are forwarded from here.
+        clickHouseSinkFunction.setRuntimeContext(getRuntimeContext());
+        clickHouseSinkFunction.open(parameters);
+    }
+
+    /**
+     * Closes the wrapped function, then this one.
+     *
+     * @throws Exception if closing the wrapped function fails
+     */
+    @Override
+    public void close() throws Exception {
+        try {
+            clickHouseSinkFunction.close();
+        } finally {
+            super.close();
+        }
+    }
+
+    /**
+     * Forwards the checkpoint snapshot request to the wrapped function.
+     *
+     * @param functionSnapshotContext snapshot context supplied by Flink
+     * @throws Exception if the wrapped function fails to snapshot
+     */
+    @Override
+    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+        clickHouseSinkFunction.snapshotState(functionSnapshotContext);
+    }
+
+    /**
+     * Forwards state initialization to the wrapped function.
+     *
+     * @param functionInitializationContext initialization context supplied by Flink
+     * @throws Exception if the wrapped function fails to initialize
+     */
+    @Override
+    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+        clickHouseSinkFunction.initializeState(functionInitializationContext);
+    }
+
+    /**
+     * Writes one row through the wrapped function.
+     *
+     * @param value row to write
+     * @param context sink context supplied by Flink, forwarded unchanged
+     * @throws Exception if the wrapped function fails to write the record
+     */
+    @Override
+    public void invoke(Row value, Context context) throws Exception {
+        // Sort doesn't support retraction currently, so just set the flag to false.
+        clickHouseSinkFunction.invoke(Tuple2.of(false, value), context);
+    }
+}