You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/01/27 11:15:33 UTC
[incubator-inlong] branch master updated: [INLONG-2333] Support clickhouse sink in sort-single-tenant (#2355)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 0d7826c [INLONG-2333] Support clickhouse sink in sort-single-tenant (#2355)
0d7826c is described below
commit 0d7826c8105703e3d48ddfa5a03ea78a0e73a48d
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Thu Jan 27 19:15:25 2022 +0800
[INLONG-2333] Support clickhouse sink in sort-single-tenant (#2355)
Co-authored-by: tianqiwan <ti...@tencent.com>
---
.../flink/clickhouse/ClickHouseSinkFunction.java | 2 +-
inlong-sort/sort-single-tenant/pom.xml | 6 +++
.../inlong/sort/singletenant/flink/Entrance.java | 12 ++++-
.../clickhouse/ClickhouseRowSinkFunction.java | 53 ++++++++++++++++++++++
4 files changed, 70 insertions(+), 3 deletions(-)
diff --git a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/ClickHouseSinkFunction.java b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/ClickHouseSinkFunction.java
index fbedd7b..77996a1 100644
--- a/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/ClickHouseSinkFunction.java
+++ b/inlong-sort/sort-connectors/src/main/java/org/apache/inlong/sort/flink/clickhouse/ClickHouseSinkFunction.java
@@ -80,7 +80,7 @@ public class ClickHouseSinkFunction extends RichSinkFunction<Tuple2<Boolean, Row
}
@Override
- public void invoke(Tuple2<Boolean, Row> value) throws Exception {
+ public void invoke(Tuple2<Boolean, Row> value, Context context) throws Exception {
outputFormat.writeRecord(value);
}
}
diff --git a/inlong-sort/sort-single-tenant/pom.xml b/inlong-sort/sort-single-tenant/pom.xml
index e08031b..a94a2fd 100644
--- a/inlong-sort/sort-single-tenant/pom.xml
+++ b/inlong-sort/sort-single-tenant/pom.xml
@@ -48,6 +48,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.inlong</groupId>
+ <artifactId>sort-connectors</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<scope>provided</scope>
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
index d6faa16..c4d7364 100644
--- a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java
@@ -24,7 +24,6 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.Map;
-
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -34,11 +33,13 @@ import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.inlong.sort.configuration.Configuration;
import org.apache.inlong.sort.configuration.Constants;
import org.apache.inlong.sort.protocol.DataFlowInfo;
-import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
+import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
import org.apache.inlong.sort.protocol.sink.IcebergSinkInfo;
+import org.apache.inlong.sort.protocol.sink.KafkaSinkInfo;
import org.apache.inlong.sort.protocol.sink.SinkInfo;
import org.apache.inlong.sort.protocol.source.PulsarSourceInfo;
import org.apache.inlong.sort.protocol.source.SourceInfo;
+import org.apache.inlong.sort.singletenant.flink.clickhouse.ClickhouseRowSinkFunction;
import org.apache.inlong.sort.singletenant.flink.utils.CommonUtils;
import org.apache.inlong.sort.util.ParameterTool;
@@ -109,6 +110,13 @@ public class Entrance {
// TODO : implement sink functions below
switch (sinkType) {
case Constants.SINK_TYPE_CLICKHOUSE:
+ Preconditions.checkState(sinkInfo instanceof ClickHouseSinkInfo);
+ ClickHouseSinkInfo clickHouseSinkInfo = (ClickHouseSinkInfo) sinkInfo;
+
+ sourceStream.addSink(new ClickhouseRowSinkFunction(clickHouseSinkInfo))
+ .uid(Constants.SINK_UID)
+ .name("Clickhouse Sink")
+ .setParallelism(sinkParallelism);
break;
case Constants.SINK_TYPE_HIVE:
break;
diff --git a/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java
new file mode 100644
index 0000000..da37cda
--- /dev/null
+++ b/inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/clickhouse/ClickhouseRowSinkFunction.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sort.singletenant.flink.clickhouse;
+
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.types.Row;
+import org.apache.inlong.sort.flink.clickhouse.ClickHouseSinkFunction;
+import org.apache.inlong.sort.protocol.sink.ClickHouseSinkInfo;
+
+public class ClickhouseRowSinkFunction extends RichSinkFunction<Row> implements CheckpointedFunction {
+
+ private final ClickHouseSinkFunction clickHouseSinkFunction;
+
+ public ClickhouseRowSinkFunction(ClickHouseSinkInfo clickHouseSinkInfo) {
+ this.clickHouseSinkFunction = new ClickHouseSinkFunction(Preconditions.checkNotNull(clickHouseSinkInfo));
+ }
+
+ @Override
+ public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
+ clickHouseSinkFunction.snapshotState(functionSnapshotContext);
+ }
+
+ @Override
+ public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
+ clickHouseSinkFunction.initializeState(functionInitializationContext);
+ }
+
+ @Override
+ public void invoke(Row value, Context context) throws Exception {
+ // Sort doesn't support retraction currently, so just set the flag to false.
+ clickHouseSinkFunction.invoke(Tuple2.of(false, value), context);
+ }
+}