Posted to commits@doris.apache.org by zy...@apache.org on 2023/06/08 05:42:35 UTC

[doris] branch master updated: [doc](flink) add flink delete column from kafka specified columns (#20545)

This is an automated email from the ASF dual-hosted git repository.

zykkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ee4c041444 [doc](flink) add flink delete column from kafka specified columns (#20545)
ee4c041444 is described below

commit ee4c041444ca0709987ec88f76082c55f7279cf6
Author: wudi <67...@qq.com>
AuthorDate: Thu Jun 8 13:42:27 2023 +0800

    [doc](flink) add flink delete column from kafka specified columns (#20545)
---
 docs/en/docs/ecosystem/flink-doris-connector.md    | 39 ++++++++++++++++++++++
 docs/zh-CN/docs/ecosystem/flink-doris-connector.md | 39 ++++++++++++++++++++++
 2 files changed, 78 insertions(+)

diff --git a/docs/en/docs/ecosystem/flink-doris-connector.md b/docs/en/docs/ecosystem/flink-doris-connector.md
index 48c67b713d..e3671ceac9 100644
--- a/docs/en/docs/ecosystem/flink-doris-connector.md
+++ b/docs/en/docs/ecosystem/flink-doris-connector.md
@@ -388,6 +388,45 @@ For the update of the primary key column, FlinkCDC will send DELETE and INSERT e
 ### Example
 The Flink program can refer to the CDC synchronization example above. After the task is successfully submitted, execute an UPDATE statement on the primary key column (`update student set id = '1002' where id = '1001'`) on the MySQL side to modify the data in Doris.
 
+## Use Flink to delete data based on specified columns
+
+Messages in Kafka often use a specific field to mark the operation type, for example `{"op_type":"delete","data":{...}}`. For this kind of data, the goal is to delete the rows whose `op_type` is `delete`.
+
+By default, DorisSink distinguishes the event type based on RowKind. In a CDC scenario the event type can be obtained directly, and the hidden column `__DORIS_DELETE_SIGN__` is assigned accordingly to perform the deletion. With Kafka, however, the event type has to be derived from the business logic, and the value of the hidden column must be set explicitly.
+
+### Example
+
+```sql
+-- Example upstream data: {"op_type":"delete","data":{"id":1,"name":"zhangsan"}}
+CREATE TABLE KAFKA_SOURCE(
+  data STRING,
+  op_type STRING
+) WITH (
+  'connector' = 'kafka',
+  ...
+);
+
+CREATE TABLE DORIS_SINK(
+  id INT,
+  name STRING,
+  __DORIS_DELETE_SIGN__ INT
+) WITH (
+  'connector' = 'doris',
+  'fenodes' = '127.0.0.1:8030',
+  'table.identifier' = 'db.table',
+  'username' = 'root',
+  'password' = '',
+  'sink.enable-delete' = 'false',        -- false means the event type is not taken from RowKind
+  'sink.properties.columns' = 'id,name,__DORIS_DELETE_SIGN__'  -- explicitly specify the columns imported by Stream Load
+);
+
+INSERT INTO DORIS_SINK
+SELECT CAST(JSON_VALUE(data, '$.id') AS INT) AS id,
+       JSON_VALUE(data, '$.name') AS name,
+       IF(op_type = 'delete', 1, 0) AS __DORIS_DELETE_SIGN__
+FROM KAFKA_SOURCE;
+```
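+
+The example assumes that the target Doris table uses the Unique Key model, since the hidden `__DORIS_DELETE_SIGN__` column takes effect for Unique Key tables. The DDL below is a minimal illustrative sketch, not part of the original example; the table name `db.table` and the single-replica setting simply mirror the sink definition above:
+
+```sql
+-- Illustrative Doris-side DDL (assumption, for reference only):
+-- a Unique Key model table whose rows can be marked as deleted
+-- through the hidden __DORIS_DELETE_SIGN__ column.
+CREATE TABLE db.`table` (
+  `id` INT,
+  `name` VARCHAR(256)
+)
+UNIQUE KEY(`id`)
+DISTRIBUTED BY HASH(`id`) BUCKETS 1
+PROPERTIES (
+  "replication_num" = "1"
+);
+```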
+
 ## Java example
 
 An example of the Java version is provided in `samples/doris-demo/` for reference; see [here](https://github.com/apache/doris/tree/master/samples/doris-demo/).
diff --git a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
index d0bf80d677..8fc3061979 100644
--- a/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
+++ b/docs/zh-CN/docs/ecosystem/flink-doris-connector.md
@@ -383,6 +383,45 @@ The underlying capture tool of Flink CDC is Debezium, which internally uses the op field to mark
 ### Usage
 The Flink program can refer to the CDC synchronization example above. After the task is successfully submitted, execute an UPDATE statement on the primary key column (`update  student set id = '1002' where id = '1001'`) on the MySQL side to modify the data in Doris.
 
+## Use Flink to delete data based on specified columns
+
+Messages in Kafka often use a specific field to mark the operation type, for example `{"op_type":"delete","data":{...}}`. For this kind of data, the goal is to delete the rows whose `op_type` is `delete`.
+
+By default, DorisSink distinguishes the event type based on RowKind. In a CDC scenario the event type can be obtained directly, and the hidden column `__DORIS_DELETE_SIGN__` is assigned accordingly to perform the deletion. With Kafka, however, the event type has to be derived from the business logic, and the value of the hidden column must be set explicitly.
+
+### Usage
+
+```sql
+-- Example upstream data: {"op_type":"delete","data":{"id":1,"name":"zhangsan"}}
+CREATE TABLE KAFKA_SOURCE(
+  data STRING,
+  op_type STRING
+) WITH (
+  'connector' = 'kafka',
+  ...
+);
+
+CREATE TABLE DORIS_SINK(
+  id INT,
+  name STRING,
+  __DORIS_DELETE_SIGN__ INT
+) WITH (
+  'connector' = 'doris',
+  'fenodes' = '127.0.0.1:8030',
+  'table.identifier' = 'db.table',
+  'username' = 'root',
+  'password' = '',
+  'sink.enable-delete' = 'false',        -- false means the event type is not taken from RowKind
+  'sink.properties.columns' = 'id,name,__DORIS_DELETE_SIGN__'  -- explicitly specify the columns imported by Stream Load
+);
+
+INSERT INTO DORIS_SINK
+SELECT CAST(JSON_VALUE(data, '$.id') AS INT) AS id,
+       JSON_VALUE(data, '$.name') AS name,
+       IF(op_type = 'delete', 1, 0) AS __DORIS_DELETE_SIGN__
+FROM KAFKA_SOURCE;
+```
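+
+After the job has processed a delete message, the corresponding row should no longer be returned by queries against the Doris table. As a quick check, the query below can be run in the Doris MySQL client; it assumes the `show_hidden_columns` session variable, which makes Doris expose hidden columns such as `__DORIS_DELETE_SIGN__` for the remaining rows:
+
+```sql
+-- Quick verification sketch (assumption: show_hidden_columns is available).
+SET show_hidden_columns = true;
+-- Deleted rows are filtered out of the result; remaining rows show their delete sign (0).
+SELECT id, name, __DORIS_DELETE_SIGN__ FROM db.`table`;
+```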
+
 ## Java example
 
 An example of the Java version is provided in `samples/doris-demo/` for reference; see [here](https://github.com/apache/doris/tree/master/samples/doris-demo/).


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org