Posted to commits@inlong.apache.org by do...@apache.org on 2022/03/16 07:44:35 UTC

[incubator-inlong-website] branch master updated: [INLONG-295] Add document for introducing how to extend a new type of sink in sort (#296)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new e454617  [INLONG-295] Add document for introducing how to extend a new type of sink in sort (#296)
e454617 is described below

commit e454617469cb6615fdc6d5434ea2d8adfbaef560
Author: TianqiWan <52...@users.noreply.github.com>
AuthorDate: Wed Mar 16 15:43:30 2022 +0800

    [INLONG-295] Add document for introducing how to extend a new type of sink in sort (#296)
    
    Co-authored-by: tianqiwan <ti...@tencent.com>
---
 docs/design_and_concept/extend_sink_in_sort.md     | 106 ++++++++++++++++++++
 .../design_and_concept/extend_sink_in_sort.md      | 107 +++++++++++++++++++++
 2 files changed, 213 insertions(+)

diff --git a/docs/design_and_concept/extend_sink_in_sort.md b/docs/design_and_concept/extend_sink_in_sort.md
new file mode 100644
index 0000000..8aec561
--- /dev/null
+++ b/docs/design_and_concept/extend_sink_in_sort.md
@@ -0,0 +1,106 @@
+---
+title: Sort Plugin
+---
+
+# Overview
+InLong-Sort is a real-time ETL system. It currently supports Hive, Kafka, ClickHouse and Iceberg sinks.
+This article introduces how to extend a new type of sink in InLong-Sort.
+
+# Extend a new sink function
+InLong-Sort is built on Flink, so every type of sink in InLong-Sort must be backed by a Flink sink. This can be either a newly developed Flink sink or one of the sinks that Flink already provides.
+For details, refer to [DataStream Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors).
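+
+The core of a new Flink sink is its lifecycle: set up a client once, handle every record, and release resources at the end. The following minimal sketch shows that lifecycle for a hypothetical "demo" system that receives rows in batches. So that it compiles without Flink on the classpath, it uses plain Java types: `Object[]` stands in for Flink's `Row`, and a `Consumer` stands in for the client of the external system. In a real job, the class would extend Flink's `RichSinkFunction<Row>` and place the same logic in `open`, `invoke` and `close`.
+
+```java
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+// Hypothetical buffering sink for a "demo" system, written without Flink types.
+class BufferingDemoSink {
+    private final int batchSize;                    // rows buffered before each write
+    private final Consumer<List<Object[]>> writer;  // stand-in for the external client
+    private List<Object[]> buffer;
+
+    BufferingDemoSink(int batchSize, Consumer<List<Object[]>> writer) {
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException("batchSize must be positive");
+        }
+        this.batchSize = batchSize;
+        this.writer = writer;
+    }
+
+    // Corresponds to open(): called once per parallel instance before any record arrives.
+    void open() {
+        buffer = new ArrayList<>(batchSize);
+    }
+
+    // Corresponds to invoke(): called once per record.
+    void invoke(Object[] row) {
+        buffer.add(row);
+        if (buffer.size() >= batchSize) {
+            flush();
+        }
+    }
+
+    // Corresponds to close(): write whatever is still buffered.
+    void close() {
+        flush();
+    }
+
+    private void flush() {
+        if (!buffer.isEmpty()) {
+            writer.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
+            buffer.clear();
+        }
+    }
+}
+```
+
+Writing only in `close` can lose buffered rows if the job fails. A production sink would therefore usually also flush when Flink takes a checkpoint, for example by implementing Flink's `CheckpointedFunction`.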
+
+# Extend a new sink protocol
+First, implement a new sink protocol class that extends
+`inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/SinkInfo.java`
+
+Place every attribute that the Flink sink from the previous step needs into the protocol.
+
+Example
+```java
+public class DemoSinkInfo extends SinkInfo {
+    
+    // Place necessary attributes here
+
+    @JsonCreator
+    public DemoSinkInfo(@JsonProperty("fields") FieldInfo[] fields) {
+        super(fields);
+    }
+}
+```
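+
+The base class `SinkInfo` implements `java.io.Serializable`. Flink Java-serializes user functions such as sink functions when it ships a job to the cluster. If a sink function keeps its protocol object as a field, every attribute added to the protocol must therefore be serializable as well. The JDK-only sketch below checks this with a serialization round trip. `DemoSinkAttributes`, its `endpoint` and `batchSize` fields, `BadDemoAttributes` and `SerializationCheck` are all hypothetical, and the sketch omits the Jackson annotations that a real `DemoSinkInfo` would carry.
+
+```java
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Objects;
+
+// Hypothetical attributes a demo sink might need, kept immutable and validated up front.
+final class DemoSinkAttributes implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final String endpoint;  // address of the external system
+    private final int batchSize;    // rows per write
+
+    DemoSinkAttributes(String endpoint, int batchSize) {
+        this.endpoint = Objects.requireNonNull(endpoint, "endpoint");
+        if (batchSize <= 0) {
+            throw new IllegalArgumentException("batchSize must be positive");
+        }
+        this.batchSize = batchSize;
+    }
+
+    String getEndpoint() { return endpoint; }
+
+    int getBatchSize() { return batchSize; }
+}
+
+// Hypothetical counter-example: the class is Serializable but one of its fields is not.
+final class BadDemoAttributes implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private final Object client = new Object(); // java.lang.Object is not Serializable
+}
+
+final class SerializationCheck {
+    // Writes an object with Java serialization and reads it back.
+    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
+        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
+            out.writeObject(value);
+        }
+        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
+            @SuppressWarnings("unchecked")
+            T copy = (T) in.readObject();
+            return copy;
+        }
+    }
+}
+```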
+
+Second, register the new sink protocol as a subtype of `SinkInfo` and give it a unique name.
+
+Example: register the new protocol `DemoSinkInfo` under the subtype name `Constants.SINK_TYPE_DEMO`
+```java
+/**
+ * The base class of the data sink in the metadata.
+ */
+@JsonTypeInfo(
+        use = JsonTypeInfo.Id.NAME,
+        include = JsonTypeInfo.As.PROPERTY,
+        property = "type")
+@JsonSubTypes({
+        @Type(value = ClickHouseSinkInfo.class, name = Constants.SINK_TYPE_CLICKHOUSE),
+        @Type(value = HiveSinkInfo.class, name = Constants.SINK_TYPE_HIVE),
+        @Type(value = KafkaSinkInfo.class, name = Constants.SINK_TYPE_KAFKA),
+        @Type(value = IcebergSinkInfo.class, name = Constants.SINK_TYPE_ICEBERG),
+        
+        // The new sink protocol
+        @Type(value = DemoSinkInfo.class, name = Constants.SINK_TYPE_DEMO)}
+)
+public abstract class SinkInfo implements Serializable {
+
+    private static final long serialVersionUID = 1485856855405721745L;
+
+    @JsonProperty("fields")
+    private final FieldInfo[] fields;
+
+    public SinkInfo(@JsonProperty("fields") FieldInfo[] fields) {
+        this.fields = checkNotNull(fields);
+    }
+
+    @JsonProperty("fields")
+    public FieldInfo[] getFields() {
+        return fields;
+    }
+}
+```
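+
+With `use = JsonTypeInfo.Id.NAME`, `include = JsonTypeInfo.As.PROPERTY` and `property = "type"`, Jackson expects a `type` property in the JSON. It looks up the class registered under that name in `@JsonSubTypes` and deserializes the object as that class, which is why each subtype needs its own name. The plain-Java model below imitates that lookup with a map from name to factory. It is not Jackson: the model classes and the `"kafka"` and `"demo"` names are hypothetical stand-ins, while the real names come from `Constants`.
+
+```java
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+
+// Hypothetical stand-ins for SinkInfo and two of its subtypes.
+abstract class SinkInfoModel { }
+
+final class KafkaSinkInfoModel extends SinkInfoModel { }
+
+final class DemoSinkInfoModel extends SinkInfoModel { }
+
+// Imitates the name-to-class table that @JsonSubTypes declares.
+final class SubtypeRegistry {
+    private final Map<String, Supplier<? extends SinkInfoModel>> byName = new HashMap<>();
+
+    void register(String name, Supplier<? extends SinkInfoModel> factory) {
+        // A second class under the same name would make the lookup ambiguous.
+        if (byName.putIfAbsent(name, factory) != null) {
+            throw new IllegalStateException("Duplicate sink type name: " + name);
+        }
+    }
+
+    // Plays the role of reading {"type": "<name>", ...} and choosing the class.
+    SinkInfoModel create(String typeName) {
+        Supplier<? extends SinkInfoModel> factory = byName.get(typeName);
+        if (factory == null) {
+            throw new IllegalArgumentException("Unknown sink type: " + typeName);
+        }
+        return factory.get();
+    }
+}
+```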
+
+# Bundle the new sink into InLong-Sort
+After extending the Flink sink and the sink protocol, bundle the new sink into the Flink job that serves as the entry point of InLong-Sort.
+
+The entry point of the Flink job is `inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java`.
+
+Example: add a branch for `Constants.SINK_TYPE_DEMO` in `buildSinkStream`
+
+```java
+private static void buildSinkStream(
+        DataStream<Row> sourceStream,
+        Configuration config,
+        SinkInfo sinkInfo,
+        Map<String, Object> properties,
+        long dataflowId) throws IOException, ClassNotFoundException {
+    final String sinkType = checkNotNull(config.getString(Constants.SINK_TYPE));
+
+    switch (sinkType) {
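+        case Constants.SINK_TYPE_CLICKHOUSE:
+            // Existing ClickHouse sink wiring, omitted here
+            break;
+        case Constants.SINK_TYPE_HIVE:
+            // Existing Hive sink wiring, omitted here
+            break;
+        case Constants.SINK_TYPE_ICEBERG:
+            // Existing Iceberg sink wiring, omitted here
+            break;
+        case Constants.SINK_TYPE_KAFKA:
+            // Existing Kafka sink wiring, omitted here
+            break;
+        case Constants.SINK_TYPE_DEMO:
+            // Add the extended sink function here, for example
+            // (DemoSinkFunction is a hypothetical class):
+            // sourceStream.addSink(new DemoSinkFunction((DemoSinkInfo) sinkInfo)).name("Demo Sink");
+            break;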
+        default:
+            throw new IllegalArgumentException("Unsupported sink type " + sinkType);
+    }
+
+}
+
+```
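+
+The `sinkType` read from the configuration and the deserialized `sinkInfo` are supplied separately, so they can disagree. If they do, casting `sinkInfo` to the wrong subclass fails later with a `ClassCastException`. The sketch below shows one way to make the demo branch check this and fail fast with a clear message. It uses no InLong or Flink classes. The protocol stand-ins, the `"demo"` value and the returned description string are hypothetical, and the real branch would attach the extended sink function to `sourceStream` instead of returning a string.
+
+```java
+// Hypothetical stand-ins for the protocol classes.
+abstract class SinkInfoStub { }
+
+final class DemoSinkInfoStub extends SinkInfoStub {
+    final String endpoint;
+
+    DemoSinkInfoStub(String endpoint) {
+        this.endpoint = endpoint;
+    }
+}
+
+final class OtherSinkInfoStub extends SinkInfoStub { }
+
+final class SinkDispatchSketch {
+    static final String SINK_TYPE_DEMO = "demo"; // hypothetical constant value
+
+    // Mirrors the shape of buildSinkStream: choose a branch by sink type, then
+    // check that the protocol object really is the matching subclass.
+    static String buildSink(String sinkType, SinkInfoStub sinkInfo) {
+        switch (sinkType) {
+            case SINK_TYPE_DEMO:
+                if (!(sinkInfo instanceof DemoSinkInfoStub)) {
+                    throw new IllegalArgumentException("Sink type " + sinkType
+                            + " does not match protocol " + sinkInfo.getClass().getSimpleName());
+                }
+                DemoSinkInfoStub demo = (DemoSinkInfoStub) sinkInfo;
+                // The real code would create the demo sink function from these attributes.
+                return "demo sink -> " + demo.endpoint;
+            default:
+                throw new IllegalArgumentException("Unsupported sink type " + sinkType);
+        }
+    }
+}
+```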
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/extend_sink_in_sort.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/extend_sink_in_sort.md
new file mode 100644
index 0000000..30c9f68
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/extend_sink_in_sort.md
@@ -0,0 +1,107 @@
+---
+title: Sort Plugin
+---
+
+# Overview
+InLong-Sort is an ETL system. The sink types it currently supports are Hive, Kafka, ClickHouse and Iceberg.
+
+This article describes how to extend a new sink type in InLong-Sort.
+
+# Extend a Flink sink
+InLong-Sort is an ETL system built on the Flink computing engine. Before adding a new sink to InLong-Sort, you first need to extend a new Flink connector.
+
+For how to extend a Flink connector, refer to the official Flink documentation [DataStream Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors).
+
+# Extend the sink protocol
+After extending the Flink sink, extend the corresponding sink protocol in InLong-Sort. The protocol describes the information the sink requires.
+
+To extend the protocol, first implement a concrete class that extends the parent class
+`inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/SinkInfo.java`
+
+
+Example
+```java
+public class DemoSinkInfo extends SinkInfo {
+    
+    // Place necessary attributes here
+
+    @JsonCreator
+    public DemoSinkInfo(@JsonProperty("fields") FieldInfo[] fields) {
+        super(fields);
+    }
+}
+```
+
+After extending the protocol, mark it as a subtype of SinkInfo, identified by a globally unique name.
+
+Example: extend a protocol named `Constants.SINK_TYPE_DEMO` whose protocol class is DemoSinkInfo
+```java
+/**
+ * The base class of the data sink in the metadata.
+ */
+@JsonTypeInfo(
+        use = JsonTypeInfo.Id.NAME,
+        include = JsonTypeInfo.As.PROPERTY,
+        property = "type")
+@JsonSubTypes({
+        @Type(value = ClickHouseSinkInfo.class, name = Constants.SINK_TYPE_CLICKHOUSE),
+        @Type(value = HiveSinkInfo.class, name = Constants.SINK_TYPE_HIVE),
+        @Type(value = KafkaSinkInfo.class, name = Constants.SINK_TYPE_KAFKA),
+        @Type(value = IcebergSinkInfo.class, name = Constants.SINK_TYPE_ICEBERG),
+        
+        // The new sink protocol
+        @Type(value = DemoSinkInfo.class, name = Constants.SINK_TYPE_DEMO)}
+)
+public abstract class SinkInfo implements Serializable {
+
+    private static final long serialVersionUID = 1485856855405721745L;
+
+    @JsonProperty("fields")
+    private final FieldInfo[] fields;
+
+    public SinkInfo(@JsonProperty("fields") FieldInfo[] fields) {
+        this.fields = checkNotNull(fields);
+    }
+
+    @JsonProperty("fields")
+    public FieldInfo[] getFields() {
+        return fields;
+    }
+}
+```
+
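+# Integrate the new sink into the main flow of InLong-Sort
+After extending the Flink sink and implementing the corresponding sink protocol, we can integrate the sink into the main flow of InLong-Sort.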
+
+InLong-Sort is essentially a Flink job, and its entry point is `inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java`
+
+Example: extend a sink named `Constants.SINK_TYPE_DEMO`
+
+```java
+private static void buildSinkStream(
+        DataStream<Row> sourceStream,
+        Configuration config,
+        SinkInfo sinkInfo,
+        Map<String, Object> properties,
+        long dataflowId) throws IOException, ClassNotFoundException {
+    final String sinkType = checkNotNull(config.getString(Constants.SINK_TYPE));
+
+    switch (sinkType) {
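+        case Constants.SINK_TYPE_CLICKHOUSE:
+            // Existing ClickHouse sink wiring, omitted here
+            break;
+        case Constants.SINK_TYPE_HIVE:
+            // Existing Hive sink wiring, omitted here
+            break;
+        case Constants.SINK_TYPE_ICEBERG:
+            // Existing Iceberg sink wiring, omitted here
+            break;
+        case Constants.SINK_TYPE_KAFKA:
+            // Existing Kafka sink wiring, omitted here
+            break;
+        case Constants.SINK_TYPE_DEMO:
+            // Add the extended sink function here, for example
+            // (DemoSinkFunction is a hypothetical class):
+            // sourceStream.addSink(new DemoSinkFunction((DemoSinkInfo) sinkInfo)).name("Demo Sink");
+            break;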
+        default:
+            throw new IllegalArgumentException("Unsupported sink type " + sinkType);
+    }
+
+}
+
+```
\ No newline at end of file