You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zh...@apache.org on 2020/02/22 04:01:03 UTC

[incubator-doris] branch master updated: Update spark load doc (#2973)

This is an automated email from the ASF dual-hosted git repository.

zhaoc pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fc2d92d  Update spark load doc (#2973)
fc2d92d is described below

commit fc2d92d68a5a6488d009770d723bd813e0d1a31c
Author: wyb <wy...@gmail.com>
AuthorDate: Sat Feb 22 12:00:50 2020 +0800

    Update spark load doc (#2973)
---
 docs/documentation/cn/internal/spark_load.md | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

diff --git a/docs/documentation/cn/internal/spark_load.md b/docs/documentation/cn/internal/spark_load.md
index 799cf60..021b02b 100644
--- a/docs/documentation/cn/internal/spark_load.md
+++ b/docs/documentation/cn/internal/spark_load.md
@@ -29,6 +29,7 @@ spark load主要用于解决初次迁移,大量数据迁移doris的场景,
 * FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
 * BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
 * Tablet: 一个palo table的水平分片称为tablet。
+* Dpp:Data preprocessing,数据预处理模块,通过外部计算资源(Hadoop、Spark)完成对导入数据预处理,包括转化、清洗、分区、排序和聚合等。
 
 ## 设计
 
@@ -68,7 +69,7 @@ Doris中现有的导入方式中,针对百G级别以上的数据的批量导
 		)
 		where k1 > 20
         )
-		with spark
+		with spark.cluster_name
         PROPERTIES
         (
         "spark.master" = "yarn",
@@ -78,7 +79,9 @@ Doris中现有的导入方式中,针对百G级别以上的数据的批量导
         "max_filter_ratio" = "0.1",
         );
 ```
-其中各个property的含义如下:
+其中spark.cluster_name为用户导入使用的Spark集群名,可以通过SET PROPERTY来设置,可参考原来Hadoop集群的设置。
+property中的Spark集群设置会覆盖spark.cluster_name中对应的内容。
+各个property的含义如下:
 - spark.master是表示spark集群部署模式,支持包括yarn/standalone/local/k8s,预计先实现yarn的支持,并且使用yarn-cluster模式(yarn-client模式一般用于交互式的场景)。
 - spark.executor.cores: executor的cpu个数
 - spark.executor.memory: executor的内存大小
@@ -186,7 +189,7 @@ LoadLoadingTask可以复现现在的逻辑,但是,有一个地方跟BrokerLo
 
 方案1可以最大限度的复用现有的导入框架,能够快速实现支持大数据量导入的功能。但是存在以下问题,就是经过spark etl处理之后的数据其实已经按照tablet划分好了,但是现有的Broker导入框架还是会对流式读取的数据进行分区和bucket计算,然后经过序列化通过rpc发送到对应的目标BE的机器,有一次序列化和网络IO的开销。 方案2是在SparkEtlJob生成数据的时候,直接生成doris的存储格式Segment文件,然后三个副本需要通过类似clone机制的方式,通过add_rowset接口,进行文件的导入。这种方案具体不一样的地方如下:
 
-1. 需要在生成的文件中添加tabletid后续
+1. 需要在生成的文件中添加tabletid后缀
 2. 在SparkLoadPendingTask类中增加一个接口protected Map<long, Pair<String, Long>> getFilePathMap()用于返回tabletid和文件之间的映射关系,
 3. 在BE rpc服务中增加一个spark_push接口,实现拉取源端etl转化之后的文件到本地(可以通过broker读取),然后通过add_rowset接口完成数据的导入,类似克隆的逻辑
 4. 生成新的导入任务SparkLoadLoadingTask,该SparkLoadLoadingTask主要功能就是读取job.json文件,解析其中的属性并且,将属性作为rpc参数,调用spark_push接口,向tablet所在的后端BE发送导入请求,进行数据的导入。BE中spark_push根据is_segment_file来决定如何处理,如果为true,则直接下载segment文件,进行add rowset;如果为false,则走pusher逻辑,实现数据导入。
@@ -195,6 +198,8 @@ LoadLoadingTask可以复现现在的逻辑,但是,有一个地方跟BrokerLo
 
 ## 总结
 
-综合以上三种方案,第一种方案的改动量比较小,是后面两种方案的基础。第二种方案的对导入框架的改动较大,而且需要依赖单副本导入的修改。相对来说第三种方案的性能提升可能会更好。所以,计划分两步完成spark load的工作。
-第一步,按照方案1,快读支持spark导入的功能。
-第二部,按照方案2,封装segment写入的库,并且增加一个rpc接口,实现类似clone的导入逻辑。
\ No newline at end of file
+综合以上两种方案,第一种方案的改动量比较小,但是BE做了重复的工作。第二种方案可以参考原有的Hadoop导入框架。所以,计划分两步完成spark load的工作。
+
+第一步,按照方案2,实现通过Spark完成导入数据的分区排序聚合,生成parquet格式文件。然后走Hadoop pusher的流程由BE转化格式。
+
+第二步,封装segment写入的库,直接生成Doris底层的格式,并且增加一个rpc接口,实现类似clone的导入逻辑。


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org