Posted to commits@inlong.apache.org by zi...@apache.org on 2022/11/09 09:39:26 UTC

[inlong-website] branch master updated: [INLONG-585][Doc] Add document for multiple sink of Doris (#586)

This is an automated email from the ASF dual-hosted git repository.

zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong-website.git


The following commit(s) were added to refs/heads/master by this push:
     new 05d012e20a [INLONG-585][Doc] Add document for multiple sink of Doris (#586)
05d012e20a is described below

commit 05d012e20a3a8c6e392160115ed4c4aaf3f1ab50
Author: kuansix <49...@qq.com>
AuthorDate: Wed Nov 9 17:39:22 2022 +0800

    [INLONG-585][Doc] Add document for multiple sink of Doris (#586)
    
    fix error in kafka doc
    Co-authored-by: EMsnap <st...@tencent.com>
    Co-authored-by: EMsnap <zp...@connect.ust.hk>
---
 docs/data_node/load_node/doris.md                  | 163 +++++++++++++++++++--
 docs/data_node/load_node/kafka.md                  |   6 +-
 .../current/data_node/load_node/doris.md           | 161 ++++++++++++++++++--
 .../current/data_node/load_node/kafka.md           |   6 +-
 4 files changed, 302 insertions(+), 34 deletions(-)

diff --git a/docs/data_node/load_node/doris.md b/docs/data_node/load_node/doris.md
index 2facf8e0dd..8dd6a15770 100644
--- a/docs/data_node/load_node/doris.md
+++ b/docs/data_node/load_node/doris.md
@@ -6,9 +6,11 @@ sidebar_position: 16
 import {siteVariables} from '../../version';
 
 ## Overview
-
-The `Doris Load` node supports writing data to the Doris database. This document describes how to set up a
-Doris Load node to run SQL queries against the Doris database.
+ - The `Doris Load` node supports writing data to the Doris database.
+ - Two sink modes are supported: Single-sink writes to a fixed database and table name, while Multi-sink derives the database and table names from the source data format, which suits scenarios such as multi-table writing or whole-database synchronization (a sketch follows this list).
+ - This document describes how to set up a Doris Load node to sink data to Doris.
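+
+As a minimal sketch of the Multi-sink routing (the sink table name `doris_multi_sink` and the connection values are illustrative only; the `sink.multiple.*` options are documented in the options table below):
+
+```sql
+-- Hypothetical sketch: with these patterns, a record captured from the MySQL
+-- table `user_db.user_id_name` is routed to the Doris table
+-- `user_db.doris_user_id_name`.
+CREATE TABLE doris_multi_sink (
+    id INT,
+    name STRING
+) WITH (
+  'connector' = 'doris-inlong',
+  'fenodes' = 'localhost:8030',
+  'username' = 'root',
+  'password' = '000000',
+  'sink.multiple.enable' = 'true',
+  'sink.multiple.format' = 'canal-json',
+  'sink.multiple.database-pattern' = '${database}',
+  'sink.multiple.table-pattern' = 'doris_${table}'
+);
+```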
 
 ## Supported Version
 
@@ -33,11 +35,11 @@ such as Maven or SBT is provided below.
 </code></pre>
 
 ## Prepare
-### Create a MySql Extract table
-First create a table `cdc_mysql_source` in the MySql database, the command is as follows:
+### Create MySQL Extract table
+- For Single-sink: Create a table `cdc.cdc_mysql_source` in the MySQL database. The command is as follows:
 ```sql
 [root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
-mysql> use test;
+mysql> use cdc;
 Database changed
 mysql> CREATE TABLE `cdc_mysql_source` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
@@ -59,18 +61,65 @@ mysql> select * from cdc_mysql_source;
 |  2 | lisi     |  0 |
 |  3 | wangwu   |  0 |
 +----+----------+----+
-3 rows in set (0.07 sec)     
+3 rows in set (0.07 sec)
+```
+- For Multi-sink: Create tables `user_db.user_id_name`, `user_db.user_id_score` in the MySQL database. The command is as follows:
+```sql
+[root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
+mysql> use user_db;
+Database changed
+mysql> CREATE TABLE `user_id_name` (
+       `id` int(11) NOT NULL AUTO_INCREMENT,
+       `name` varchar(64) DEFAULT NULL,
+       PRIMARY KEY (`id`)
+       );
+Query OK, 0 rows affected (0.02 sec)
+
+mysql> CREATE TABLE `user_id_score` (
+       `id` int(11) NOT NULL AUTO_INCREMENT,
+       `score` double default 0,
+       PRIMARY KEY (`id`)
+       );
+Query OK, 0 rows affected (0.02 sec)
+
+mysql> insert into user_id_name values(1001, 'lily'),(1002, 'tom'),(1003, 'alan');
+Query OK, 3 rows affected (0.01 sec)
+Records: 3  Duplicates: 0  Warnings: 0 
+
+mysql> insert into user_id_score values(1001, 99),(1002, 96),(1003, 98);
+Query OK, 3 rows affected (0.01 sec)
+Records: 3  Duplicates: 0  Warnings: 0 
+
+mysql> select * from user_id_name;
++------+--------+
+|  id  | name   |
++------+--------+
+| 1001 | lily   |
+| 1002 | tom    |
+| 1003 | alan   |
++------+--------+
+3 rows in set (0.07 sec)
+
+mysql> select * from user_id_score;
++------+-------+
+|  id  | score |
++------+-------+
+| 1001 | 99    |
+| 1002 | 96    |
+| 1003 | 98    |
++------+-------+
+3 rows in set (0.07 sec)
 ```
 
-### Create a Doris Load table
-Create a table `cdc_doris_sink` in the Doris database, the command is as follows:
+### Create Doris Load table
+- For Single-sink: Create a table `cdc.cdc_doris_sink` in the Doris database. The command is as follows:
 ```sql
 [root@fe001 ~]# mysql -u root -h localhost -P 9030 -p000000
-mysql> use test;
+mysql> use cdc;
 Reading table information for completion of table and column names
 You can turn off this feature to get a quicker startup with -A
-
 Database changed
+
 mysql> CREATE TABLE `cdc_doris_sink` (
        `id` int(11) NOT NULL COMMENT "user id",
        `name` varchar(50) NOT NULL COMMENT "user name",
@@ -84,11 +133,43 @@ mysql> CREATE TABLE `cdc_doris_sink` (
        );
 Query OK, 0 rows affected (0.06 sec)
 ```
+- For Multi-sink: Create tables `user_db.doris_user_id_name`, `user_db.doris_user_id_score` in the Doris database. The command is as follows:
+```sql
+[root@fe001 ~]# mysql -u root -h localhost -P 9030 -p000000
+mysql> use user_db;
+Reading table information for completion of table and column names
+You can turn off this feature to get a quicker startup with -A
+Database changed
+
+mysql> CREATE TABLE `doris_user_id_name` (
+       `id` int(11) NOT NULL COMMENT "user id",
+       `name` varchar(50) NOT NULL COMMENT "nickname"
+       ) ENGINE=OLAP
+       UNIQUE KEY(`id`)
+       COMMENT "OLAP"
+       DISTRIBUTED BY HASH(`id`) BUCKETS 1
+       PROPERTIES (
+       "replication_allocation" = "tag.location.default: 1"
+       );
+Query OK, 0 rows affected (0.06 sec)
+
+mysql> CREATE TABLE `doris_user_id_score` (
+       `id` int(11) NOT NULL COMMENT "user id",
+       `score` double default 0
+       ) ENGINE=OLAP
+       UNIQUE KEY(`id`)
+       COMMENT "OLAP"
+       DISTRIBUTED BY HASH(`id`) BUCKETS 1
+       PROPERTIES (
+       "replication_allocation" = "tag.location.default: 1"
+       );
+Query OK, 0 rows affected (0.06 sec)
+```
 
 ## How to create a Doris Load Node
 
 ### Usage for SQL API
-
+- For Single-sink: Doris load
 ```sql
 [root@tasknode001 flink-1.13.5]# ./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/doris/
 Flink SQL> SET 'execution.checkpointing.interval' = '3s';
@@ -108,7 +189,7 @@ Flink SQL> CREATE TABLE cdc_mysql_source (
     >  'port' = '3306',
     >  'username' = 'root',
     >  'password' = '123456',
-    >  'database-name' = 'test',
+    >  'database-name' = 'cdc',
     >  'table-name' = 'cdc_mysql_source'
     > );
 [INFO] Execute statement succeed.
@@ -118,9 +199,9 @@ Flink SQL> CREATE TABLE cdc_doris_sink (
     > name STRING,
     > dr TINYINT
     > ) WITH (
-    >  'connector' = 'doris',
+    >  'connector' = 'doris-inlong',
     >  'fenodes' = 'localhost:8030',
-    >  'table.identifier' = 'test.cdc_doris_sink',
+    >  'table.identifier' = 'cdc.cdc_doris_sink',
     >  'username' = 'root',
     >  'password' = '000000',
     >  'sink.properties.format' = 'json',
@@ -134,7 +215,54 @@ Flink SQL> insert into cdc_doris_sink select * from cdc_mysql_source /*+ OPTIONS
 [INFO] Submitting SQL update statement to the cluster...
 [INFO] SQL update statement has been successfully submitted to the cluster:
 Job ID: 5f89691571d7b3f3ca446589e3d0c3d3
+```
+- For Multi-sink: Doris load
+```sql
+./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/doris/
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';
+[INFO] Session property has been set.
+
+Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
+[INFO] Session property has been set.
 
+Flink SQL> CREATE TABLE cdc_mysql_source (
+    >   id int
+    >   ,name VARCHAR
+    >   ,dr TINYINT
+    >   ,PRIMARY KEY (id) NOT ENFORCED
+    > ) WITH (
+    >  'connector' = 'mysql-cdc-inlong',
+    >  'hostname' = 'localhost',
+    >  'port' = '3306',
+    >  'username' = 'root',
+    >  'password' = '123456',
+    >  'database-name' = 'test',
+    >  'table-name' = 'cdc_mysql_source'
+    > );
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE cdc_doris_sink (
+    > id INT,
+    > name STRING,
+    > dr TINYINT
+    > ) WITH (
+    >  'connector' = 'doris-inlong',
+    >  'fenodes' = 'localhost:8030',
+    >  'username' = 'root',
+    >  'password' = '000000',
+    >  'sink.enable-delete' = 'true',
+    >  'sink.multiple.enable' = 'true',
+    >  'sink.multiple.format' = 'canal-json',
+    >  'sink.multiple.database-pattern' = '${database}',
+    >  'sink.multiple.table-pattern' = 'doris_${table}'
+    > );
+[INFO] Execute statement succeed.
+
+-- Supports delete event synchronization (sink.enable-delete='true'); requires the Doris table to enable the batch delete feature
+Flink SQL> insert into cdc_doris_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID: 30feaa0ede92h6b6e25ea0cfda26df5e
 ```
 
 ### Usage for InLong Dashboard
@@ -170,6 +298,11 @@ TODO: It will be supported in the future.
 | sink.batch.interval               | optional     | 10s               | string  | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 10 seconds; the supported time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing.                                                                                                                                                                                                     |
 | sink.properties.*                 | optional     | (none)            | string  | The stream load parameters.<br /> <br /> e.g.:<br /> 'sink.properties.column_separator' = ','<br /> <br />  Set 'sink.properties.escape_delimiters' = 'true' if you want to use a control char as a separator, so that e.g. '\\x01' will translate to binary 0x01<br /><br />  JSON format import is supported; you need to enable both 'sink.properties.format' = 'json' and 'sink.properties.strip_outer_array' = 'true' |
 | sink.enable-delete                | optional     | true              | boolean | Whether to enable deletion. This option requires the Doris table to enable the batch delete feature (enabled by default in version 0.15+), and only supports the Uniq model.                                                                                                                                                                       |
+| sink.multiple.enable              | optional   | false             | boolean  | Whether to enable multiple sink writing; the default is `false`. When `sink.multiple.enable` is `true`, `sink.multiple.format`, `sink.multiple.database-pattern` and `sink.multiple.table-pattern` must all be set correctly.        |
+| sink.multiple.format              | optional   | (none)            | string   | The format of multiple sink; it represents the real format of the raw binary data and can currently be `canal-json` or `debezium-json`. See [kafka -- Dynamic Topic Extraction](https://github.com/apache/inlong-website/blob/master/docs/data_node/load_node/kafka.md) for more details.  |
+| sink.multiple.database-pattern    | optional   | (none)            | string   | The pattern used to extract the database name from the raw binary data; only used in the multiple sink writing scenario.                 |
+| sink.multiple.table-pattern       | optional   | (none)            | string   | The pattern used to extract the table name from the raw binary data; only used in the multiple sink writing scenario.                           |
+| sink.multiple.ignore-single-table-errors | optional | true         | boolean  | Whether to ignore single-table errors in the multiple sink writing scenario. When `true`, the sink continues if one table fails and only the failing table's sink stops; when `false`, the whole sink stops when any table fails. A sketch follows this table.     |
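+
+For example (an illustrative sketch; the sink table name `doris_tolerant_sink` is hypothetical and the remaining options mirror the Multi-sink example above), `sink.multiple.ignore-single-table-errors` keeps the other tables flowing when one table repeatedly fails:
+
+```sql
+CREATE TABLE doris_tolerant_sink (
+    id INT,
+    name STRING
+) WITH (
+  'connector' = 'doris-inlong',
+  'fenodes' = 'localhost:8030',
+  'username' = 'root',
+  'password' = '000000',
+  'sink.multiple.enable' = 'true',
+  'sink.multiple.format' = 'canal-json',
+  'sink.multiple.database-pattern' = '${database}',
+  'sink.multiple.table-pattern' = 'doris_${table}',
+  -- stop only the failing table's sink; keep writing the other tables
+  'sink.multiple.ignore-single-table-errors' = 'true'
+);
+```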
 
 ## Data Type Mapping
 
diff --git a/docs/data_node/load_node/kafka.md b/docs/data_node/load_node/kafka.md
index b683df182e..bb42c88307 100644
--- a/docs/data_node/load_node/kafka.md
+++ b/docs/data_node/load_node/kafka.md
@@ -166,9 +166,9 @@ The upstream data is:
   "type": "UPDATE"
 } 
 ```
-'topic-pattern' is '{database}_${table}', and the extracted topic is 'inventory_products' ('database', 'table' are metadata fields, and 'id' are physical fields)
+'topic-pattern' is '${database}_${table}', and the extracted topic is 'inventory_products' ('database', 'table' are metadata fields)
 
-'topic-pattern' is '{database}_${table}_${id}', and the extracted topic is 'inventory_products_4' ('database', 'table' are metadata fields, and 'id' are physical fields)
+'topic-pattern' is '${database}_${table}_${id}', and the extracted topic is 'inventory_products_111' ('database', 'table' are metadata fields, and 'id' is a physical field)
 
 - 'sink.multiple.format' is 'debezium-json':
 
@@ -196,7 +196,7 @@ The upstream data is:
   "transaction": null
 }
 ```
-'topic-pattern' is '{source.db}_${source.table}', and the extracted topic is 'inventory_products' ('source.db', 'source.table' are metadata fields, and 'id' are physical fields)
+'topic-pattern' is '${source.db}_${source.table}', and the extracted topic is 'inventory_products' ('source.db', 'source.table' are metadata fields)
 
 'topic-pattern' is '${source.db}_${source.table}_${id}', and the extracted topic is 'inventory_products_4' ('source.db', 'source.table' are metadata fields, and 'id' is a physical field)
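+
+A minimal sketch of a Kafka load node using this extraction (the connector name `kafka-inlong`, the bootstrap server, and the `format` choice are assumptions for illustration; 'topic-pattern' and 'sink.multiple.format' are the options documented above):
+
+```sql
+-- Hypothetical sketch: each canal-json record is written to the topic built
+-- from its metadata, e.g. database `inventory` and table `products` yield
+-- the topic `inventory_products`.
+CREATE TABLE kafka_dynamic_sink (
+    id INT,
+    name STRING
+) WITH (
+  'connector' = 'kafka-inlong',                        -- assumed connector name
+  'properties.bootstrap.servers' = 'localhost:9092',   -- assumed broker address
+  'format' = 'canal-json',                             -- assumed record format
+  'sink.multiple.format' = 'canal-json',
+  'topic-pattern' = '${database}_${table}'
+);
+```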
 
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/doris.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/doris.md
index 8298516f6f..ceadea0e21 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/doris.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/doris.md
@@ -7,7 +7,9 @@ import {siteVariables} from '../../version';
 
 ## Overview
 
-The `Doris Load` node supports writing data to the Doris database. This document describes how to set up a Doris Load node to run SQL queries against the Doris database.
+The `Doris Load` node supports writing data to the Doris database.
+Two sink modes are supported: Single-sink writes to a fixed database and table name, while Multi-sink derives the database and table names from the source data format, which suits scenarios such as multi-table writing or whole-database synchronization.
+This document describes how to set up a Doris Load node to sink data to Doris.
 
 ## Supported Version
 
@@ -31,11 +33,11 @@ import {siteVariables} from '../../version';
 </code></pre>
 
 ## Prepare
-### Create a MySql Extract table
-First create a table `cdc_mysql_source` in the MySql database, the command is as follows:
+### Create MySQL Extract table
+- For Single-sink: Create a table `cdc_mysql_source` in the MySQL database `cdc`. The command is as follows:
 ```sql
 [root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
-mysql> use test;
+mysql> use cdc;
 Database changed
 mysql> CREATE TABLE `cdc_mysql_source` (
        `id` int(11) NOT NULL AUTO_INCREMENT,
@@ -57,18 +59,65 @@ mysql> select * from cdc_mysql_source;
 |  2 | lisi     |  0 |
 |  3 | wangwu   |  0 |
 +----+----------+----+
-3 rows in set (0.07 sec)     
+3 rows in set (0.07 sec)
+```
+- For Multi-sink: Create tables `user_id_name`, `user_id_score` in the MySQL database `user_db`. The command is as follows:
+```sql
+[root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
+mysql> use user_db;
+Database changed
+mysql> CREATE TABLE `user_id_name` (
+       `id` int(11) NOT NULL AUTO_INCREMENT,
+       `name` varchar(64) DEFAULT NULL,
+       PRIMARY KEY (`id`)
+       );
+Query OK, 0 rows affected (0.02 sec)
+
+mysql> CREATE TABLE `user_id_score` (
+       `id` int(11) NOT NULL AUTO_INCREMENT,
+       `score` double default 0,
+       PRIMARY KEY (`id`)
+       );
+Query OK, 0 rows affected (0.02 sec)
+
+mysql> insert into user_id_name values(1001, 'lily'),(1002, 'tom'),(1003, 'alan');
+Query OK, 3 rows affected (0.01 sec)
+Records: 3  Duplicates: 0  Warnings: 0 
+
+mysql> insert into user_id_score values(1001, 99),(1002, 96),(1003, 98);
+Query OK, 3 rows affected (0.01 sec)
+Records: 3  Duplicates: 0  Warnings: 0 
+
+mysql> select * from user_id_name;
++------+--------+
+|  id  | name   |
++------+--------+
+| 1001 | lily   |
+| 1002 | tom    |
+| 1003 | alan   |
++------+--------+
+3 rows in set (0.07 sec)
+
+mysql> select * from user_id_score;
++------+-------+
+|  id  | score |
++------+-------+
+| 1001 | 99    |
+| 1002 | 96    |
+| 1003 | 98    |
++------+-------+
+3 rows in set (0.07 sec)
 ```
 
 ### Create Doris Load table
-Create a table `cdc_doris_sink` in the Doris database, the command is as follows:
+- For Single-sink: Create a table `cdc_doris_sink` in the Doris database `cdc`. The command is as follows:
 ```sql
 [root@fe001 ~]# mysql -u root -h localhost -P 9030 -p000000
-mysql> use test;
+mysql> use cdc;
 Reading table information for completion of table and column names
 You can turn off this feature to get a quicker startup with -A
-
 Database changed
+
 mysql> CREATE TABLE `cdc_doris_sink` (
        `id` int(11) NOT NULL COMMENT "user id",
        `name` varchar(50) NOT NULL COMMENT "user name",
@@ -82,11 +131,43 @@ mysql> CREATE TABLE `cdc_doris_sink` (
        );
 Query OK, 0 rows affected (0.06 sec)
 ```
+- For Multi-sink: Create tables `doris_user_id_name`, `doris_user_id_score` in the Doris database `user_db`. The command is as follows:
+```sql
+[root@fe001 ~]# mysql -u root -h localhost -P 9030 -p000000
+mysql> use user_db;
+Reading table information for completion of table and column names
+You can turn off this feature to get a quicker startup with -A
+Database changed
+
+mysql> CREATE TABLE `doris_user_id_name` (
+       `id` int(11) NOT NULL COMMENT "user id",
+       `name` varchar(50) NOT NULL COMMENT "nickname"
+       ) ENGINE=OLAP
+       UNIQUE KEY(`id`)
+       COMMENT "OLAP"
+       DISTRIBUTED BY HASH(`id`) BUCKETS 1
+       PROPERTIES (
+       "replication_allocation" = "tag.location.default: 1"
+       );
+Query OK, 0 rows affected (0.06 sec)
+
+mysql> CREATE TABLE `doris_user_id_score` (
+       `id` int(11) NOT NULL COMMENT "user id",
+       `score` double default 0
+       ) ENGINE=OLAP
+       UNIQUE KEY(`id`)
+       COMMENT "OLAP"
+       DISTRIBUTED BY HASH(`id`) BUCKETS 1
+       PROPERTIES (
+       "replication_allocation" = "tag.location.default: 1"
+       );
+Query OK, 0 rows affected (0.06 sec)
+```
 
 ## How to create a Doris Load Node
 
 ### Usage for SQL API
-
+- For Single-sink: Doris load
 ```sql
 [root@tasknode001 flink-1.13.5]# ./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/doris/
 Flink SQL> SET 'execution.checkpointing.interval' = '3s';
@@ -106,7 +187,7 @@ Flink SQL> CREATE TABLE cdc_mysql_source (
     >  'port' = '3306',
     >  'username' = 'root',
     >  'password' = '123456',
-    >  'database-name' = 'test',
+    >  'database-name' = 'cdc',
     >  'table-name' = 'cdc_mysql_source'
     > );
 [INFO] Execute statement succeed.
@@ -116,14 +197,13 @@ Flink SQL> CREATE TABLE cdc_doris_sink (
     > name STRING,
     > dr TINYINT
     > ) WITH (
-    >  'connector' = 'doris',
+    >  'connector' = 'doris-inlong',
     >  'fenodes' = 'localhost:8030',
-    >  'table.identifier' = 'test.cdc_doris_sink',
+    >  'table.identifier' = 'cdc.cdc_doris_sink',
     >  'username' = 'root',
     >  'password' = '000000',
     >  'sink.properties.format' = 'json',
-    >  'sink.properties.strip_outer_array' = 'true',
-    >  'sink.enable-delete' = 'true'
+    >  'sink.properties.strip_outer_array' = 'true'
     > );
 [INFO] Execute statement succeed.
 
@@ -132,7 +212,55 @@ Flink SQL> insert into cdc_doris_sink select * from cdc_mysql_source /*+ OPTIONS
 [INFO] Submitting SQL update statement to the cluster...
 [INFO] SQL update statement has been successfully submitted to the cluster:
 Job ID: 5f89691571d7b3f3ca446589e3d0c3d3
+```
+
+- For Multi-sink: Doris load
+```sql
+./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/doris/
+Flink SQL> SET 'execution.checkpointing.interval' = '3s';
+[INFO] Session property has been set.
+
+Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
+[INFO] Session property has been set.
 
+Flink SQL> CREATE TABLE cdc_mysql_source (
+    >   id int
+    >   ,name VARCHAR
+    >   ,dr TINYINT
+    >   ,PRIMARY KEY (id) NOT ENFORCED
+    > ) WITH (
+    >  'connector' = 'mysql-cdc-inlong',
+    >  'hostname' = 'localhost',
+    >  'port' = '3306',
+    >  'username' = 'root',
+    >  'password' = '123456',
+    >  'database-name' = 'test',
+    >  'table-name' = 'cdc_mysql_source'
+    > );
+[INFO] Execute statement succeed.
+
+Flink SQL> CREATE TABLE cdc_doris_sink (
+    > id INT,
+    > name STRING,
+    > dr TINYINT
+    > ) WITH (
+    >  'connector' = 'doris-inlong',
+    >  'fenodes' = 'localhost:8030',
+    >  'username' = 'root',
+    >  'password' = '000000',
+    >  'sink.enable-delete' = 'true',
+    >  'sink.multiple.enable' = 'true',
+    >  'sink.multiple.format' = 'canal-json',
+    >  'sink.multiple.database-pattern' = '${database}',
+    >  'sink.multiple.table-pattern' = 'doris_${table}'
+    > );
+[INFO] Execute statement succeed.
+
+-- Supports delete event synchronization (sink.enable-delete='true'); requires the Doris table to enable the batch delete feature
+Flink SQL> insert into cdc_doris_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
+[INFO] Submitting SQL update statement to the cluster...
+[INFO] SQL update statement has been successfully submitted to the cluster:
+Job ID: 30feaa0ede92h6b6e25ea0cfda26df5e
 ```
 
 ### Usage for InLong Dashboard
@@ -168,6 +296,13 @@ TODO: It will be supported in the future.
 | sink.batch.interval               | optional     | 10s               | string   | The flush interval, after which the asynchronous thread will write the data in the cache to BE. The default value is 10 seconds; the supported time units are ms, s, min, h, and d. Set to 0 to turn off periodic writing.                                                                                                                                            |
 | sink.properties.*                 | optional     | (none)            | string   | The stream load parameters, e.g. 'sink.properties.column_separator' = ',' defines the column separator; 'sink.properties.escape_delimiters' = 'true' lets a control char such as '\\x01' be used as a separator; 'sink.properties.format' = 'json' together with 'sink.properties.strip_outer_array' = 'true' enables JSON format import |
 | sink.enable-delete                | optional     | true              | boolean  | Whether to enable deletion. This option requires the Doris table to enable the batch delete feature (enabled by default in version 0.15+), and only supports the Uniq model.                                                                                                                                                                 |
+| sink.multiple.enable              | optional   | false             | boolean  | Whether to enable Doris multiple sink writing. When `sink.multiple.enable` is `true`, `sink.multiple.format`, `sink.multiple.database-pattern` and `sink.multiple.table-pattern` must all be set correctly.        |
+| sink.multiple.format              | optional   | (none)            | string   | The format of multiple sink; it represents the real format of the raw binary data and can currently be `canal-json` or `debezium-json`. See [kafka -- Dynamic Topic Extraction](https://github.com/apache/inlong-website/blob/master/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md) for more details.|
+| sink.multiple.database-pattern    | optional   | (none)            | string   | The pattern used to extract the target database name from the raw binary data. Only effective when `sink.multiple.enable` is `true`.                 |
+| sink.multiple.table-pattern       | optional   | (none)            | string   | The pattern used to extract the target table name from the raw binary data. Only effective when `sink.multiple.enable` is `true`.                         |
+| sink.multiple.ignore-single-table-errors | optional | true         | boolean  | Whether to ignore single-table failures in multiple sink writing. When `true`, if one table fails the sink stops writing that table's data and continues writing the other tables; when `false`, the whole sink stops when any table fails.     |
+
 ## Data Type Mapping
 
 | Doris Type  | Flink Type           |
diff --git a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
index a0c66fc45a..05568fc501 100644
--- a/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
+++ b/i18n/zh-CN/docusaurus-plugin-content-docs/current/data_node/load_node/kafka.md
@@ -164,9 +164,9 @@ TODO: It will be supported in the future.
   "type": "UPDATE"
 } 
 ```
-'topic-pattern' is '{database}_${table}', and the extracted topic is 'inventory_products' ('database', 'table' are metadata fields, and 'id' is a physical field)
+'topic-pattern' is '${database}_${table}', and the extracted topic is 'inventory_products' ('database', 'table' are metadata fields)
 
-'topic-pattern' is '{database}_${table}_${id}', and the extracted topic is 'inventory_products_4' ('database', 'table' are metadata fields, and 'id' is a physical field)
+'topic-pattern' is '${database}_${table}_${id}', and the extracted topic is 'inventory_products_111' ('database', 'table' are metadata fields, and 'id' is a physical field)
 
 - 'sink.multiple.format' is 'debezium-json':
 
@@ -194,7 +194,7 @@ TODO: It will be supported in the future.
   "transaction": null
 }
 ```
-'topic-pattern' is '{database}_${table}', and the extracted topic is 'inventory_products' ('source.db', 'source.table' are metadata fields, and 'id' is a physical field)
+'topic-pattern' is '${source.db}_${source.table}', and the extracted topic is 'inventory_products' ('source.db', 'source.table' are metadata fields)
 
 'topic-pattern' is '${source.db}_${source.table}_${id}', and the extracted topic is 'inventory_products_4' ('source.db', 'source.table' are metadata fields, and 'id' is a physical field)