Posted to commits@inlong.apache.org by do...@apache.org on 2022/06/16 04:52:10 UTC

[incubator-inlong-website] branch master updated: [INLONG-411][Blog] Add an blog about InLong sort ETL (#415)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 1299e5c43 [INLONG-411][Blog] Add an blog about InLong sort ETL (#415)
1299e5c43 is described below

commit 1299e5c43e0e8c97da02d27c301d3ea1fc89df87
Author: Oneal65 <li...@foxmail.com>
AuthorDate: Thu Jun 16 12:52:05 2022 +0800

    [INLONG-411][Blog] Add an blog about InLong sort ETL (#415)
---
 blog/InLong_Sort_ETL_en.md                         | 363 ++++++++++++++++++++
 blog/img/architecture.png                          | Bin 0 -> 50147 bytes
 blog/img/sort-module-structure.png                 | Bin 0 -> 149482 bytes
 blog/img/sort-operation-flow.png                   | Bin 0 -> 11916 bytes
 blog/img/sort-usecase.png                          | Bin 0 -> 38548 bytes
 blog/img/sort_UML.png                              | Bin 0 -> 197443 bytes
 .../InLong_Sort_ETL_ch.md                          | 364 +++++++++++++++++++++
 .../img/architecture.png                           | Bin 0 -> 50147 bytes
 .../img/sort-module-structure.png                  | Bin 0 -> 149482 bytes
 .../img/sort-operation-flow.png                    | Bin 0 -> 11916 bytes
 .../img/sort-usecase.png                           | Bin 0 -> 38548 bytes
 .../img/sort_UML.png                               | Bin 0 -> 197443 bytes
 12 files changed, 727 insertions(+)

diff --git a/blog/InLong_Sort_ETL_en.md b/blog/InLong_Sort_ETL_en.md
new file mode 100644
index 000000000..5a84f6247
--- /dev/null
+++ b/blog/InLong_Sort_ETL_en.md
@@ -0,0 +1,363 @@
+---
+title: Analysis of InLong Sort ETL Solution Based on Apache Flink SQL
+sidebar_position: 4
+---
+
+# Analysis of InLong Sort ETL Solution Based on Apache Flink SQL
+
+# 1. Background
+
+With the growing number of users and developers of Apache InLong (incubating), the demand for richer usage scenarios and low-cost operation keeps getting stronger. Among them, adding Transform (T) to the whole InLong link is the most requested capability. After research and design by the community developers @yunqingmoswu, @EMsnap, @gong, and @thexiay, the InLong Sort ETL solution based on Flink SQL has been completed. This article introduces the implementation details of this solution.
+
+First of all, the choice of Apache Flink SQL is mainly based on the following considerations:
+
+- Flink SQL's powerful expressiveness brings high scalability and flexibility; it covers most of the demand scenarios in the community. When the built-in functions of Flink SQL do not meet the requirements, they can be extended through UDFs (see the sketch after this list).
+- Compared with implementations on Flink's underlying APIs, the development cost of Flink SQL is lower: the conversion logic to Flink SQL only needs to be implemented once, after which we can focus on building up Flink SQL capabilities themselves, such as extending connectors and custom UDFs.
+- In general, Flink SQL is more robust and runs more stably, because it shields a lot of Flink's underlying details, has strong community support, and has been validated by a large number of users.
+- For users, Flink SQL is also easier to understand; especially for users who have used SQL before, the usage is simple and familiar, which helps them get started quickly.
+- For the migration of existing real-time tasks that are already SQL-based, especially Flink SQL tasks, the migration cost is extremely low; in some cases, no changes are needed at all.
+
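+As a concrete illustration of the UDF extension point mentioned above, here is a minimal sketch of a Flink SQL scalar function; the class name and masking logic are invented for illustration and are not part of InLong:
+
+```java
+import org.apache.flink.table.functions.ScalarFunction;
+
+// Illustrative UDF: masks the middle digits of a phone number,
+// e.g. "13812345678" -> "138****5678".
+public class MaskPhoneFunction extends ScalarFunction {
+    public String eval(String phone) {
+        if (phone == null || phone.length() < 8) {
+            return phone;
+        }
+        return phone.substring(0, 3) + "****" + phone.substring(phone.length() - 4);
+    }
+}
+```
+
+Once registered with `tableEnv.createTemporarySystemFunction("MASK_PHONE", MaskPhoneFunction.class)`, such a function can be used in the generated SQL like any built-in function.
+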
+**Note**: For all the code of this solution, please refer to the [Apache InLong Sort](https://github.com/apache/incubator-inlong/tree/master/inlong-sort) module, which can be downloaded and used in the upcoming 1.2.0 release.
+
+# 2. Introduction
+
+## 2.1 Requirements
+
+The main requirement of this solution is to complete the Transform (T) capability of the InLong Sort module, including:
+
+|          Transform          |                            Notes                             |
+| :-------------------------: | :----------------------------------------------------------: |
+| Deduplication in the window |            Deduplicate data within a time window             |
+|   Time window aggregation   |             Aggregate data within a time window              |
+|   Time format conversion    | Convert the value of a field into a string in the target time format |
+|     Field segmentation      |    Split one field into multiple new fields by a delimiter   |
+|     String replacement      |    Replace part or all of the content of a string field      |
+|       Data filtering        |   Discard or retain data that meets the filter conditions    |
+|     Content extraction      |        Extract part of a field to create a new field         |
+|            Join             |                    Support two-table joins                   |
+|     Value substitution      | Given a matching value, if the field's value equals it, replace it with the target value |
+
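+For intuition, each of these T capabilities ultimately maps to a Flink SQL construct. The following hand-written sketch (not output of InLong Sort; table and field names are illustrative) shows a few of them side by side:
+
+```sql
+SELECT
+  SPLIT_INDEX(`full_name`, ' ', 0)        AS `first_name`, -- field segmentation
+  REGEXP_REPLACE(`remark`, 'foo', 'bar')  AS `remark`,     -- string replacement
+  DATE_FORMAT(`ts`, 'yyyy-MM-dd')         AS `dt`          -- time format conversion
+FROM `mysql_1`
+WHERE `age` >= 18                                          -- data filtering
+```
+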
+## 2.2 Usage Scenarios
+
+Users of big data integration have Transform requirements, such as data transformation, joining, and filtering, in many business scenarios.
+
+## 2.3 Design Goal
+
+This design needs to achieve the following goals:
+
+- Functionality: Under InLong Sort's existing architecture and data flow model, cover the basic Transform capabilities and allow rapid extension.
+- Compatibility: The new InLong Sort data model is backward compatible, ensuring that historical tasks can still be configured and run properly.
+- Maintainability: The conversion from the InLong Sort data model to Flink SQL only needs to be implemented once; later functional requirements should need no changes to this part, or at most small ones.
+- Extensibility: When open source Flink connectors or built-in Flink SQL functions do not meet the requirements, custom Flink connectors and UDFs can be implemented to extend the functionality.
+
+## 2.4 Basic Concepts
+
+The core concepts follow the term definitions in the outline design:
+
+|            Name             |                           Meaning                            |
+| :-------------------------: | :----------------------------------------------------------: |
+|      InLong Dashboard       |            InLong front-end management interface             |
+|    InLong Manager Client    | Wraps the interfaces in the Manager for external user programs to call without going through the front-end InLong Dashboard |
+|   InLong Manager Openapi    |      Interface for calls between InLong Manager and external systems       |
+|   InLong Manager metaData   | InLong Manager metadata management, including metadata of the group and stream dimensions |
+| InLong Manager task manager | The module of InLong Manager that manages data source collection tasks: agent task distribution, instruction distribution, and heartbeat reporting |
+|        InLong Group         | Data stream group, containing multiple data streams; one group represents one data ingestion |
+|        InLong Stream        |     Data stream: a data stream has a specific flow direction     |
+|        Stream Source        | A stream has a corresponding collection side and sink side; this design only involves the stream source |
+|         Stream Info         | Abstraction of a data stream in Sort, including its sources, transformations, destinations, etc. |
+|         Group Info          | Encapsulation of data streams in Sort; one GroupInfo can contain multiple StreamInfos |
+|            Node             | Abstraction of data sources, data transformations, and data destinations in data synchronization |
+|        Extract Node         |       Source-side abstraction of data synchronization        |
+|          Load Node          |       Destination-side abstraction of data synchronization        |
+|     MySQL Extract Node      |                MySQL data source abstraction                 |
+|       Kafka Load Node       |              Kafka data destination abstraction              |
+|       Transform Node        |  Abstraction of the transformation process in data synchronization  |
+|  Aggregate Transform Node   | Abstraction of aggregation-type transformation processes in data synchronization |
+|        Node Relation        |  Abstraction of the relationships between nodes in data synchronization   |
+|       Field Relation        | Abstraction of the relationships between upstream and downstream node fields in data synchronization |
+|          Function           | Abstraction of transform functions, i.e., of the implementation of each T capability in data synchronization |
+|     Substring Function      |         Abstraction of the string-substring function          |
+|       Filter Function       |             Abstraction of the data filter function              |
+|       Function Param        |           Abstraction of function input parameters            |
+|       Constant Param        |                     Constant parameter                      |
+|         Field Info          |                          Node field                          |
+|       Meta FieldInfo        |                 Node metadata field                  |
+
+
+
+## 2.5 Domain Model
+
+This design mainly involves the following entities:
+
+Group, Stream, GroupInfo, StreamInfo, Node, NodeRelation, FieldRelation, Function, FilterFunction, SubstringFunction, FunctionParam, FieldInfo, MetaFieldInfo, MySQLExtractNode, KafkaLoadNode, etc.
+
+For ease of understanding, this section models and analyzes the relationships between the entities. The entity correspondences of the domain model are:
+
+- One Group corresponds to one GroupInfo
+- A Group contains one or more Streams
+- One Stream corresponds to one StreamInfo
+- A GroupInfo contains one or more StreamInfos
+- A StreamInfo contains multiple Nodes
+- A StreamInfo contains one or more NodeRelations
+- A NodeRelation contains one or more FieldRelations
+- A NodeRelation contains zero or more FilterFunctions
+- A FieldRelation contains one Function or one FieldInfo as the source field, and one FieldInfo as the target field
+- A Function contains one or more FunctionParams
+
+The above relationships can be represented in a UML object-relationship diagram (a concrete construction of this hierarchy appears in the code in section 4.2.2):
+
+![sort_UML](./img/sort_UML.png)
+
+## 2.6 Function Use-case Diagram
+
+![sort-usecase](./img/sort-usecase.png)
+
+# 3. System Outline Design
+
+## 3.1 System Architecture Diagram
+
+![architecture](./img/architecture.png)
+
+- Serialization: serialization implementation module
+- Deserialization: deserialization implementation module
+- Flink Source: custom Flink source implementation module
+- Flink Sink: custom Flink sink implementation module
+- Transformation: custom Transform implementation module
+- GroupInfo: corresponds to an InLong group
+- StreamInfo: corresponds to an InLong stream
+- Node: abstraction of data sources, data transformations, and data destinations in data synchronization
+- FlinkSQLParser: the SQL parser
+
+## 3.2 InLong Sort Internal Operation Flow Chart
+
+![sort-operation-flow](./img/sort-operation-flow.png)
+
+## 3.3 Module Design
+
+This design only adds the Flink Connector and FlinkSQL Generator modules to the original system, and modifies the Data Model module.
+
+### 3.3.1 Module Structure
+
+![sort-module-structure](./img/sort-module-structure.png)
+
+### 3.3.2 Module Division
+
+Description of the important modules:
+
+|       Name        |                         Description                          |
+| :---------------: | :----------------------------------------------------------: |
+|  FlinkSQLParser   | The core class for generating FlinkSQL, holding a reference to GroupInfo |
+|     GroupInfo     | Sort's internal abstraction of an InLong group, encapsulating the synchronization information of the whole group, including a reference to List\<StreamInfo\> |
+|    StreamInfo     | Sort's internal abstraction of an InLong stream, encapsulating the stream's synchronization information, including references to List\<Node\> and List\<NodeRelation\> |
+|       Node        | The top-level interface of synchronization nodes; its subclasses encapsulate the data of synchronization sources and transformation nodes |
+|    ExtractNode    |      Data extract node abstraction, inherited from Node      |
+|     LoadNode      |       Data load node abstraction, inherited from Node        |
+|   TransformNode   |  Data transformation node abstraction, inherited from Node   |
+|   NodeRelation    |              Defines the relationships between nodes              |
+|   FieldRelation   |           Defines the field relationships between nodes           |
+|     Function      |           Abstraction of the T-capability execution functions           |
+|  FilterFunction   | Function abstraction for data filtering, inherited from Function |
+| SubstringFunction | Function abstraction for string substring extraction, inherited from Function |
+|   FunctionParam   |             Abstraction of function parameters              |
+|   ConstantParam   | Encapsulation of constant function parameters, inherited from FunctionParam |
+|     FieldInfo     | Encapsulation of node fields, also usable as a function input parameter, inherited from FunctionParam |
+|   MetaFieldInfo   | Encapsulation of built-in fields, currently mainly used for the metadata field scenario of Canal-JSON, inherited from FieldInfo |
+
+# 4. Detailed System Design
+
+The following explains the principle of SQL generation, taking MySQL-to-Kafka data synchronization as an example.
+
+## 4.1 SQL Generation for Nodes
+
+### 4.1.1 SQL Generation for the ExtractNode
+
+The node configuration is:
+
+**nodeconfig1**
+
+```java
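+// Extract node "mysql_input" (id "1"): reads columns `name` and `age` from table
+// "tableName" in database "inlong" on localhost; compare the generated DDL below.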
+ private Node buildMySQLExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, null, "id",
+                Collections.singletonList("tableName"), "localhost", "root", "password",
+                "inlong", null, null,
+                null, null);
+    }
+```
+
+The generated SQL is:
+
+**extract-sql**
+
+```sql
+CREATE TABLE `mysql_1` (`name` string,`age` int) 
+with 
+('connector' = 'mysql-cdc-inlong',
+'hostname' = 'localhost',
+'username' = 'root',
+'password' = 'password',
+'database-name' = 'inlong',
+'table-name' = 'tableName')
+
+```
+
+
+
+### 4.1.2 SQL Generation for the TransformNode
+
+The node configuration is:
+
+**nodeconfig2**
+
+```java
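+// Two chained single-value filters that together express: age < 25 AND age >= 18.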
+ List<FilterFunction> filters = Arrays.asList(
+                new SingleValueFilterFunction(EmptyOperator.getInstance(),
+                        new FieldInfo("age", new IntFormatInfo()),
+                        LessThanOperator.getInstance(), new ConstantParam(25)),
+                new SingleValueFilterFunction(AndOperator.getInstance(),
+                        new FieldInfo("age", new IntFormatInfo()),
+                        MoreThanOrEqualOperator.getInstance(), new ConstantParam(18))
+        );
+
+```
+
+The generated SQL is:
+
+**transform-sql**
+
+```sql
+SELECT `name` AS `name`,`age` AS `age` FROM `mysql_1` WHERE `age` < 25 AND `age` >= 18
+
+```
+
+
+
+### 4.1.3 SQL Generation for the LoadNode
+
+The node configuration is:
+
+**nodeconfig3**
+
+```java
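+// Load node "kafka_output" (id "2"): writes to Kafka topic "topic1" at localhost:9092
+// in canal-json format; the age filters mirror those above and end up in the WHERE
+// clause of the generated INSERT statement.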
+ private Node buildKafkaLoadNode(FilterStrategy filterStrategy) {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo())
+        );
+        List<FieldRelation> relations = Arrays
+                .asList(
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo()))
+                );
+        List<FilterFunction> filters = Arrays.asList(
+                new SingleValueFilterFunction(EmptyOperator.getInstance(),
+                        new FieldInfo("age", new IntFormatInfo()),
+                        LessThanOperator.getInstance(), new ConstantParam(25)),
+                new SingleValueFilterFunction(AndOperator.getInstance(),
+                        new FieldInfo("age", new IntFormatInfo()),
+                        MoreThanOrEqualOperator.getInstance(), new ConstantParam(18))
+        );
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, filters,
+                filterStrategy, "topic1", "localhost:9092",
+                new CanalJsonFormat(), null,
+                null, "id");
+    }
+
+```
+
+The generated SQL is:
+
+**load-sql**
+
+```sql
+CREATE TABLE `kafka_3` (`name` string,`age` int) 
+with (
+'connector' = 'kafka-inlong',
+'topic' = 'topic1',
+'properties.bootstrap.servers' = 'localhost:9092',
+'format' = 'canal-json-inlong',
+'canal-json-inlong.ignore-parse-errors' = 'true',
+'canal-json-inlong.map-null-key.mode' = 'DROP',
+'canal-json-inlong.encode.decimal-as-plain-number' = 'true',
+'canal-json-inlong.timestamp-format.standard' = 'SQL',
+'canal-json-inlong.map-null-key.literal' = 'null'
+)
+
+```
+
+
+
+## 4.2 SQL Generation for Field Transforms (T)
+
+### 4.2.1 Filter Operator
+
+See the node configurations in 4.1 for the relevant settings.
+
+The generated SQL is:
+
+**insert-sql**
+
+```sql
+INSERT INTO `kafka_3` SELECT `name` AS `name`,`age` AS `age` FROM `mysql_1` WHERE `age` < 25 AND `age` >= 18
+
+```
+
+### 4.2.2 Watermark
+
+The complete GroupInfo configuration is as follows:
+
+**nodeconfig4**
+
+```java
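+// The WatermarkField below declares a watermark on the event-time field `ts`
+// with a 1-minute delay.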
+private Node buildMySqlExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()),
+                new StringConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.MINUTE));
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                wk, null, "id",
+                Collections.singletonList("tableName"), "localhost", "root", "password",
+                "inlong", null, null,
+                null, null);
+    }
+
+    private Node buildKafkaNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo()))
+                );
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
+                "topic", "localhost:9092", new JsonFormat(),
+                1, null, "id");
+    }
+
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    @Override
+    public GroupInfo getTestObject() {
+        Node input = buildMySqlExtractNode();
+        Node output = buildKafkaNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList(
+                buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output))));
+        return new GroupInfo("1", Collections.singletonList(streamInfo));
+    }
+```
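+
+With a WatermarkField configured, the DDL generated for the extract node additionally carries a watermark declaration. The statement below is a hand-written sketch of the expected shape, not captured tool output:
+
+```sql
+CREATE TABLE `mysql_1` (`name` string,`age` int,`ts` timestamp(3),
+WATERMARK FOR `ts` AS `ts` - INTERVAL '1' MINUTE)
+with 
+('connector' = 'mysql-cdc-inlong',
+'hostname' = 'localhost',
+'username' = 'root',
+'password' = 'password',
+'database-name' = 'inlong',
+'table-name' = 'tableName')
+```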
+
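+To tie the pieces together, the sketch below shows how such a GroupInfo can be handed to the parser to generate and register the SQL; the getInstance/parse method names follow the inlong-sort test code and should be treated as assumptions rather than a stable API:
+
+```java
+// Hedged sketch of driving SQL generation end to end.
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setParallelism(1);
+EnvironmentSettings settings = EnvironmentSettings.newInstance()
+        .useBlinkPlanner().inStreamingMode().build();
+StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
+
+GroupInfo groupInfo = getTestObject();  // the GroupInfo assembled above
+Parser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
+parser.parse();  // generates and registers the CREATE TABLE / INSERT INTO statements
+```
+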
diff --git a/blog/img/architecture.png b/blog/img/architecture.png
new file mode 100644
index 000000000..0f4e23d1d
Binary files /dev/null and b/blog/img/architecture.png differ
diff --git a/blog/img/sort-module-structure.png b/blog/img/sort-module-structure.png
new file mode 100644
index 000000000..786b7db5f
Binary files /dev/null and b/blog/img/sort-module-structure.png differ
diff --git a/blog/img/sort-operation-flow.png b/blog/img/sort-operation-flow.png
new file mode 100644
index 000000000..b41a1ee25
Binary files /dev/null and b/blog/img/sort-operation-flow.png differ
diff --git a/blog/img/sort-usecase.png b/blog/img/sort-usecase.png
new file mode 100644
index 000000000..08b4d2d6d
Binary files /dev/null and b/blog/img/sort-usecase.png differ
diff --git a/blog/img/sort_UML.png b/blog/img/sort_UML.png
new file mode 100644
index 000000000..cfcd07f77
Binary files /dev/null and b/blog/img/sort_UML.png differ
diff --git a/i18n/zh-CN/docusaurus-plugin-content-blog/InLong_Sort_ETL_ch.md b/i18n/zh-CN/docusaurus-plugin-content-blog/InLong_Sort_ETL_ch.md
new file mode 100644
index 000000000..641163f2d
--- /dev/null
+++ b/i18n/zh-CN/docusaurus-plugin-content-blog/InLong_Sort_ETL_ch.md
@@ -0,0 +1,364 @@
+---
+title: Analysis of InLong Sort ETL Solution Based on Apache Flink SQL
+sidebar_position: 4
+---
+
+# Analysis of InLong Sort ETL Solution Based on Apache Flink SQL
+
+# 1. Background
+
+With the growing number of users and developers of Apache InLong (incubating), the demand for richer usage scenarios and low-cost operation keeps getting stronger. Among them, adding Transform (T) to the whole InLong link is the most requested capability. After research and design by the community developers @yunqingmoswu, @EMsnap, @gong, and @thexiay, the InLong Sort ETL solution based on Flink SQL has been completed. This article introduces the implementation details of this solution.
+
+First of all, the choice of Apache Flink SQL is mainly based on the following considerations:
+
+- Flink SQL's powerful expressiveness brings high scalability and flexibility; it covers most of the demand scenarios in the community. When the built-in functions of Flink SQL do not meet the requirements, they can be extended through UDFs.
+- Compared with implementations on Flink's underlying APIs, the development cost of Flink SQL is lower: the conversion logic to Flink SQL only needs to be implemented once, after which we can focus on building up Flink SQL capabilities themselves, such as extending connectors and custom UDFs.
+- In general, Flink SQL is more robust and runs more stably, because it shields a lot of Flink's underlying details, has strong community support, and has been validated by a large number of users.
+- For users, Flink SQL is also easier to understand; especially for users who have used SQL before, the usage is simple and familiar, which helps them get started quickly.
+- For the migration of existing real-time tasks that are already SQL-based, especially Flink SQL tasks, the migration cost is extremely low; in some cases, no changes are needed at all.
+
+**Note**: For all the code of this solution, please refer to the [Apache InLong Sort](https://github.com/apache/incubator-inlong/tree/master/inlong-sort) module, which can be downloaded and used in the upcoming 1.2.0 release.
+
+# 2. Introduction
+
+## 2.1 Requirements
+
+The main requirement of this solution is to complete the Transform (T) capability of the InLong Sort module, including:
+
+|          Transform          |                            Notes                             |
+| :-------------------------: | :----------------------------------------------------------: |
+| Deduplication in the window |            Deduplicate data within a time window             |
+|   Time window aggregation   |             Aggregate data within a time window              |
+|   Time format conversion    | Convert the value of a field into a string in the target time format |
+|     Field segmentation      |    Split one field into multiple new fields by a delimiter   |
+|     String replacement      |    Replace part or all of the content of a string field      |
+|       Data filtering        |   Discard or retain data that meets the filter conditions    |
+|     Content extraction      |        Extract part of a field to create a new field         |
+|            Join             |                    Support two-table joins                   |
+|     Value substitution      | Given a matching value, if the field's value equals it, replace it with the target value |
+
+## 2.2 Usage Scenarios
+
+Users of big data integration have Transform requirements, such as data transformation, joining, and filtering, in many business scenarios.
+
+## 2.3 Design Goals
+
+This design needs to achieve the following goals:
+
+- Functionality: Under InLong Sort's existing architecture and data flow model, cover the basic Transform capabilities and allow rapid extension.
+- Compatibility: The new InLong Sort data model is backward compatible, ensuring that historical tasks can still be configured and run properly.
+- Maintainability: The conversion from the InLong Sort data model to Flink SQL only needs to be implemented once; later functional requirements should need no changes to this part, or at most small ones.
+- Extensibility: When open source Flink connectors or built-in Flink SQL functions do not meet the requirements, custom Flink connectors and UDFs can be implemented to extend the functionality.
+
+## 2.4 Basic Concepts
+
+The core concepts follow the term definitions in the outline design:
+
+|            Name             |                           Meaning                            |
+| :-------------------------: | :----------------------------------------------------------: |
+|      InLong Dashboard       |            InLong front-end management interface             |
+|    InLong Manager Client    | Wraps the interfaces in the Manager for external user programs to call without going through the front-end InLong Dashboard |
+|   InLong Manager Openapi    |      Interface for calls between InLong Manager and external systems       |
+|   InLong Manager metaData   | InLong Manager metadata management, including metadata of the group and stream dimensions |
+| InLong Manager task manager | The module of InLong Manager that manages data source collection tasks: agent task distribution, instruction distribution, and heartbeat reporting |
+|        InLong Group         | Data stream group, containing multiple data streams; one group represents one data ingestion |
+|        InLong Stream        |     Data stream: a data stream has a specific flow direction     |
+|        Stream Source        | A stream has a corresponding collection side and sink side; this design only involves the stream source |
+|         Stream Info         | Abstraction of a data stream in Sort, including its sources, transformations, destinations, etc. |
+|         Group Info          | Encapsulation of data streams in Sort; one GroupInfo can contain multiple StreamInfos |
+|            Node             | Abstraction of data sources, data transformations, and data destinations in data synchronization |
+|        Extract Node         |       Source-side abstraction of data synchronization        |
+|          Load Node          |       Destination-side abstraction of data synchronization        |
+|     MySQL Extract Node      |                MySQL data source abstraction                 |
+|       Kafka Load Node       |              Kafka data destination abstraction              |
+|       Transform Node        |  Abstraction of the transformation process in data synchronization  |
+|  Aggregate Transform Node   | Abstraction of aggregation-type transformation processes in data synchronization |
+|        Node Relation        |  Abstraction of the relationships between nodes in data synchronization   |
+|       Field Relation        | Abstraction of the relationships between upstream and downstream node fields in data synchronization |
+|          Function           | Abstraction of transform functions, i.e., of the implementation of each T capability in data synchronization |
+|     Substring Function      |         Abstraction of the string-substring function          |
+|       Filter Function       |             Abstraction of the data filter function              |
+|       Function Param        |           Abstraction of function input parameters            |
+|       Constant Param        |                     Constant parameter                      |
+|         Field Info          |                          Node field                          |
+|       Meta FieldInfo        |                 Node metadata field                  |
+
+
+
+## 2.5 Domain Model
+
+This design mainly involves the following entities:
+
+Group, Stream, GroupInfo, StreamInfo, Node, NodeRelation, FieldRelation, Function, FilterFunction, SubstringFunction, FunctionParam, FieldInfo, MetaFieldInfo, MySQLExtractNode, KafkaLoadNode, etc.
+
+For ease of understanding, this section models and analyzes the relationships between the entities. The entity correspondences of the domain model are:
+
+- One Group corresponds to one GroupInfo
+- A Group contains one or more Streams
+- One Stream corresponds to one StreamInfo
+- A GroupInfo contains one or more StreamInfos
+- A StreamInfo contains multiple Nodes
+- A StreamInfo contains one or more NodeRelations
+- A NodeRelation contains one or more FieldRelations
+- A NodeRelation contains zero or more FilterFunctions
+- A FieldRelation contains one Function or one FieldInfo as the source field, and one FieldInfo as the target field
+- A Function contains one or more FunctionParams
+
+The above relationships can be represented in a UML object-relationship diagram as:
+
+![sort_UML](./img/sort_UML.png)
+
+## 2.6 Function Use-case Diagram
+
+![sort-usecase](./img/sort-usecase.png)
+
+# 3. System Outline Design
+
+## 3.1 System Architecture Diagram
+
+![architecture](./img/architecture.png)
+
+- Serialization: serialization implementation module
+- Deserialization: deserialization implementation module
+- Flink Source: custom Flink source implementation module
+- Flink Sink: custom Flink sink implementation module
+- Transformation: custom Transform implementation module
+- GroupInfo: corresponds to an InLong group
+- StreamInfo: corresponds to an InLong stream
+- Node: abstraction of data sources, data transformations, and data destinations in data synchronization
+- FlinkSQLParser: the SQL parser
+
+## 3.2 InLong Sort Internal Operation Flow Chart
+
+![](./img/sort-operation-flow.png)
+
+## 3.3 Module Design
+
+This design only adds the Flink Connector and FlinkSQL Generator modules to the original system, and modifies the Data Model module.
+
+### 3.3.1 Module Structure
+
+![](./img/sort-module-structure.png)
+
+### 3.3.2 Module Division
+
+Description of the important modules:
+
+|       Name        |                         Description                          |
+| :---------------: | :----------------------------------------------------------: |
+|  FlinkSQLParser   | The core class for generating FlinkSQL, holding a reference to GroupInfo |
+|     GroupInfo     | Sort's internal abstraction of an InLong group, encapsulating the synchronization information of the whole group, including a reference to List\<StreamInfo\> |
+|    StreamInfo     | Sort's internal abstraction of an InLong stream, encapsulating the stream's synchronization information, including references to List\<Node\> and List\<NodeRelation\> |
+|       Node        | The top-level interface of synchronization nodes; its subclasses encapsulate the data of synchronization sources and transformation nodes |
+|    ExtractNode    |      Data extract node abstraction, inherited from Node      |
+|     LoadNode      |       Data load node abstraction, inherited from Node        |
+|   TransformNode   |  Data transformation node abstraction, inherited from Node   |
+|   NodeRelation    |              Defines the relationships between nodes              |
+|   FieldRelation   |           Defines the field relationships between nodes           |
+|     Function      |           Abstraction of the T-capability execution functions           |
+|  FilterFunction   | Function abstraction for data filtering, inherited from Function |
+| SubstringFunction | Function abstraction for string substring extraction, inherited from Function |
+|   FunctionParam   |             Abstraction of function parameters              |
+|   ConstantParam   | Encapsulation of constant function parameters, inherited from FunctionParam |
+|     FieldInfo     | Encapsulation of node fields, also usable as a function input parameter, inherited from FunctionParam |
+|   MetaFieldInfo   | Encapsulation of built-in fields, currently mainly used for the metadata field scenario of Canal-JSON, inherited from FieldInfo |
+
+# 4. Detailed System Design
+
+The following explains the principle of SQL generation, taking MySQL-to-Kafka data synchronization as an example.
+
+## 4.1 SQL Generation for Nodes
+
+### 4.1.1 SQL Generation for the ExtractNode
+
+The node configuration is:
+
+**nodeconfig1**
+
+```java
+ private Node buildMySQLExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()));
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                null, null, "id",
+                Collections.singletonList("tableName"), "localhost", "root", "password",
+                "inlong", null, null,
+                null, null);
+    }
+```
+
+The generated SQL is:
+
+**extract-sql**
+
+```sql
+CREATE TABLE `mysql_1` (`name` string,`age` int) 
+with 
+('connector' = 'mysql-cdc-inlong',
+'hostname' = 'localhost',
+'username' = 'root',
+'password' = 'password',
+'database-name' = 'inlong',
+'table-name' = 'tableName')
+
+```
+
+
+
+### 4.1.2 SQL Generation for the TransformNode
+
+The node configuration is:
+
+**nodeconfig2**
+
+```java
+ List<FilterFunction> filters = Arrays.asList(
+                new SingleValueFilterFunction(EmptyOperator.getInstance(),
+                        new FieldInfo("age", new IntFormatInfo()),
+                        LessThanOperator.getInstance(), new ConstantParam(25)),
+                new SingleValueFilterFunction(AndOperator.getInstance(),
+                        new FieldInfo("age", new IntFormatInfo()),
+                        MoreThanOrEqualOperator.getInstance(), new ConstantParam(18))
+        );
+
+```
+
+The generated SQL is:
+
+**transform-sql**
+
+```sql
+SELECT `name` AS `name`,`age` AS `age` FROM `mysql_1` WHERE `age` < 25 AND `age` >= 18
+
+```
+
+
+
+### 4.1.3 SQL Generation for the LoadNode
+
+The node configuration is:
+
+**nodeconfig3**
+
+```java
+ private Node buildKafkaLoadNode(FilterStrategy filterStrategy) {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo())
+        );
+        List<FieldRelation> relations = Arrays
+                .asList(
+                        new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo()))
+                );
+        List<FilterFunction> filters = Arrays.asList(
+                new SingleValueFilterFunction(EmptyOperator.getInstance(),
+                        new FieldInfo("age", new IntFormatInfo()),
+                        LessThanOperator.getInstance(), new ConstantParam(25)),
+                new SingleValueFilterFunction(AndOperator.getInstance(),
+                        new FieldInfo("age", new IntFormatInfo()),
+                        MoreThanOrEqualOperator.getInstance(), new ConstantParam(18))
+        );
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, filters,
+                filterStrategy, "topic1", "localhost:9092",
+                new CanalJsonFormat(), null,
+                null, "id");
+    }
+
+```
+
+
+
+The generated SQL is:
+
+**load-sql**
+
+```sql
+CREATE TABLE `kafka_3` (`name` string,`age` int) 
+with (
+'connector' = 'kafka-inlong',
+'topic' = 'topic1',
+'properties.bootstrap.servers' = 'localhost:9092',
+'format' = 'canal-json-inlong',
+'canal-json-inlong.ignore-parse-errors' = 'true',
+'canal-json-inlong.map-null-key.mode' = 'DROP',
+'canal-json-inlong.encode.decimal-as-plain-number' = 'true',
+'canal-json-inlong.timestamp-format.standard' = 'SQL',
+'canal-json-inlong.map-null-key.literal' = 'null'
+)
+
+```
+
+## 4.2 SQL Generation for Field Transforms (T)
+
+### 4.2.1 Filter Operator
+
+See the node configurations in 4.1 for the relevant settings.
+
+The generated SQL is:
+
+**insert-sql**
+
+```sql
+INSERT INTO `kafka_3` SELECT `name` AS `name`,`age` AS `age` FROM `mysql_1` WHERE `age` < 25 AND `age` >= 18
+
+```
+
+### 4.2.2 Watermark
+
+The complete GroupInfo configuration is as follows:
+
+**nodeconfig4**
+
+```java
+private Node buildMySqlExtractNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        WatermarkField wk = new WatermarkField(new FieldInfo("ts", new TimestampFormatInfo()),
+                new StringConstantParam("1"),
+                new TimeUnitConstantParam(TimeUnit.MINUTE));
+        return new MySqlExtractNode("1", "mysql_input", fields,
+                wk, null, "id",
+                Collections.singletonList("tableName"), "localhost", "root", "password",
+                "inlong", null, null,
+                null, null);
+    }
+
+    private Node buildKafkaNode() {
+        List<FieldInfo> fields = Arrays.asList(
+                new FieldInfo("name", new StringFormatInfo()),
+                new FieldInfo("age", new IntFormatInfo()),
+                new FieldInfo("ts", new TimestampFormatInfo()));
+        List<FieldRelation> relations = Arrays
+                .asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()),
+                                new FieldInfo("name", new StringFormatInfo())),
+                        new FieldRelation(new FieldInfo("age", new IntFormatInfo()),
+                                new FieldInfo("age", new IntFormatInfo()))
+                );
+        return new KafkaLoadNode("2", "kafka_output", fields, relations, null, null,
+                "topic", "localhost:9092", new JsonFormat(),
+                1, null, "id");
+    }
+
+    private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
+        List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
+        List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
+        return new NodeRelation(inputIds, outputIds);
+    }
+
+    @Override
+    public GroupInfo getTestObject() {
+        Node input = buildMySqlExtractNode();
+        Node output = buildKafkaNode();
+        StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(input, output), Collections.singletonList(
+                buildNodeRelation(Collections.singletonList(input), Collections.singletonList(output))));
+        return new GroupInfo("1", Collections.singletonList(streamInfo));
+    }
+
+```
+
diff --git a/i18n/zh-CN/docusaurus-plugin-content-blog/img/architecture.png b/i18n/zh-CN/docusaurus-plugin-content-blog/img/architecture.png
new file mode 100644
index 000000000..0f4e23d1d
Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-blog/img/architecture.png differ
diff --git a/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort-module-structure.png b/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort-module-structure.png
new file mode 100644
index 000000000..786b7db5f
Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort-module-structure.png differ
diff --git a/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort-operation-flow.png b/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort-operation-flow.png
new file mode 100644
index 000000000..b41a1ee25
Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort-operation-flow.png differ
diff --git a/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort-usecase.png b/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort-usecase.png
new file mode 100644
index 000000000..08b4d2d6d
Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort-usecase.png differ
diff --git a/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort_UML.png b/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort_UML.png
new file mode 100644
index 000000000..cfcd07f77
Binary files /dev/null and b/i18n/zh-CN/docusaurus-plugin-content-blog/img/sort_UML.png differ