Posted to commits@doris.apache.org by zy...@apache.org on 2023/06/08 05:42:35 UTC
[doris] branch master updated: [doc](flink) add flink delete column from kafka specified columns (#20545)
This is an automated email from the ASF dual-hosted git repository.
zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ee4c041444 [doc](flink) add flink delete column from kafka specified columns (#20545)
ee4c041444 is described below
commit ee4c041444ca0709987ec88f76082c55f7279cf6
Author: wudi <67...@qq.com>
AuthorDate: Thu Jun 8 13:42:27 2023 +0800
[doc](flink) add flink delete column from kafka specified columns (#20545)
---
docs/en/docs/ecosystem/flink-doris-connector.md | 39 ++++++++++++++++++++++
docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 39 ++++++++++++++++++++++
2 files changed, 78 insertions(+)
diff --git a/docs/en/docs/ecosystem/flink-doris-connector.md b/docs/en/docs/ecosystem/flink-doris-connector.md
index 48c67b713d..e3671ceac9 100644
--- a/docs/en/docs/ecosystem/flink-doris-connector.md
+++ b/docs/en/docs/ecosystem/flink-doris-connector.md
@@ -388,6 +388,45 @@ For the update of the primary key column, FlinkCDC will send DELETE and INSERT e
### Example
The Flink program can refer to the CDC synchronization example above. After the task is successfully submitted, execute the statement that updates the primary key column (`update student set id = '1002' where id = '1001'`) on the MySQL side to modify the data in Doris.
+## Use Flink to delete data based on specified columns
+
+Messages in Kafka commonly use a specific field to mark the operation type, such as {"op_type":"delete",data:{...}}. For this kind of data, records with op_type=delete should be deleted from Doris.
+
+By default, DorisSink distinguishes the event type based on RowKind. In the CDC case, the event type can usually be obtained directly, and the hidden column `__DORIS_DELETE_SIGN__` is assigned accordingly to perform the deletion. With Kafka, the event type has to be determined by business logic, and the value of the hidden column must be passed in explicitly.
+
+### Example
+
+```sql
+-- Example upstream data: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
+CREATE TABLE KAFKA_SOURCE(
+ data STRING,
+ op_type STRING
+) WITH (
+ 'connector' = 'kafka',
+ ...
+);
+
+CREATE TABLE DORIS_SINK(
+ id INT,
+ name STRING,
+ __DORIS_DELETE_SIGN__ INT
+) WITH (
+ 'connector' = 'doris',
+ 'fenodes' = '127.0.0.1:8030',
+ 'table.identifier' = 'db.table',
+ 'username' = 'root',
+ 'password' = '',
+ 'sink.enable-delete' = 'false', -- false means not to get the event type from RowKind
+ 'sink.properties.columns' = 'id,name,__DORIS_DELETE_SIGN__' -- Explicitly specify the import columns for Stream Load
+);
+
+INSERT INTO DORIS_SINK
+SELECT CAST(json_value(data,'$.id') AS INT) as id,
+json_value(data,'$.name') as name,
+if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
+from KAFKA_SOURCE;
+```
+
## Java example
A Java example is provided under `samples/doris-demo/` for reference; see [here](https://github.com/apache/doris/tree/master/samples/doris-demo/)
diff --git a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
index d0bf80d677..8fc3061979 100644
--- a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
+++ b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
@@ -383,6 +383,45 @@ The underlying capture tool of Flink CDC is Debezium; Debezium internally uses the op field to mark
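### Example
The Flink program can refer to the CDC synchronization example above. After the task is successfully submitted, execute the statement that updates the primary key column (`update student set id = '1002' where id = '1001'`) on the MySQL side to modify the data in Doris.
+## Use Flink to delete data based on specified columns
+
+Messages in Kafka commonly use a specific field to mark the operation type, such as {"op_type":"delete",data:{...}}. For this kind of data, records with op_type=delete should be deleted from Doris.
+
+By default, DorisSink distinguishes the event type based on RowKind. In the CDC case, the event type can usually be obtained directly, and the hidden column `__DORIS_DELETE_SIGN__` is assigned accordingly to perform the deletion. With Kafka, the event type has to be determined by business logic, and the value of the hidden column must be passed in explicitly.
+
+### Example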
+
+```sql
+-- Example upstream data: {"op_type":"delete",data:{"id":1,"name":"zhangsan"}}
+CREATE TABLE KAFKA_SOURCE(
+ data STRING,
+ op_type STRING
+) WITH (
+ 'connector' = 'kafka',
+ ...
+);
+
+CREATE TABLE DORIS_SINK(
+ id INT,
+ name STRING,
+ __DORIS_DELETE_SIGN__ INT
+) WITH (
+ 'connector' = 'doris',
+ 'fenodes' = '127.0.0.1:8030',
+ 'table.identifier' = 'db.table',
+ 'username' = 'root',
+ 'password' = '',
+ 'sink.enable-delete' = 'false', -- false means not to get the event type from RowKind
+ 'sink.properties.columns' = 'id,name,__DORIS_DELETE_SIGN__' -- Explicitly specify the import columns for Stream Load
+);
+
+INSERT INTO DORIS_SINK
+SELECT CAST(json_value(data,'$.id') AS INT) as id,
+json_value(data,'$.name') as name,
+if(op_type='delete',1,0) as __DORIS_DELETE_SIGN__
+from KAFKA_SOURCE;
+```
+
## Java example
A Java example is provided under `samples/doris-demo/` for reference; see [here](https://github.com/apache/doris/tree/master/samples/doris-demo/)
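
The delete-flag logic in the SQL example above can also be sketched in plain Java, which may help when the same mapping is needed in a DataStream job or a unit test. This is only an illustrative sketch: the class `DeleteSignSketch` and its methods are hypothetical names and are not part of the Flink Doris Connector. It assumes the sink table declares the business columns `id` and `name`, as `DORIS_SINK` does, and that `op_type` is compared case-sensitively, as in `if(op_type='delete',1,0)`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper for illustration; not part of the Flink Doris Connector. */
final class DeleteSignSketch {

    // Name of the Doris hidden column used in the SQL example.
    static final String DELETE_SIGN_COLUMN = "__DORIS_DELETE_SIGN__";

    /** Mirrors if(op_type='delete',1,0): 1 marks the row for deletion, 0 keeps it. */
    static int deleteSign(String opType) {
        // A null or unknown op_type is treated as "not a delete".
        return "delete".equals(opType) ? 1 : 0;
    }

    /** Builds a value for 'sink.properties.columns': business columns, then the hidden column. */
    static String sinkColumns(List<String> businessColumns) {
        List<String> all = new ArrayList<>(businessColumns);
        all.add(DELETE_SIGN_COLUMN);
        return String.join(",", all);
    }

    public static void main(String[] args) {
        System.out.println(deleteSign("delete"));                     // 1
        System.out.println(deleteSign("insert"));                     // 0
        System.out.println(sinkColumns(Arrays.asList("id", "name"))); // id,name,__DORIS_DELETE_SIGN__
    }
}
```

Deriving the column list from the sink table's own column names, rather than typing it by hand, keeps `sink.properties.columns` aligned with the columns the `INSERT` statement actually produces.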