Posted to commits@doris.apache.org by di...@apache.org on 2023/06/10 02:52:23 UTC

[doris] branch master updated: [doc](flink-connector) add flink cdc sync mysql database (#20486)

This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 206b5a4235 [doc](flink-connector) add flink cdc sync mysql database (#20486)
206b5a4235 is described below

commit 206b5a42356d4414edaa149bafd8c9643d5928b3
Author: wudi <67...@qq.com>
AuthorDate: Sat Jun 10 10:52:15 2023 +0800

    [doc](flink-connector) add flink cdc sync mysql database (#20486)
---
 docs/en/docs/ecosystem/flink-doris-connector.md    | 57 ++++++++++++++++++--
 docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 63 ++++++++++++++++++++--
 2 files changed, 112 insertions(+), 8 deletions(-)

diff --git a/docs/en/docs/ecosystem/flink-doris-connector.md b/docs/en/docs/ecosystem/flink-doris-connector.md
index e3671ceac9..142cc7f662 100644
--- a/docs/en/docs/ecosystem/flink-doris-connector.md
+++ b/docs/en/docs/ecosystem/flink-doris-connector.md
@@ -45,9 +45,10 @@ Github: https://github.com/apache/doris-flink-connector
 | Connector Version | Flink Version | Doris Version | Java Version | Scala Version |
 | --------- | ----- | ------ | ---- | ----- |
 | 1.0.3     | 1.11+ | 0.15+  | 8    | 2.11,2.12 |
-| 1.1.0     | 1.14  | 1.0+   | 8    | 2.11,2.12 |
-| 1.2.0     | 1.15  | 1.0+   | 8    | -         |
+| 1.1.1     | 1.14  | 1.0+   | 8    | 2.11,2.12 |
+| 1.2.1     | 1.15  | 1.0+   | 8    | -         |
 | 1.3.0     | 1.16  | 1.0+   | 8    | -         |
+| 1.4.0     | 1.15,1.16,1.17  | 1.0+   | 8    | -         |
 
 ## Build and Install
 
@@ -313,7 +314,7 @@ refer: [CDCSchemaChangeExample](https://github.com/apache/doris-flink-connector/
 | sink.properties.*     | --               | N              | The Stream Load parameters.<br /> <br /> e.g.:<br /> 'sink.properties.column_separator' = ','<br /> <br /> Set 'sink.properties.escape_delimiters' = 'true' if you want to use a control character as a separator; an escaped value such as '\\x01' is then translated to the binary byte 0x01.<br /><br />To import JSON, enable both 'sink.properties.format' = 'json' and 'sink.properties.read_json_by_line' = 'true'. |
 | sink.enable-delete     | true               | N              | Whether to enable deletion. This option requires batch delete to be enabled on the Doris table (enabled by default since 0.15) and only supports the Unique model. |
 | sink.enable-2pc                  | true              | N        | Whether to enable two-phase commit (2pc). The default is true, which guarantees Exactly-Once semantics. For details on two-phase commit, see [here](../data-operate/import/import-way/stream-load-manual.md). |
-| sink.max-retries                 | 1                  | N        | In the 2pc scenario, the number of retries after the commit phase fails.                                                                                                                                                                                                                                         |
+| sink.max-retries                 | 3                  | N        | In 2pc mode, the maximum number of retries after the commit phase fails. |
 | sink.buffer-size                 | 1048576 (1MB)      | N        | Size of each write buffer, in bytes. Changing it is not recommended; the default is sufficient. |
 | sink.buffer-count                | 3                  | N        | Number of write buffers. Changing it is not recommended; the default is sufficient. |
 
@@ -378,6 +379,56 @@ WITH (
 insert into doris_sink select id,name from cdc_mysql_source;
 ```
 
+## Use Flink CDC to synchronize multiple tables or an entire database
+### Syntax
+```
+<FLINK_HOME>/bin/flink run \
+     -c org.apache.doris.flink.tools.cdc.CdcTools \
+     lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
+     mysql-sync-database \
+     --database <doris-database-name> \
+     [--job-name <flink-job-name>] \
+     [--table-prefix <doris-table-prefix>] \
+     [--table-suffix <doris-table-suffix>] \
+     [--including-tables <mysql-table-name|name-regular-expr>] \
+     [--excluding-tables <mysql-table-name|name-regular-expr>] \
+     --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
+     --sink-conf <doris-sink-conf> [--sink-conf <doris-sink-conf> ...] \
+     [--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]
+```
+
+- **--job-name** Flink job name (optional).
+- **--database** Name of the Doris database to synchronize into.
+- **--table-prefix** Prefix for the Doris table names, for example `--table-prefix ods_`.
+- **--table-suffix** Suffix for the Doris table names; usage is the same as `--table-prefix`.
+- **--including-tables** MySQL tables to synchronize. Separate multiple tables with "|"; regular expressions are supported. For example, `--including-tables "table1|tbl.*"` synchronizes table1 and every table whose name starts with tbl.
+- **--excluding-tables** MySQL tables not to synchronize; usage is the same as `--including-tables`.
+- **--mysql-conf** MySQL CDC source configuration, for example `--mysql-conf hostname=127.0.0.1`. All MySQL-CDC options are listed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html); hostname, username, password, and database-name are required.
+- **--sink-conf** Doris Sink configuration. The complete list of options is available [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9).
+- **--table-conf** Doris table configuration, i.e. the entries of the table's `properties`. For example `--table-conf replication_num=1`.
+
+### Example
+```shell
+<FLINK_HOME>/bin/flink run \
+     -Dexecution.checkpointing.interval=10s \
+     -Dparallelism.default=1 \
+     -c org.apache.doris.flink.tools.cdc.CdcTools \
+     lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
+     mysql-sync-database \
+     --database test_db \
+     --mysql-conf hostname=127.0.0.1 \
+     --mysql-conf username=root \
+     --mysql-conf password=123456 \
+     --mysql-conf database-name=mysql_db \
+     --including-tables "tbl1|test.*" \
+     --sink-conf fenodes=127.0.0.1:8030 \
+     --sink-conf username=root \
+     --sink-conf password=123456 \
+     --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+     --sink-conf sink.label-prefix=label \
+     --table-conf replication_num=1
+```
+
 ## Use FlinkCDC to update Key column
 Generally, a business database uses an ID number as a table's primary key; for example, the Student table uses the student number (id) as its primary key. As the business evolves, however, the number associated with a record may change.
 In this scenario, synchronizing data with FlinkCDC + Doris Connector automatically updates the data in the Doris primary key column.
diff --git a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
index 8fc3061979..130c3d39cf 100644
--- a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
+++ b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
@@ -47,9 +47,10 @@ Flink Doris Connector supports using Flink to perform operations (read, insert, modify
 | Connector Version | Flink Version | Doris Version | Java Version | Scala Version |
 | --------- | ----- | ------ | ---- | ----- |
 | 1.0.3     | 1.11+ | 0.15+  | 8    | 2.11,2.12 |
-| 1.1.0     | 1.14  | 1.0+   | 8    | 2.11,2.12 |
-| 1.2.0     | 1.15  | 1.0+   | 8    | -         |
+| 1.1.1     | 1.14  | 1.0+   | 8    | 2.11,2.12 |
+| 1.2.1     | 1.15  | 1.0+   | 8    | -         |
 | 1.3.0     | 1.16  | 1.0+   | 8    | -         |
+| 1.4.0     | 1.15,1.16,1.17  | 1.0+   | 8    | -         |
 
 ## Build and Install
 
@@ -309,8 +310,9 @@ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")//.
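 | sink.properties.*                | --                 | N        | Stream Load import parameters.<br/>For example: 'sink.properties.column_separator' = ',' defines the column separator; 'sink.properties.escape_delimiters' = 'true' allows special characters as separators, so '\x01' is converted to the binary byte 0x01.<br/><br/>JSON import:<br/>'sink.properties.format' = 'json' 'sink.properties.read_json_by_line' = 'true' |
 | sink.enable-delete               | TRUE               | N        | Whether to enable deletion. This option requires batch delete to be enabled on the Doris table (enabled by default since Doris 0.15) and only supports the Unique model. |
 | sink.enable-2pc                  | TRUE               | N        | Whether to enable two-phase commit (2pc). The default is true, which guarantees Exactly-Once semantics. For details on two-phase commit, see [here](../data-operate/import/import-way/stream-load-manual.md). |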
-
-
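+| sink.buffer-size                 | 1MB                | N        | Size of each write buffer, in bytes. Changing it is not recommended; the default is sufficient. |
+| sink.buffer-count                | 3                  | N        | Number of write buffers. Changing it is not recommended; the default is sufficient. |
+| sink.max-retries                 | 3                  | N        | Maximum number of retries after a commit fails; the default is 3. |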
 
 ## Doris and Flink Column Type Mapping
 
@@ -334,7 +336,7 @@ env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")//.
 | TIME       | DOUBLE             |
 | HLL        | Unsupported datatype             |
 
-## Example: Using Flink CDC to ingest data into Doris (supports Insert / Update / Delete events)
+## Example: Using Flink SQL to ingest data into Doris via CDC (supports Insert/Update/Delete events)
 ```sql
 -- enable checkpoint
 SET 'execution.checkpointing.interval' = '10s';
@@ -373,6 +375,57 @@ WITH (
 insert into doris_sink select id,name from cdc_mysql_source;
 ```
 
+## Example: Using Flink CDC to synchronize multiple tables or an entire database
+### Syntax
+```
+<FLINK_HOME>/bin/flink run \
+    -c org.apache.doris.flink.tools.cdc.CdcTools \
+    lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
+    mysql-sync-database \
+    --database <doris-database-name> \
+    [--job-name <flink-job-name>] \
+    [--table-prefix <doris-table-prefix>] \
+    [--table-suffix <doris-table-suffix>] \
+    [--including-tables <mysql-table-name|name-regular-expr>] \
+    [--excluding-tables <mysql-table-name|name-regular-expr>] \
+    --mysql-conf <mysql-cdc-source-conf> [--mysql-conf <mysql-cdc-source-conf> ...] \
+    --sink-conf <doris-sink-conf> [--sink-conf <doris-sink-conf> ...] \
+    [--table-conf <doris-table-conf> [--table-conf <doris-table-conf> ...]]
+```
+
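+- **--job-name** Flink job name (optional).
+- **--database** Name of the Doris database to synchronize into.
+- **--table-prefix** Prefix for the Doris table names, for example `--table-prefix ods_`.
+- **--table-suffix** Suffix for the Doris table names; usage is the same as `--table-prefix`.
+- **--including-tables** MySQL tables to synchronize. Separate multiple tables with "|"; regular expressions are supported. For example, `--including-tables "table1|tbl.*"` synchronizes table1 and every table whose name starts with tbl.
+- **--excluding-tables** MySQL tables not to synchronize; usage is the same as `--including-tables`.
+- **--mysql-conf** MySQL CDC source configuration, for example `--mysql-conf hostname=127.0.0.1`. All MySQL-CDC options are listed [here](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html); hostname, username, password, and database-name are required.
+- **--sink-conf** Doris Sink configuration. The complete list of options is available [here](https://doris.apache.org/zh-CN/docs/dev/ecosystem/flink-doris-connector/#%E9%80%9A%E7%94%A8%E9%85%8D%E7%BD%AE%E9%A1%B9).
+- **--table-conf** Doris table configuration, i.e. the entries of the table's `properties`. For example `--table-conf replication_num=1`.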
+
+### Example
+```
+<FLINK_HOME>/bin/flink run \
+    -Dexecution.checkpointing.interval=10s \
+    -Dparallelism.default=1 \
+    -c org.apache.doris.flink.tools.cdc.CdcTools \
+    lib/flink-doris-connector-1.16-1.4.0-SNAPSHOT.jar \
+    mysql-sync-database \
+    --database test_db \
+    --mysql-conf hostname=127.0.0.1 \
+    --mysql-conf username=root \
+    --mysql-conf password=123456 \
+    --mysql-conf database-name=mysql_db \
+    --including-tables "tbl1|test.*" \
+    --sink-conf fenodes=127.0.0.1:8030 \
+    --sink-conf username=root \
+    --sink-conf password=123456 \
+    --sink-conf jdbc-url=jdbc:mysql://127.0.0.1:9030 \
+    --sink-conf sink.label-prefix=label \
+    --table-conf replication_num=1 
+```
+
+
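 ## Use FlinkCDC to update the Key column
 Generally, a business database uses an ID number as a table's primary key; for example, the Student table uses the number (id) as its primary key. As the business evolves, however, the number associated with a record may change.
 In this scenario, synchronizing data with FlinkCDC + Doris Connector automatically updates the data in the Doris primary key column.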

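The `--including-tables`, `--excluding-tables`, `--table-prefix`, and `--table-suffix` options documented in this patch can be illustrated with a small Java sketch. It is a hypothetical model, not the connector's code: the class `TableFilterSketch` and its methods are invented for illustration, and it assumes that each pattern must match the whole MySQL table name (Java `Pattern.matches`), that an omitted `--including-tables` selects every table, and that exclusion takes precedence over inclusion.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

/**
 * Hypothetical sketch of mysql-sync-database table selection and naming.
 * Assumes full-string regex matching; this is not the connector's code.
 */
public class TableFilterSketch {

    // Returns true if the MySQL table should be synchronized.
    static boolean shouldSync(String table, String including, String excluding) {
        // Assumption: no --including-tables means "all tables".
        boolean included = including == null || Pattern.matches(including, table);
        // Assumption: --excluding-tables wins over --including-tables.
        boolean excluded = excluding != null && Pattern.matches(excluding, table);
        return included && !excluded;
    }

    // Builds the Doris table name from --table-prefix and --table-suffix.
    static String dorisTableName(String table, String prefix, String suffix) {
        return (prefix == null ? "" : prefix) + table + (suffix == null ? "" : suffix);
    }

    // Lists "mysqlTable -> dorisTable" for every table that would be synced.
    static List<String> plan(List<String> tables, String including, String excluding,
                             String prefix, String suffix) {
        List<String> result = new ArrayList<>();
        for (String t : tables) {
            if (shouldSync(t, including, excluding)) {
                result.add(t + " -> " + dorisTableName(t, prefix, suffix));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tables = Arrays.asList("table1", "tbl_orders", "tbl_users", "audit_log");
        for (String line : plan(tables, "table1|tbl.*", "tbl_users", "ods_", null)) {
            System.out.println(line);
        }
    }
}
```

With `--including-tables "table1|tbl.*"`, `--excluding-tables tbl_users`, and `--table-prefix ods_`, `main` prints `table1 -> ods_table1` and `tbl_orders -> ods_tbl_orders`; `audit_log` matches neither alternative and is skipped.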

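The `sink.properties.escape_delimiters` description says that a separator written as `\x01` is translated to the binary byte 0x01. The following hypothetical Java helper, `EscapeDelimiterSketch.decode`, illustrates that conversion. It assumes escapes take the form `\x` followed by exactly two hex digits and that all other characters are ASCII; the connector's actual parsing may differ.

```java
import java.io.ByteArrayOutputStream;
import java.util.Arrays;

/**
 * Hypothetical sketch of the documented escape_delimiters behavior:
 * the text "\x01" becomes the single byte 0x01. Not the connector's code.
 */
public class EscapeDelimiterSketch {

    // True if c is a hexadecimal digit (0-9, a-f, A-F).
    static boolean isHex(char c) {
        return Character.digit(c, 16) >= 0;
    }

    // Replaces every "\xHH" escape with the byte 0xHH; other characters are
    // copied unchanged (assumes they are single-byte ASCII).
    static byte[] decode(String s) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int i = 0;
        while (i < s.length()) {
            if (i + 3 < s.length()
                    && s.charAt(i) == '\\'
                    && s.charAt(i + 1) == 'x'
                    && isHex(s.charAt(i + 2))
                    && isHex(s.charAt(i + 3))) {
                out.write(Integer.parseInt(s.substring(i + 2, i + 4), 16));
                i += 4;
            } else {
                out.write(s.charAt(i));
                i++;
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // In Java source, "\\x01" is the 4-character text \x01.
        System.out.println(Arrays.toString(decode("\\x01"))); // [1]
        System.out.println(Arrays.toString(decode(",")));     // [44]
    }
}
```

An ordinary separator such as `,` passes through unchanged, so in this sketch only the escaped form produces a control character.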