You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/02/10 07:05:47 UTC

[GitHub] [incubator-doris] tianshanxuepiao commented on a change in pull request #2856: add spark load design

tianshanxuepiao commented on a change in pull request #2856: add spark load design
URL: https://github.com/apache/incubator-doris/pull/2856#discussion_r376893851
 
 

 ##########
 File path: docs/documentation/cn/internal/spark_load.md
 ##########
 @@ -0,0 +1,166 @@
+# Doris支持spark导入设计文档
+
+## 背景
+
+Doris现在支持Broker load/routine load/stream load/mini batch load等多种导入方式。
+spark load主要用于解决初次迁移,大量数据迁移doris的场景,用于提升数据导入的速度。
+
+## 名词解释
+
+* FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
+* BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
+* Tablet: 一个palo table的水平分片称为tablet。
+
+## 设计
+
+### 目标
+
+Doris中现有的导入方式中,针对百G级别以上的数据的批量导入支持不是很好,功能上需要修改很多配置,而且可能无法完成导入,性能上会比较慢,并且由于没有读写分离,需要占用较多的cpu等资源。而这种大数据量导入会在用户迁移的时候遇到,所以需要实现基于spark集群的导入功能,利用spark集群的并发能力,完成导入时的ETL计算,排序、聚合等等,满足用户大数据量导入需求,降低用户导入时间和迁移成本。
+
+在Spark导入中,需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;同时,由于用户数据格式多种多样,需要支持包括csv、parquet、orc等多种格式的数据文件。
+
+### 实现方案
+
+在将spark导入的设计实现的时候,有必要讲一下现有的导入框架。现在有的导入框架,可以参考《Doris Broker导入实现解析》。
+
+#### 方案1
+
+参考现有的导入框架和原有适用于百度内部hadoop集群的hadoop导入方式的实现,为了最大程度复用现有的导入框架,降低开发的难度,整体的方案如下:
+
+用户的导入语句经过语法和语意分析之后,生成LoadStmt,LoadStmt中增加一个isSparkLoad标识字段,如果为true,就会创建出SparkLoadJob,跟BrokerLoadJob类似,会通过状态机机制,实现Job的执行,在PENDING,会创建SparkLoadPendingTask,然后在LOADING阶段还是创建LoadLoadingTask,进行数据导入。在BE中,复用现有的计划执行框架,执行导入计划。
+
+实现Spark导入主要需要考虑以下几点:
+
+##### 语法	
+	这块主要考虑用户习惯,导入语句格式上尽量保持跟broker导入语句相似。下面是一个方案:
+
+```
+		LOAD LABEL example_db.label1
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+		NEGATIVE
+        INTO TABLE `my_table`
+		PARTITION (p1, p2)
+		COLUMNS TERMINATED BY ","
+		columns(k1,k2,k3,v1,v2)
+		set (
+			v3 = v1 + v2,
+			k4 = hll_hash(k2)
+		)
+		where k1 > 20
+		with spark
+        )
+        PROPERTIES
+        (
+        "cluster_type" = "yarn",
+        "yarn_resourcemanager_address" = "xxx.tc:8032",
+        "max_filter_ratio" = "0.1"
+        );
+```
+其中各个property的含义如下:
+- cluster_type是表示spark集群部署模式,支持包括yarn/standalone/local/k8s,预计先实现yarn的支持,并且使用yarn-cluster模式(yarn-client模式一般用于交互式的场景)。
+- yarn_resourcemanager_address:指定yarn的resourcemanager地址
+- max_filter_ratio:指定最大过滤比例阈值
+
+##### SparkLoadJob
+
+用户发送spark load语句,经过parse之后,会创建SparkLoadJob,
+
+```
+SparkLoadJob:
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadPendingTask    |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+				 | LoadLodingTask          |
+                 v                         |
+         +-------+-------+                 |
+         |  COMMITTED    |-----------------|
+         +-------+-------+                 |
+				 |                         |
+                 v                         v  
+         +-------+-------+         +-------+-------+     
+         |   FINISHED    |         |   CANCELLED   |
+         +-------+-------+         +-------+-------+
+				 |                         Λ
+                 +-------------------------+
+```
+上图为SparkLoadJob的执行流程。
+
+##### SparkLoadPendingTask
+SparkLoadPendingTask主要用来提交spark etl作业到spark集群中。由于spark支持不同部署模型(localhost, standalone, yarn, k8s),所以需要抽象一个通用的接口SparkEtlJob,实现SparkEtl的功能,主要接口包括:
+- 提交spark etl任务
+- 取消spark etl的任务
+- 获取spark etl任务状态的接口
+
+大体接口如下:
+```
+class SparkEtlJob {
+	// 提交spark etl作业
+	// 返回JobId
+	String submitJob(TBrokerScanRangeParams params);
+
+	// 取消作业,用于支持用户cancel导入作业
+	bool cancelJob(String jobId);
+
+	// 获取作业状态,用于判断是否已经完成
+	JobStatus getJobStatus(String jobId);
+private:
+	std::list<DataDescription> data_descriptions;
+};
+```
+可以实现不同的子类,来实现对不同集群部署模式的支持。可以实现SparkEtlJobForYarn用于支持yarn集群的spark导入作业。具体来说上述接口中JobId就是Yarn集群的appid,如何获取appid?一个方案是通过spark-submit客户端提交spark job,然后分析标准错误中的输出,通过文本匹配获取appid。
+
+这里需要参考hadoop dpp作业的经验,就是需要考虑任务运行可能因为数据量、集群队列等原因,会达到并发导入作业个数限制,导致后续任务提交失败,这块需要考虑一下任务堆积的问题。一个方案是可以单独设置spark load job并发数限制,并且针对每个用户提供一个并发数的限制,这样各个用户之间的作业可以不用相互干扰,提升用户体验。
+
+spark任务执行的事情,包括以下几个关键点:
+1. 类型转化(extraction/Transformation)
+
+	将源文件字段转成具体列类型(判断字段是否合法,进行函数计算等等)
+2. 函数计算(Transformation),包括negative计算
+	
+	完成用户指定的列函数的计算。函数列表:"strftime","time_format","alignment_timestamp","default_value","md5sum","replace_value","now","hll_hash","substitute"
 
 Review comment:
   使用spark-submit提交spark job会依赖部署spark客户端,这块是否也可以抽象下,支持扩展如使用spark api或者Livy的方式提交?
   
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org