You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/08/05 03:04:51 UTC

[GitHub] [incubator-doris] yangzhg edited a comment on issue #4051: [Proposal] Doris supports batch delete On UNIQUE_KEYS table

yangzhg edited a comment on issue #4051:
URL: https://github.com/apache/incubator-doris/issues/4051#issuecomment-655317768


   # <center>Doris 支持批量删除设计文档</center>
   
   ## 背景
   
   ​		目前Doris 支持broker load, routine load, stream load 等多种导入方式,对于数据的删除目前只能通过delete 语句进行删除,使用delete 语句的方式删除时,每执行一次delete 都会生成一个新的数据版本,如果频繁删除会严重影响查询性能,并且在使用delete 方式删除时,是通过生成一个空的rowset来记录删除条件实现,每次读取都要对删除跳条件进行过滤,同样在条件较多时会对性能造成影响。对比其他的系统,greenplum 的实现方式更像是传统数据库产品,snowflake 通过merge 语法实现。
   
   ​		对于类似于cdc 数据的导入的场景,数据数据中insert 和delete 一般是穿插出现的,面对这种场景我们目前的导入方式也无法满足,即使我们能够分离出insert 和delete 虽然可以解决导入的问题,但是仍然解决不了删除的问题。
   
   ## 设计目标
   
   ### 功能层面:
   
   增强导入功能,使其能够支持如下场景:
   
   * ~~单纯的批量导入,目前已经支持~~
   * 批量点删除
   * 导入和删除混合的数据导入
   * 只支持unique_keys 表
   
   ### 易用性方面:
   
   尽量减少对导入语法的修改,并且能够兼容目前的导入语法 
   
   ### 性能方面
   
   导入和读取的性能要和目前的导入方式基本持平,不能有太大的性能损失
   
   ## 详细设计
   
   导入语法方面为在导入是增加一列标示当前行是导入还是删除,如果没有默认所有行为插入行,实现层面本次升级的功能只在segmentV2上实现,v1 暂不考虑,在segment 文件的`IndexRegion` 中增加一个类似于null bitmap 的bitmap 索引用于标示需要删除的行。
   
   ### 数据结构设计
   
   需要在segment 结构中增加一个bitmap 索引(delete_index_page)来标示哪一行被标记为删除,PagePointerPB 结构和之前定义的相同,使用bitmap 作为索引。
   
   ```
   message SegmentFooterPB {
       optional uint32 version = 1 [default = 1]; // file version
       repeated ColumnMetaPB columns = 2; // tablet schema
       optional uint32 num_rows = 3; // number of values
       optional uint64 index_footprint = 4; // total idnex footprint of all columns
       optional uint64 data_footprint = 5; // total data footprint of all columns
       optional uint64 raw_data_footprint = 6; // raw data footprint
   
       optional CompressionTypePB compress_type = 7 [default = LZ4F]; // default compression type for file columns
       repeated MetadataPairPB file_meta_datas = 8; // meta data of file
   
       // Short key index's page
       optional PagePointerPB short_key_index_page = 9;、
       // Use bitmap index to indicate which row is marked for deleting
       optional PagePointerPB delete_index_page = 10;
   }
   
   ```
   
   ### 导入语法
   
   导入的语法设计方面主要是增加一个指定删除标记列的字段的column 映射,并且需要在导入数据中增加这一列,各个导入方式设置的方法如下
   
   #### stream load
   
   stream load 的写法在在header 中的 columns  字段增加一个设置删除标记列的字段, 示例
   ` -H "columns: k1, k2, label_c3" -H "merge_type: [MERGE|APPEND|DELETE]" -H "delete: label_c3=1"`
   
   #### broker load
   
   在`PROPERTIES ` 处设置删除标记列的字段
   
   ```
   LOAD LABEL db1.label1
   (
       [MERGE|APPEND|DELETE] DATA INFILE("hdfs://abc.com:8888/user/palo/test/ml/file1")
       INTO TABLE tbl1
       COLUMNS TERMINATED BY ","
       (tmp_c1,tmp_c2, label_c3)
       SET
       (
           id=tmp_c2,
           name=tmp_c1,
       )
       [DELETE ON label=true]
   
   )
   WITH BROKER 'broker'
   (
       "username"="user",
       "password"="pass"
   )
   PROPERTIES
   (
       "timeout" = "3600"
       
   );
   
   ```
   
   #### reoutine load
   
   routine load 在`columns` 字段增加映射 映射方式同上,示例如下
   
   ```
      CREATE ROUTINE LOAD example_db.test1 ON example_tbl 
       [WITH MERGE|APPEND|DELETE]
       COLUMNS(k1, k2, k3, v1, v2, label),
       WHERE k1 > 100 and k2 like "%doris%"
       [DELETE ON label=true]
       PROPERTIES
       (
           "desired_concurrent_number"="3",
           "max_batch_interval" = "20",
           "max_batch_rows" = "300000",
           "max_batch_size" = "209715200",
           "strict_mode" = "false"
       )
       FROM KAFKA
       (
           "kafka_broker_list" = "broker1:9092,broker2:9092,broker3:9092",
           "kafka_topic" = "my_topic",
           "kafka_partitions" = "0,1,2,3",
           "kafka_offsets" = "101,0,0,200"
       );
   ```
   
   ### 导入
   
   数据导入时流程如下:
   
   当导入数据包含 删除标记 并且删除标记是true 时写入数据的同时记录次行在segment 中的行号,并记录在delete index 中,否则直接写数据,这里可以有一个优化就是当标记为删除时这一行的value 列可以都设置成相应类型的最空间占用最小的值, 比如varchar 类型我们可以把值都设置成空字符串以节省空间。
   
   ```flow
   st=>start: Start Load
   flag_cond=>condition: delete flag is true
   write_rowdata=>operation: write data 
   write_rowdata_opt=>operation: write Data with minimum values
   write_delete_index=>operation: write delete index
   e=>end
   
   st->flag_cond
   flag_cond(yes)->write_rowdata_opt->write_delete_index->e
   flag_cond(no)->write_rowdata->e
   ```
   ![image](https://user-images.githubusercontent.com/9098473/86886470-8fe21600-c129-11ea-93f5-f2ce692572c1.png)
   
   假设有表
   
   ```
   +-------+-------------+------+-------+---------+---------+
   | Field | Type        | Null | Key   | Default | Extra   |
   +-------+-------------+------+-------+---------+---------+
   | k1    | INT         | Yes  | true  | 0       |         |
   | k2    | SMALLINT    | Yes  | true  | NULL    |         |
   | k3    | VARCHAR(32) | Yes  | true  |         |         |
   | v1    | BIGINT      | Yes  | false | 0       | REPLACE |
   +-------+-------------+------+-------+---------+---------+
   ```
   
   导入数据为
   
   ```
   0,1,foo,3,false
   0,1,foo,2,true
   1,2,bar,3,true
   0,1,foo,5,false
   ```
   如果为UNIQUE_KEYS ,只记录最新的结果即可,因此第一行和第二行都没有意义,在导入聚合时会被忽略。则记录到rowset 中数据区的数据为
   
   ```
   1,2,bar,3
   0,1,foo,5
   ```
   
   delete_index 为bitmap 中记录[1,0], 标示这批导入数据中第1行被标记为删除。
   
   ### 读取
   
   #### UNIQUE_KEYS表
   
   目前UNIQUE_KEYS 表读取时已经是从高版本向地板版本合并数据,由于unique_keys能保证每个segement中只有一条记录,只需读取第一个版本即可,如果是标记删除的则全部跳过,否则直接返回第一行,剩余跳过。
   
   ### Compaction
   
   compaction时 cumulative compaction和 base compaction 的功能方式有所不同cumulative compaction需要保存delete index,base compaction结束后可以删除delete index。
   
   #### Cumulative Compaction
   
   cumulative compaction 是从低版本向高版本合并,类似于读取的操作,但是需要遇到删除的行需要读取出来合并并且还需要添加到新的delete index 中。由于delete index 是一个bitmap索引,存在于每个segment 文件中,
   
   #### Base Compaction
   
   base compaction 也是和读取类似的操作,但是因为是全量读取因此不需要记录delete index,直接把数据读取出来过滤掉要删除的行按之前compaction 逻辑处理即可。
   
   # 详细流程
   
   ## FE端
   
   在导入是通过fe 在导入的数据生成tuple 中增加了一列`__DORIS_BATCH_DELETE_COL__`,这一列的表达式就是删除条件,这一列的值就是delete 条件的结果,这样这列就会在broker scannode的中 自动计算出来并返回
   
   ## BE 端
   
   1. 在be 的 tablet writer 中通过fe发过来的参数判断是否存在delete列来初始化 memtable
   
   2. 在tuple 被插入memtable 时将删除属性带入,在memtable merge 时对这个属性处理类似于 replace聚合, 这里修改 `ContiguousRow`增加一个属性`_is_delete`
   3. 在memtable 将内容刷入segment 文件时 将delete 信息写入segment footer 中的一个类似索引结构中,这里只记录需要删除的行的行号
   4. 在读取是将这个索引信息由segemt iterator 带入读出 传入 `RowBlock` 最终写入`RowCursor`中(`RowCursor` 也需要增加相应的delete 属性),之后就是在reader.cpp处理删除逻辑
   
   # 折衷考虑
   
   在上面的实现在FE端给broker 读出的数据生成了一个隐藏列,而在对应实际数据存储时没有在最终数据上也是用隐藏列 主要考虑如下:
   
   1. 如果在doris 里增加隐藏列比如需要更改表的结构,这样就会出现需要新建表才能使用,旧表需要schem change 后在能用,这样对用户不是很友好,对于用户数据的修改处于谨慎考虑不宜自动做schema change;
   2. 增加隐藏列后对于查询、删除设计列名的操作都需要过滤隐藏列的情况,在be 层除了存储层在segment 层面不感知这一列的存在其他层面都需要感知,改动处仍然比较多。
   3. 如果我们不在fe 建表时增加隐藏列 仅在be 增加,则会在构建 memtable 时需要修改biao 的tablet_schema 中的列信息,这里会涉及许多tupleid 和slot ID, 这些id 均由FE 生成,在be 修改增加因为没有全局信息有较大的风险可能会重复或者冲突,而且这些结构的代码大多没有提供修改和增加列的接口,修改成本也比较大。
   
   两种实现方案的却别就在于,隐藏列的方式除了segment 层不感知以外 FE, 计算, 除了segment以外的存储层其他部分都需要感知,而通过删除标记 只有存储层感知,其他层均不感知这种变化。从开发时间和复杂度角度来看目前采用删除标记的方案比较好,而且就只功能而言使用隐藏列和删除标记并没有性能上的区别。
   
   但是就更广泛的层面考虑doris 也应该支持隐藏列功能,个人觉得这一功能可以单独作为一个功能点单独开发(如果咋doris 已经支持隐藏列的情况下,批量删除功能则应使用隐藏列实现)这样也有利于以后的功能扩展(比如事物的支持)。
   
   **综上,本期批量删除功能采取增加删除标记的方式实现,如果日后实现的隐藏列,为了代码的统一,迁移的成本应该也不高。**
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org