You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/02/12 05:21:13 UTC

[GitHub] [incubator-doris] wangbo opened a new issue #2887: [Proposal] Support Spark Convert Doris Segment

wangbo opened a new issue #2887: [Proposal] Support Spark Convert Doris Segment
URL: https://github.com/apache/incubator-doris/issues/2887
 
 
   # 目标
   
   将预计算的结果(由上一步Spark预计算的作业完成)转换成Doris Be File的过程由Spark完成,预期降低Doris Be的压力,提升导入性能。
   
   ## 主要流程
   
   * 通过spark作业完成数据的分区与排序
   
   * 通过定义OutputFormat完成数据格式的转换
   
   * 通过JNI实现Java对于Doris Be代码的调用,降低开发和维护成本
   
   ## 模块划分
   
   * JNI接口封装
   
   * Spark作业逻辑
   
   * Doris添加新的Load逻辑
   
   # JNI接口封装
   
   ## 主要思路
   
   1. 实现一个Java的类,该类封装了构建Rowset所必需的接口,并定义为native方法。
   
   2. 通过javah将该类转换出一个头文件,该头文件包含所有native方法。
   
   3. 编写一个CPP文件,为该头文件中的方法定义一个C++的实现。
   
   4. 在CPP文件中定义一个文件粒度的BetaRowsetWriter对象,通过调用这个对象的方法来实现头文件中的所有方法。
   
   ## 功能点
   | 名称        | 目标   |  说明  |
   | :--------   | :-----  | :----  |
   | Doris Be接口抽象     | 1. BetaRowsetWriter可以作为一个类库使用<br> 2. 对JNI定义的接口进行实现   |       |
   | JNI接口       |   1. 确定JNI接口以及入参 <br> 2. 完成数据结构转换逻辑 <br> 2.1 将java中的tuple需要转成C++的tuple <br> 2.2 将java中的Doris元数据转成C++中的元数据   |      |
   | 打包与编译        |    主要是对构编译脚本进行改造,将BetaRowsetWriter相关lib放在编译后的Fe目录下,在提交spark作业时可以被上传    |    |
   
   # Spark作业逻辑
   ## 文件输入格式要求
   + K,V的形式,K应该是`indexId`和`维度值`,V应该是在上一步作业中预计算好的`value`
   
   + 输入文件中还需要包含一个特殊的base rollup,这个特殊的base rollup需要包含所有`不包含分区键以及部分包含分区键的rollup`的维度
   
   ##  关于<不包含分区键以及部分包含分区键的rollup>的处理逻辑
   ### 定义
   `这种rollup index`指的是schema中不包含或者包含部分分区字段(包括范围分区字段以及hash分区字段)
   ### 现有问题
   `这种rollup index`在导入数据时,即使schema本身不包含分区字段,但是在导入数据的分区过程中,tuple中依然要保留分区字段数据,待分区完成后再做上卷消除不需要的分区字段。
   ### Spark作业中的处理逻辑
   Spark Convert的过程中,需要保持和Doris同样的处理逻辑。
   因此对于对于`这种rollup index`,也是需要通过shuffle分区时保留分区字段的值,然后在reducer中再做一次上卷操作。
   由于是要在reducer中再次预计算,因此可以针对一次导入任务中的所有`这种rollup index`构建一个base rollup(这个构建操作应该由上一步预计算Spark作业完成),然后再基于这个特殊的base rollup做上卷操作,从而计算出`这种rollup index`。
   
   ## 主要流程
   1. 提交一个Spark作业
   	1. 同时需要上传的是本次作业所需的元数据
   
   2. 读取上一步预计算的结果转成RDD
   
   3. 通过自定义partitioner进行shuffle
   
   4. 对shuffle之后的数据按照K进行排序
   
   	1. 此时reducer的输入数据应该是所有rollupIndex的同一个分区的数据
   
   	2. 排序首先保证的每个rollupIndex的数据是连续的
   
   	3. 排序其次要保证的是每个rollupIndex内部有序
   
   5. 根据上传的元数据构建出partitionmap
   
   6. 计算出当前输入行所属的partitionId,tabletId,作为BetaRowsetWriter的输入参数
   
   	1. 根据tuple中分区键的值和partitionmap确定当前tuple属于哪个分区
   
   	2. 根据tuple中hash键的值确定当前tuple所属tablet的分桶号
   
   	3. 根据分桶号结合paritionmap就可以获得当前tuple所属的tabletId
   
   7. 如果发现当前处理的rollupIndex是属于`不包含分区键以及部分包含分区键的rollup`,那么还需要做额外的处理
   
   	1. 基于一个base rollup再次上卷,消除当前rollup中不需要的分区键信息
   
   	2. 对结果集进行排序
   8. 将结果通过OutputFormat输出到本地磁盘
   
   9. 将本地磁盘的数据上传到HDFS对应的路径
   
   ## 功能点
   | 名称  | 目标  | 说明 |
   | :------------ |:---------------| :-----|
   | HDFS输入/输出路径定义     | 按照约定的方式定义,避免不必要的信息传递 | 1. 预期路径本身可以包含额外的信息,比如jobid和indexid <br> 2. 输出路径中需要包含partitionId和tabletId的信息 |
   | 实现一个OutputFormat     | 能够将输入数据转成doris格式的文件并写入hdfs        |   1. 主要封装了BetaRowsetWriter的接口 <br> 2. 运行时需要获得以下元数据 <br> 2.1 tabletSchema,用于作为BetaRowsetWriter的入参 <br> 2.2 本地路径信息,用于将BetaRowsetWriter的产出临时保存到本地磁盘 |
   | Doris元数据支持本地访问 | 在Spark作业中访问本地的Doris元数据        |  1. 可以将指定的Doris元数据写入文件 <br> 2.可以通过接口加载这些文件本地访问Doris元数据  |
   |实现一个自定义的partitioner|保证同一个tablet的数据由一个reducer处理|1. partitioner中需要获取到以下元数据<br>1.1 partition信息,包括每个分区的范围<br>1.2 分区键<br>1.3 每个partition对应的tablet信息<br>2. 需要在partitioner中实现和Be相同的Hash算法|
   # Doris添加新的Load逻辑
   ## 主要流程
   1. 当Fe获知Spark Convert Job完成之后,通过broker进程获取到位于HDFS上所有convert结果集的文件列表
   
   	1. HDFS的路径可以是按照约定来,如/doris_home/bulk_load/jobid/spark_convert/tablet_id/
   
   2. Fe将tablet以及对应文件信息发送到Be
   
   	1. 这里需要保证的是,每个Be只处理本节点的导入任务
   
   3. Be向broker发送请求,将位于HDFS的数据拉取到指定路径下
   
   	1. 请求参数为位于当前be需要处理的tablet的文件
   
   4. Be task提交事务。
   
   5. 当Fe轮询发现所有Be task都执行完成,提交事务。
   
   ## 功能点
   |  名称 | 目标 | 说明 |
   | :------------ |:---------------| :-----|
   | Broker新增接口      | 可以直接从HDFS拉取文件然后将数据写入到Doris Be的tablet路径下 |  |
   | Doris新增导入接口      | 支持基于Doris Tablet File的导入        |   1. Fe需要添加新的LoadJob流程<br>1.1 需要能够拉取已构建好的tablet file,并根据这些file构建task并发送到Be执行<br>2. Be添加新的LoadJobTask<br>1.2 预期能够通过broker将构建好的tablet file直接拉取到Doris Be下 <br> (这部分主要依赖Fe Load Job模块的设计,这里只列出预期的功能) |

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wangbo closed issue #2887: [Proposal] Support Spark Convert Doris Segment

Posted by GitBox <gi...@apache.org>.
wangbo closed issue #2887:
URL: https://github.com/apache/incubator-doris/issues/2887


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org