Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/16 04:49:59 UTC

[incubator-inlong-website] branch master updated: [INLONG-403][Sort] Add doc about how to extend Extract or Load node (#404)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 8dce291a4 [INLONG-403][Sort] Add doc about how to extend Extract or Load node (#404)
8dce291a4 is described below

commit 8dce291a4777d37460fd3e99393ec05ec4cd837d
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Thu Jun 16 12:49:55 2022 +0800

    [INLONG-403][Sort] Add doc about how to extend Extract or Load node (#404)
    
    Co-authored-by: onealliu <on...@tencent.com>
    Co-authored-by: healzhou <he...@gmail.com>
---
 .../how_to_extend_extract_or_load_node_en.md       | 203 ++++++++++++++++++++
 .../design_and_concept/how_to_write_plugin_sort.md | 107 -----------
 docs/design_and_concept/img/sort_uml.png           | Bin 0 -> 162399 bytes
 .../how_to_extend_extract_or_load_node_ch.md       | 208 +++++++++++++++++++++
 .../design_and_concept/how_to_write_plugin_sort.md | 108 -----------
 .../current/design_and_concept/img/sort_uml.png    | Bin 0 -> 162399 bytes
 6 files changed, 411 insertions(+), 215 deletions(-)

diff --git a/docs/design_and_concept/how_to_extend_extract_or_load_node_en.md b/docs/design_and_concept/how_to_extend_extract_or_load_node_en.md
new file mode 100644
index 000000000..dfed47276
--- /dev/null
+++ b/docs/design_and_concept/how_to_extend_extract_or_load_node_en.md
@@ -0,0 +1,203 @@
+---
+title: Sort Plugin
+sidebar_position: 3
+---
+
+## Overview
+
+InLong-Sort is a real-time ETL system. The currently supported extract and load nodes include FileSystemExtractNode, KafkaExtractNode, MongoExtractNode, MySqlExtractNode, OracleExtractNode, PostgresExtractNode, PulsarExtractNode, SqlServerExtractNode, ClickHouseLoadNode, ElasticsearchLoadNode, FileSystemLoadNode, GreenplumLoadNode, HbaseLoadNode, HiveLoadNode, IcebergLoadNode, KafkaLoadNode, MySqlLoadNode, OracleLoadNode, PostgresLoadNode, SqlServerLoadNode, TDSQLPostgresLoadNode, etc. [...]
+
+This article describes how to extend a new source (abstracted as an extract node in InLong) or a new sink (abstracted as a load node in InLong) in InLong-Sort. Once you understand the InLong-Sort architecture, it becomes clear how a source corresponds to an extract node and how a sink corresponds to a load node. The architecture of InLong-Sort is shown in the following UML object relationship diagram:
+
+![sort_UML](img/sort_uml.png)
+
+The concepts of each component are:
+
+| **Name**              | **Description**                                              |
+| --------------------- | ------------------------------------------------------------ |
+| **Group**             | data flow group, including multiple data flows, one group represents one data access |
+| **Stream**            | data flow, a data flow has a specific flow direction         |
+| **GroupInfo**         | encapsulation of data flow in Sort; a GroupInfo can contain multiple StreamInfo |
+| **StreamInfo**        | abstract of data flow in sort, including various sources, transformations, destinations, etc. |
+| **Node**              | abstraction of data source, data transformation and data destination in data synchronization |
+| **ExtractNode**       | source-side abstraction for data synchronization             |
+| **TransformNode**     | transformation process abstraction of data synchronization   |
+| **LoadNode**          | destination abstraction for data synchronization             |
+| **NodeRelationShip**  | abstraction of each node relationship in data synchronization |
+| **FieldRelationShip** | abstraction of the relationship between upstream and downstream node fields in data synchronization |
+| **FieldInfo**         | node field                                                   |
+| **MetaFieldInfo**     | node meta fields                                             |
+| **Function**          | abstraction of transformation function                       |
+| **FunctionParam**     | input parameter abstraction of function                      |
+| **ConstantParam**     | constant parameters                                          |
+
+To extend the extract node or load node, you need to do the following:
+
+- Extend the Node class (for example, as MyExtractNode) and implement the specific extract or load logic;
+- In the concrete node class (such as MyExtractNode), specify the corresponding Flink connector;
+- Use the concrete node class (such as MyExtractNode) in the specific ETL implementation logic.
+
+In the second step, you can use an existing Flink connector or extend one yourself. For how to extend a Flink connector, refer to the official Flink documentation [DataStream Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors).
+
+## Extend a new extract node
+
+There are three steps to extend an ExtractNode: 
+
+**Step 1**: Inherit the ExtractNode class, which is located at `incubator-inlong/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java`, and specify the connector in your ExtractNode implementation.
+
+```java
+// Inherit ExtractNode class and implement specific classes, such as MongoExtractNode
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("mongoExtract")
+@Data
+public class MongoExtractNode extends ExtractNode implements Serializable {
+    @JsonInclude(Include.NON_NULL)
+    @JsonProperty("primaryKey")
+    private String primaryKey;
+    ...
+
+    @JsonCreator
+    public MongoExtractNode(@JsonProperty("id") String id, ...) { ... }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+        // configure the specified connector, here is mongodb-cdc
+        options.put("connector", "mongodb-cdc");
+        ...
+        return options;
+    }
+}
+```
+
+**Step 2**: Add the new extract node to the JsonSubTypes of both ExtractNode and Node.
+
+```java
+// register the new subtype in the JsonSubTypes of both ExtractNode and Node
+...
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = MongoExtractNode.class, name = "mongoExtract")
+})
+...
+public abstract class ExtractNode implements Node{...}
+
+...
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = MongoExtractNode.class, name = "mongoExtract")
+})
+public interface Node {...}
+```
+
+**Step 3**: Extend the Sort connector. Check whether the corresponding connector already exists in the `/incubator-inlong/inlong-sort/sort-connectors/mongodb-cdc` directory. If it does not, refer to the official Flink documentation [DataStream Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors) to extend it, directly call the existing flink-connector (such as `incubator-inlong/inlong-sort/s [...]
+
+## Extend a new load node
+
+There are three steps to extend a LoadNode:
+
+**Step 1**: Inherit the LoadNode class, which is located at `incubator-inlong/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java`, and specify the connector in your LoadNode implementation.
+
+```java
+// Inherit LoadNode class and implement specific classes, such as KafkaLoadNode
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("kafkaLoad")
+@Data
+@NoArgsConstructor
+public class KafkaLoadNode extends LoadNode implements Serializable {
+    @Nonnull
+    @JsonProperty("topic")
+    private String topic;
+    ...
+
+    @JsonCreator
+    public KafkaLoadNode(@Nonnull @JsonProperty("topic") String topic, ...) {...}
+
+    // configure and use different connectors according to different conditions
+    @Override
+    public Map<String, String> tableOptions() {
+        ...
+        if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {
+            if (StringUtils.isEmpty(this.primaryKey)) {
+                // kafka connector
+                options.put("connector", "kafka");
+                options.putAll(format.generateOptions(false));
+            } else {
+                // upsert-kafka connector
+                options.put("connector", "upsert-kafka");
+                options.putAll(format.generateOptions(true));
+            }
+        } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
+            // kafka-inlong connector
+            options.put("connector", "kafka-inlong");
+            options.putAll(format.generateOptions(false));
+        } else {
+            throw new IllegalArgumentException("kafka load Node format is IllegalArgument");
+        }
+        return options;
+    }
+}
+```
+
+**Step 2**: Add the new load node to the JsonSubTypes of both LoadNode and Node.
+
+```java
+// register the new subtype in the JsonSubTypes of both LoadNode and Node
+...
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad")
+})
+...
+public abstract class LoadNode implements Node{...}
+
+...
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad")
+})
+public interface Node {...}
+```
+
+**Step 3**: Extend the Sort connector. Kafka's Sort connector is located in `incubator-inlong/inlong-sort/sort-connectors/kafka`.
+
+## Bundle extract node and load node into InLong-Sort
+
+To integrate extract and load nodes into the InLong-Sort main flow, you need to build the semantics mentioned in the overview section: group, stream, node, etc. The entry class of InLong-Sort is `inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java`. For how to integrate extract and load nodes into InLong-Sort, refer to the following unit test (UT). First, build the corresponding ExtractNode and LoadNode, then build the NodeRelation, StreamInfo and GroupInfo, and finally use FlinkSqlParse [...]
+
+```java
+public class MongoExtractToKafkaLoad extends AbstractTestBase {
+
+    // create MongoExtractNode
+    private MongoExtractNode buildMongoNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("name", new StringFormatInfo()), ...);
+        return new MongoExtractNode(..., fields, ...);
+    }
+
+    // create KafkaLoadNode
+    private KafkaLoadNode buildAllMigrateKafkaNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("name", new StringFormatInfo()), ...);
+        List<FieldRelation> relations = Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()), ...), ...);
+        CsvFormat csvFormat = new CsvFormat();
+        return new KafkaLoadNode(..., fields, relations, csvFormat, ...);
+    }
+
+    // create NodeRelation
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    // test the main flow: mongodb to kafka
+    @Test
+    public void testMongoDbToKafka() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings. ... .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        ...
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMongoNode();
+        Node outputNode = buildAllMigrateKafkaNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode), ...);
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+}
+```
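
Editor's sketch for the English document added above: Step 1 of both walkthroughs rests on overriding `tableOptions()` so that the node names its connector. The following standalone Java sketch illustrates that pattern using only the JDK. Everything in it is an assumption made for illustration: `SketchExtractNode`, `MyExtractNode`, the `my-connector` value, the `hostname` option, and the `toWithClause` helper are hypothetical stand-ins, not InLong classes, and rendering the options as a `WITH (...)` clause reflects how Flink SQL table options are normally written rather than the actual output of `FlinkSqlParser`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical stand-ins for InLong's node hierarchy; these are NOT the real classes.
class TableOptionsSketch {

    // Simplified base class; the real ExtractNode carries fields, properties, and more.
    abstract static class SketchExtractNode {
        private final String id;

        SketchExtractNode(String id) {
            this.id = id;
        }

        String getId() {
            return id;
        }

        // Subclasses call super.tableOptions() and add connector-specific entries.
        Map<String, String> tableOptions() {
            return new LinkedHashMap<>();
        }
    }

    // Hypothetical node; "my-connector" and "hostname" are illustrative only.
    static class MyExtractNode extends SketchExtractNode {
        private final String hostname;

        MyExtractNode(String id, String hostname) {
            super(id);
            this.hostname = hostname;
        }

        @Override
        Map<String, String> tableOptions() {
            Map<String, String> options = super.tableOptions();
            options.put("connector", "my-connector");
            options.put("hostname", hostname);
            return options;
        }
    }

    // Renders options in the shape of a Flink SQL WITH clause (no quote escaping here).
    static String toWithClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(", ", "WITH (", ")");
        for (Map.Entry<String, String> entry : options.entrySet()) {
            joiner.add("'" + entry.getKey() + "' = '" + entry.getValue() + "'");
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        SketchExtractNode node = new MyExtractNode("1", "localhost");
        // prints: WITH ('connector' = 'my-connector', 'hostname' = 'localhost')
        System.out.println(toWithClause(node.tableOptions()));
    }
}
```

Keeping the connector name inside the node class means the ETL logic that consumes the node only needs the option map, which is why the document asks for nothing more than a `tableOptions()` override plus the subtype registration.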
diff --git a/docs/design_and_concept/how_to_write_plugin_sort.md b/docs/design_and_concept/how_to_write_plugin_sort.md
deleted file mode 100644
index ca589372a..000000000
--- a/docs/design_and_concept/how_to_write_plugin_sort.md
+++ /dev/null
@@ -1,107 +0,0 @@
----
-title: Sort Plugin
-sidebar_position: 3
----
-
-# Overview
-InLong-Sort is known as a real-time ETL system. Currently, supported sinks are hive, kafka, clickhouse and iceberg.
-This article introduces how to extend a new type of sink in InLong-Sort.
-
-# Extend a new sink function
-InLong-Sort is based on flink, when extending a new type of sink in InLong-Sort, either a new type of flink sink
-or a predefined sink in flink is required.
-refer to [DataStream Connectors ](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors).
-
-# Extend a new sink protocol
-Firstly, implement a new sink protocol which extends
-`inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/SinkInfo.java`
-
-All necessary attributes used by the corresponding flink sink which extended before should be placed in the protocol
-
-Example
-```java
-public class DemoSinkInfo extends SinkInfo {
-    
-    // Place necessary attributes here
-
-    @JsonCreator
-    public DemoSinkInfo(FieldInfo[] fields) {
-        super(fields);
-    }
-}
-```
-
-Secondly, mark the new sink protocol as subtype of SinkInfo and give it a name
-
-Example : A new sink protocol DemoSinkInfo whose subtype name is `Constants.SINK_TYPE_DEMO`
-```java
-/**
- * The base class of the data sink in the metadata.
- */
-@JsonTypeInfo(
-        use = JsonTypeInfo.Id.NAME,
-        include = JsonTypeInfo.As.PROPERTY,
-        property = "type")
-@JsonSubTypes({
-        @Type(value = ClickHouseSinkInfo.class, name = Constants.SINK_TYPE_CLICKHOUSE),
-        @Type(value = HiveSinkInfo.class, name = Constants.SINK_TYPE_HIVE),
-        @Type(value = KafkaSinkInfo.class, name = Constants.SINK_TYPE_KAFKA),
-        @Type(value = IcebergSinkInfo.class, name = Constants.SINK_TYPE_ICEBERG),
-        
-        // The new sink protocol
-        @Type(value = DemoSinkInfo.class, name = Constants.SINK_TYPE_DEMO)}
-)
-public abstract class SinkInfo implements Serializable {
-
-    private static final long serialVersionUID = 1485856855405721745L;
-
-    @JsonProperty("fields")
-    private final FieldInfo[] fields;
-
-    public SinkInfo(@JsonProperty("fields") FieldInfo[] fields) {
-        this.fields = checkNotNull(fields);
-    }
-
-    @JsonProperty("fields")
-    public FieldInfo[] getFields() {
-        return fields;
-    }
-}
-```
-
-# Bundle the new sink, make it an alternative sink in Inlong_sort
-After extending a new flink sink and a new sink protocol in InLong-Sort, we can bundle the new sink into the flink job
-(the entrance of InLong-Sort)
-
-The entrance of the flink job is `inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java`
-
-Example
-
-``` java
-private static void buildSinkStream(
-        DataStream<Row> sourceStream,
-        Configuration config,
-        SinkInfo sinkInfo,
-        Map<String, Object> properties,
-        long dataflowId) throws IOException, ClassNotFoundException {
-    final String sinkType = checkNotNull(config.getString(Constants.SINK_TYPE));
-
-    switch (sinkType) {
-        case Constants.SINK_TYPE_CLICKHOUSE:
-            break;
-        case Constants.SINK_TYPE_HIVE:
-            break;
-        case Constants.SINK_TYPE_ICEBERG:
-            break;
-        case Constants.SINK_TYPE_KAFKA:
-            break;
-        case Constants.SINK_TYPE_DEMO:
-            // Add the extended sink function here
-            break;
-        default:
-            throw new IllegalArgumentException("Unsupported sink type " + sinkType);
-    }
-
-}
-
-```
\ No newline at end of file
diff --git a/docs/design_and_concept/img/sort_uml.png b/docs/design_and_concept/img/sort_uml.png
new file mode 100644
index 000000000..35c964cac
Binary files /dev/null and b/docs/design_and_concept/img/sort_uml.png differ
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_extend_extract_or_load_node_ch.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_extend_extract_or_load_node_ch.md
new file mode 100644
index 000000000..b43494ec5
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_extend_extract_or_load_node_ch.md
@@ -0,0 +1,208 @@
+---
+title: Sort 插件
+sidebar_position: 3
+---
+
+## 总览
+
+InLong-Sort 是一个 ETL 系统,当前支持的 extract 或 load 包括 FileSystemExtractNode,KafkaExtractNode,MongoExtractNode,MySqlExtractNode,OracleExtractNode,PostgresExtractNode,PulsarExtractNode,SqlServerExtractNode,ClickHouseLoadNode ,ElasticsearchLoadNode,FileSystemLoadNode,GreenplumLoadNode,HbaseLoadNode,HiveLoadNode,IcebergLoadNode,KafkaLoadNode,MySqlLoadNode,OracleLoadNode,PostgresLoadNode,SqlServerLoadNode,TDSQLPostgresLoadNode 等。InLong-Sort是基于Flink SQL的ETL方案,Flink SQL强大的表达能力带来的高可扩展性、灵活性,基本上 Flink SQ [...]
+
+本文介绍如何在 InLong-Sort 中扩展一个新的 source(在 InLong 中抽象为 Extract Node)或一个新的 sink(在InLong中抽象为 Load Node )。在弄清楚 InLong 的架构之后,就可以明白 Source 与 Extract Node 如何对应,Sink 与 Load Node 如何对应。InLong-Sort 架构的 UML 对象关系图如下:
+
+![sort_uml](img/sort_uml.png)
+
+其中各个组件的概念为:
+
+| **名称**          | **描述**                                                   |
+| ----------------- | --------------------------------------------------------- |
+| Group             | 数据流组,包含多个数据流,一个 Group 代表一个数据接入             |
+| Stream            | 数据流,一个数据流有具体的流向                                 |
+| GroupInfo         | Sort 中对数据流向的封装,一个 GroupInfo 可包含多个 StreamInfo |
+| StreamInfo        | Sort 中数据流向的抽象,包含该数据流的各种来源、转换、去向等        |
+| Node              | 数据同步中数据源、数据转换、数据去向的抽象                       |
+| ExtractNode       | 数据同步的来源端抽象                                         |
+| TransformNode     | 数据同步的转换过程抽象                                       |
+| LoadNode          | 数据同步的去向端抽象                                         |
+| NodeRelationShip  | 数据同步中各个节点关系抽象                                    |
+| FieldRelationShip | 数据同步中上下游节点字段间关系的抽象                            |
+| FieldInfo         | 节点字段                                                   |
+| MetaFieldInfo     | 节点 Meta 字段                                             |
+| Function          | 转换函数的抽象                                              |
+| FunctionParam     | 函数的入参抽象                                              |
+| ConstantParam     | 常量参数                                                   |
+
+扩展 Extract Node 或 Load Node 需要做的工作是:
+
+- 继承 Node 类(例如 MyExtractNode),构建具体的 extract 或 load 使用逻辑;
+- 在具体的 Node 类(例如 MyExtractNode)中,指定对应 Flink connector;
+- 在具体的 ETL 实现逻辑中使用具体的 Node 类(例如 MyExtractNode)。
+
+其中第二步中可以使用已有的 Flink Connector,或者用户自己扩展,如何扩展 Flink Connector 请参考 Flink 官方文档[DataStream Connectors ](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors).
+
+## 扩展 Extract Node
+
+扩展一个 ExtractNode 分为三个步骤:
+
+**第一步**:继承 ExtractNode 类,类的位置在 `incubator-inlong/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java`;在实现的 ExtractNode 中指定 connector;
+
+```java
+// 继承 ExtractNode 类,实现具体的类,例如 MongoExtractNode
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("mongoExtract")
+@Data
+public class MongoExtractNode extends ExtractNode implements Serializable {
+    @JsonInclude(Include.NON_NULL)
+    @JsonProperty("primaryKey")
+    private String primaryKey;
+    ...
+
+    @JsonCreator
+    public MongoExtractNode(@JsonProperty("id") String id, ...) { ... }
+
+    @Override
+    public Map<String, String> tableOptions() {
+        Map<String, String> options = super.tableOptions();
+        // 配置指定的 connector,这里指定的是 mongodb-cdc
+        options.put("connector", "mongodb-cdc");
+        ...
+        return options;
+    }
+}
+```
+
+**第二步**:在 ExtractNode 和 Node 中的 JsonSubTypes 添加该 Extract
+
+```java
+// 在 ExtractNode 和 Node 的 JsonSubTypes 中添加字段
+...
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = MongoExtractNode.class, name = "mongoExtract")
+})
+...
+public abstract class ExtractNode implements Node{...}
+
+...
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = MongoExtractNode.class, name = "mongoExtract")
+})
+public interface Node {...}
+```
+
+**第三步**:扩展 Sort Connector,查看此(`/incubator-inlong/inlong-sort/sort-connectors/mongodb-cdc`)目录下是否已经存在对应的 connector。如果没有,则需要参考 Flink 官方文档 [DataStream Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors) 来扩展,调用已有的 Flink-connector(例如 `incubator-inlong/inlong-sort/sort-connectors/mongodb-cdc`)或自行实现相关的 connector 均可。
+
+## 扩展 Load Node
+
+扩展一个 LoadNode 分为三个步骤:
+
+**第一步**:继承 LoadNode 类,类的位置在 `incubator-inlong/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java`;在实现的 LoadNode 中指定 connector;
+
+```java
+// 继承 LoadNode 类,实现具体的类,例如 KafkaLoadNode
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeName("kafkaLoad")
+@Data
+@NoArgsConstructor
+public class KafkaLoadNode extends LoadNode implements Serializable {
+    @Nonnull
+    @JsonProperty("topic")
+    private String topic;
+    ...
+
+    @JsonCreator
+    public KafkaLoadNode(@Nonnull @JsonProperty("topic") String topic, ...) {...}
+
+    // 根据不同的条件配置使用不同的 connector
+    @Override
+    public Map<String, String> tableOptions() {
+        ...
+        if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {
+            if (StringUtils.isEmpty(this.primaryKey)) {
+                // kafka connector
+                options.put("connector", "kafka");
+                options.putAll(format.generateOptions(false));
+            } else {
+                // upsert-kafka connector
+                options.put("connector", "upsert-kafka");
+                options.putAll(format.generateOptions(true));
+            }
+        } else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
+            // kafka-inlong connector
+            options.put("connector", "kafka-inlong");
+            options.putAll(format.generateOptions(false));
+        } else {
+            throw new IllegalArgumentException("kafka load Node format is IllegalArgument");
+        }
+        return options;
+    }
+}
+```
+
+**第二步**:在 LoadNode 和 Node 中的 JsonSubTypes 添加该 Load
+
+```java
+// 在 LoadNode 和 Node 的 JsonSubTypes 中添加字段
+...
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad")
+})
+...
+public abstract class LoadNode implements Node{...}
+
+...
+@JsonSubTypes({
+        @JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad")
+})
+public interface Node {...}
+```
+
+**第三步**:扩展 Sort Connector,Kafka 的 sort connector 在 `incubator-inlong/inlong-sort/sort-connectors/kafka` 目录下。
+
+## 集成 Extract 和 Load 到 InLong-Sort 主流程
+
+将 Extract 和 Load 集成到 InLong-Sort 主流程中,需要构建总览小节中提到的语义:Group、Stream、Node 等。
+InLong-Sort 的入口类在`inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java`。
+
+Extract 和 Load 如何集成至 InLong-Sort,可参考下面的 UT,首先构建对应的 ExtractNode、LoadNode,再构建 NodeRelation、StreamInfo、GroupInfo,最后通过 FlinkSqlParser 执行。
+
+```java
+public class MongoExtractToKafkaLoad extends AbstractTestBase {
+
+    // 构建 MongoExtractNode
+    private MongoExtractNode buildMongoNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("name", new StringFormatInfo()), ...);
+        return new MongoExtractNode(..., fields, ...);
+    }
+
+    // 构建 KafkaLoadNode
+    private KafkaLoadNode buildAllMigrateKafkaNode() {
+        List<FieldInfo> fields = Arrays.asList(new FieldInfo("name", new StringFormatInfo()), ...);
+        List<FieldRelation> relations = Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()), ...), ...);
+        CsvFormat csvFormat = new CsvFormat();
+        return new KafkaLoadNode(..., fields, relations, csvFormat, ...);
+    }
+
+    // 构建 NodeRelation
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    // 测试主流程 MongoDB to Kafka
+    @Test
+    public void testMongoDbToKafka() throws Exception {
+        EnvironmentSettings settings = EnvironmentSettings. ... .build();
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        ...
+        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+        Node inputNode = buildMongoNode();
+        Node outputNode = buildAllMigrateKafkaNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode), ...);
+        GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
+        FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+        ParseResult result = parser.parse();
+        Assert.assertTrue(result.tryExecute());
+    }
+}
+```
+
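
Editor's sketch for the `KafkaLoadNode.tableOptions()` excerpt shown in both language versions above: the connector choice depends on two inputs, the format and whether a primary key is set. The standalone Java sketch below condenses that decision into a pure function so the three outcomes are easy to see. It is a simplification under stated assumptions: the string labels (`json`, `avro`, `csv`, `canal-json`, `debezium-json`) are hypothetical stand-ins for the `JsonFormat`, `AvroFormat`, `CsvFormat`, `CanalJsonFormat`, and `DebeziumJsonFormat` classes, and the class and method names are invented for this example.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical condensation of the connector selection; not InLong code.
class KafkaConnectorChoice {

    // Labels standing in for JsonFormat, AvroFormat, and CsvFormat.
    private static final Set<String> PLAIN_FORMATS =
            new HashSet<>(Arrays.asList("json", "avro", "csv"));

    // Labels standing in for CanalJsonFormat and DebeziumJsonFormat.
    private static final Set<String> CHANGELOG_FORMATS =
            new HashSet<>(Arrays.asList("canal-json", "debezium-json"));

    // Returns the connector name that the excerpt would put under "connector".
    static String choose(String format, String primaryKey) {
        if (PLAIN_FORMATS.contains(format)) {
            boolean noPrimaryKey = primaryKey == null || primaryKey.isEmpty();
            // Without a key the plain connector is used; with a key, upsert semantics apply.
            return noPrimaryKey ? "kafka" : "upsert-kafka";
        }
        if (CHANGELOG_FORMATS.contains(format)) {
            return "kafka-inlong";
        }
        throw new IllegalArgumentException("Unsupported Kafka load format: " + format);
    }

    public static void main(String[] args) {
        System.out.println(choose("json", null));        // prints: kafka
        System.out.println(choose("csv", "id"));         // prints: upsert-kafka
        System.out.println(choose("canal-json", "id"));  // prints: kafka-inlong
    }
}
```

Note that in the excerpt the primary key is only consulted for the plain formats; the changelog formats map to `kafka-inlong` whether or not a key is present, and any other format is rejected.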
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_sort.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_sort.md
deleted file mode 100644
index 896b7456b..000000000
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/how_to_write_plugin_sort.md
+++ /dev/null
@@ -1,108 +0,0 @@
----
-title: Sort 插件
-sidebar_position: 3
----
-
-# 总览
-InLong-Sort是一个ETL系统,当前支持的sink类型包括hive,kafka,clickhouse以及iceberg。
-
-本文介绍如何在InLong-Sort中扩展一个新的sink类型。
-
-# 扩展flink sink
-InLong-Sort是一套基于flink计算引擎的ETL系统,在扩展新的sink到InLong-Sort前,首先需要扩展一个新的flink connector。
-
-如何扩展flink connector请参考flink官方文档[DataStream Connectors ](https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/#datastream-connectors).
-
-# 扩展sink协议
-扩展完flink sink后,需要在InLong-Sort中扩展对应的sink协议。该协议用来描述该sink所需要的一些必要信息。
-
-扩展协议首先需要实现一个具体的类,继承父类
-`inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/sink/SinkInfo.java`
-
-
-举例
-```java
-public class DemoSinkInfo extends SinkInfo {
-    
-    // Place necessary attributes here
-
-    @JsonCreator
-    public DemoSinkInfo(FieldInfo[] fields) {
-        super(fields);
-    }
-}
-```
-
-扩展完协议后,需要将该协议标记为SinkInfo的子类型,通过全局唯一的name进行标识。
-
-举例:扩展一个名为`Constants.SINK_TYPE_DEMO`的协议,该协议类为DemoSinkInfo
-```java
-/**
- * The base class of the data sink in the metadata.
- */
-@JsonTypeInfo(
-        use = JsonTypeInfo.Id.NAME,
-        include = JsonTypeInfo.As.PROPERTY,
-        property = "type")
-@JsonSubTypes({
-        @Type(value = ClickHouseSinkInfo.class, name = Constants.SINK_TYPE_CLICKHOUSE),
-        @Type(value = HiveSinkInfo.class, name = Constants.SINK_TYPE_HIVE),
-        @Type(value = KafkaSinkInfo.class, name = Constants.SINK_TYPE_KAFKA),
-        @Type(value = IcebergSinkInfo.class, name = Constants.SINK_TYPE_ICEBERG),
-        
-        // The new sink protocol
-        @Type(value = DemoSinkInfo.class, name = Constants.SINK_TYPE_DEMO)}
-)
-public abstract class SinkInfo implements Serializable {
-
-    private static final long serialVersionUID = 1485856855405721745L;
-
-    @JsonProperty("fields")
-    private final FieldInfo[] fields;
-
-    public SinkInfo(@JsonProperty("fields") FieldInfo[] fields) {
-        this.fields = checkNotNull(fields);
-    }
-
-    @JsonProperty("fields")
-    public FieldInfo[] getFields() {
-        return fields;
-    }
-}
-```
-
-# 集成新的Sink到InLong-Sort的主流程
-扩展完flink sink并实现了对应的sink协议后,我们就可以将该sink集成到InLong-Sort的主流程中了。
-
-InLong-Sort本质是一个flink的job,入口为`inlong-sort/sort-single-tenant/src/main/java/org/apache/inlong/sort/singletenant/flink/Entrance.java`
-
-举例 :扩展一个名为`Constants.SINK_TYPE_DEMO`的sink
-
-``` java
-private static void buildSinkStream(
-        DataStream<Row> sourceStream,
-        Configuration config,
-        SinkInfo sinkInfo,
-        Map<String, Object> properties,
-        long dataflowId) throws IOException, ClassNotFoundException {
-    final String sinkType = checkNotNull(config.getString(Constants.SINK_TYPE));
-
-    switch (sinkType) {
-        case Constants.SINK_TYPE_CLICKHOUSE:
-            break;
-        case Constants.SINK_TYPE_HIVE:
-            break;
-        case Constants.SINK_TYPE_ICEBERG:
-            break;
-        case Constants.SINK_TYPE_KAFKA:
-            break;
-        case Constants.SINK_TYPE_DEMO:
-            // Add the extended sink function here
-            break;
-        default:
-            throw new IllegalArgumentException("Unsupported sink type " + sinkType);
-    }
-
-}
-
-```
\ No newline at end of file
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/sort_uml.png b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/sort_uml.png
new file mode 100644
index 000000000..35c964cac
Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-docs/current/design_and_concept/img/sort_uml.png differ