You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by GitBox <gi...@apache.org> on 2020/02/18 13:32:24 UTC

[GitHub] [incubator-doris] kangpinghuang opened a new pull request #2939: spark etl design

kangpinghuang opened a new pull request #2939: spark etl design
URL: https://github.com/apache/incubator-doris/pull/2939
 
 
   to resolve issue #2938 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wangbo commented on a change in pull request #2939: spark etl design

Posted by GitBox <gi...@apache.org>.
wangbo commented on a change in pull request #2939: spark etl design
URL: https://github.com/apache/incubator-doris/pull/2939#discussion_r381064301
 
 

 ##########
 File path: docs/documentation/cn/internal/spark_etl.md
 ##########
 @@ -0,0 +1,342 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris spark导入ETL逻辑设计
+
+## 背景
+
+Doris为了解决初次迁移,大量数据迁移doris的问题,引入了spark导入,用于提升数据导入的速度。在spark导入中,需要利用spark进行ETL计算、分区、分桶、文件格式生成等逻辑。下面分别讲述具体的实现设计。
+
+## 名词解释
+
+* FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
+* BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
+
+## 设计
+
+### 目标
+
+在Spark导入中,需要达到以下目标:
+
+1. 需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;
+2. 需要支持包括csv、parquet、orc等多种格式的数据文件。
+3. 能够支持doris中所有的类型,其中包括hll和bitmap类型。同时,bitmap类型需要考虑支持全局字典,以实现string类型的精确去重
+4. 能够支持排序和预聚合
+5. 支持分区分桶逻辑
+6. 支持生成base表和rollup表的数据
+7. 能够支持生成doris的存储格式
+
+### 实现方案
+
+参考[pr-2865](https://github.com/apache/incubator-doris/pull/2856), 整的方案将按照如下的框架实现:
+
+```
+SparkLoadJob:
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadPendingTask    |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadLodingTas      |
+                 v                         |
+         +-------+-------+                 |
+         |  COMMITTED    |-----------------|
+         +-------+-------+                 |
+				 |                         |
+                 v                         v  
+         +-------+-------+         +-------+-------+     
+         |   FINISHED    |         |   CANCELLED   |
+         +-------+-------+         +-------+-------+
+				 |                         Λ
+                 +-------------------------+
+```
+
+整个流程大体如下:
+
+1. 用户的sql语句会被解析成LoadStmt,并且里面带一个is_spark_load的属性,为true
+2. LoadStmt会生成SparkLoadJob进行执行
+3. 在SparkLoadJob阶段会生成SparkLoadPendingTask,完成spark etl作业的提交
+4. 在SparkLoadLodingTask会向table涉及到的BE发送TPushReq
+5. 在BE中需要基于EngineBatchLoadTask完成数据的下载和导入
+
+spark导入的语句:
+这块主要考虑用户习惯,导入语句格式上尽量保持跟broker导入语句相似。下面是一个方案:
+
+```
+		LOAD LABEL example_db.label1
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+		NEGATIVE
+        INTO TABLE `my_table`
+		PARTITION (p1, p2)
+		COLUMNS TERMINATED BY ","
+		columns(k1,k2,k3,v1,v2)
+		set (
+			v3 = v1 + v2,
+			k4 = hll_hash(k2)
+		)
+		where k1 > 20
+        )
+		with spark
+        PROPERTIES
+        (
+        "cluster_type" = "yarn",
+        "yarn_resourcemanager_address" = "xxx.tc:8032",
+        "max_filter_ratio" = "0.1"
+        );
+```
+
+#### SparkLoadPendingTask
+
+该Task主要实现spark任务的提交。 
+
+1. 作业提交
+
+为此实现一个SparkApplication类,用于完成spark作业的提交.
+
+```
+class SparkApplication {
+public:
+	// 提交spark etl作业
+	// 返回appId
+	String submitApp(TBrokerScanRangeParams params);
+
+	// 取消作业,用于支持用户cancel导入作业
+	bool cancelApp(String appId);
+
+	// 获取作业状态,用于判断是否已经完成
+	JobStatus getJobStatus(String jobId);
+};
+
+```
+
+SparkApplication采用基于InProcessLauncher,并且采用cluster模式,来实现在FE中提交spark etl app。主要逻辑如下:
+```
+AbstractLauncher launcher = new InProcessLauncher();
+launcher.setAppResource(appResource)
+        .setMainClass(mainClass) // 这个地方使用导入作业的label
+        .setMaster(master) // 这个地方使用cluster
+CountDownLatch countDownLatch;
+Listener handleListeners = new SparkAppHandle.Listener() {
+    @Override
+    public void stateChanged(SparkAppHandle handle) {
+        if (handle.getState().isFinal()) {
+            countDownLatch.countDown();
+        }
+
+        @Override
+        public void infoChanged(SparkAppHandle handle) {}
+    };
+
+countDownLatch = new CountDownLatch(1);
+
+SparkAppHandle handle = launcher.startApplication(handleListeners);
+boolean regularExit = countDownLatch.await(20, TimeUnit.MINUTES);
+if (!regularExit)
+    handle.kill();
+```
+
+
+其中SparkAppHandle可以用来操控Spark App作业,其接口主要如下:
+```
+void	addListener(SparkAppHandle.Listener l);
+void	disconnect();
+String	getAppId();
+SparkAppHandle.State	getState();
+void	kill();
+void	stop();
+```
+
+2. 参数
+
+	参数分为两大类:
+
+- spark相关的参数,列表可以参考:http://spark.apache.org/docs/latest/running-on-yarn.html。最主要的参数罗列如下:
+	```
+		spark.yarn.am.memory
+		spark.yarn.am.cores
+		spark.executor.instances
+		spark.executor.memory
+		spark.executor.cores
+		spark.yarn.stagingDir
+		spark.yarn.queue
+	```
+- 业务参数,包括表的schema信息、表的rollup信息、用户导入语句的信息,参考broker load,主要参数结构如下:
+
+```
+DataDescriptions:用户导入语句信息,用于获取列映射关系、column from path、where predicate、文件类型等等参数。需要对DataDescriptions分析之后,得到类似TBrokerScanRangeParams的结构体,传给spark etl作业。
+
+Partition信息和bucket信息:包括分区和分桶列和现有的partition信息。具体就是:TOlapTablePartitionParam、TOlapTableIndexSchema、TOlapTableSchemaParam等
+
+base表和rollup表的树形层级关系:优化现有的rollup数据生成方案,实现基于最小覆盖parent rollup来计算rollup的方式,优化rollup计算。 
+class IndexTree {
+	int indexId;
+	int parentIndexId;
+	List<int> childIndexId;
+};
+```
+针对精确去重场景下的bitmap类型的列需要特别处理,因为bitmap类型的列可能需要构建全局词典和利用全局词典进行数据转化的步骤,因此业务相关的参数中还需要提供一个bitmap列的信息List<BitmapArg>,定义如下:
+
+```
+class BitmapArg {
+String columnName; // bitmap列名
+String globalDictPath; // 全局字典路径
+};
+
+```
+现在,全局字典构建方案还没有确定,一种方案https://github.com/wangbo/open_mark/issues/2。不管怎么样,这里主要关心产生的全局字典的路径。对于具体的构建步骤可以等实现全局字典的时候再设计(如果采用基于hive的方案,可以在提交spark etl app之前,进行词典构建任务)。
+
+
+参考:
+- http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
+- http://spark.apache.org/docs/latest/api/java/org/apache/spark/launcher/SparkLauncher.html#startApplication-org.apache.spark.launcher.SparkAppHandle.Listener...-
+- http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
+- https://www.cnblogs.com/yy3b2007com/p/10247239.html
+
+#### spark etl作业
+
+spark etl中主要考虑实现以下逻辑:
+
+1. doris类型和spark类型的映射
+
+spark支持的类型:https://spark.apache.org/docs/latest/sql-reference.html#data-types
+
+除了hll类型和bitmap列类型之外,spark支持所有的doris类型。在spark 2.x中不支持用户自定义类型(UDT),为了实现hll和bitmap类型的支持,这两种类型,可以使用Spark的StringType来进行存储,并且实现对应的udf和udaf函数。
 
 Review comment:
   spark预计算的时候,对于bitmap类型,内存中就用roarbitmap的类库就可以了。这个类库有专门的序列化接口,支持跨语言传数据,bitmap算好之后,序列化到文件中就可以了。
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangpinghuang commented on a change in pull request #2939: spark etl design

Posted by GitBox <gi...@apache.org>.
kangpinghuang commented on a change in pull request #2939: spark etl design
URL: https://github.com/apache/incubator-doris/pull/2939#discussion_r382360503
 
 

 ##########
 File path: docs/documentation/cn/internal/spark_etl.md
 ##########
 @@ -0,0 +1,342 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris spark导入ETL逻辑设计
+
+## 背景
+
+Doris为了解决初次迁移,大量数据迁移doris的问题,引入了spark导入,用于提升数据导入的速度。在spark导入中,需要利用spark进行ETL计算、分区、分桶、文件格式生成等逻辑。下面分别讲述具体的实现设计。
+
+## 名词解释
+
+* FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
+* BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
+
+## 设计
+
+### 目标
+
+在Spark导入中,需要达到以下目标:
+
+1. 需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;
+2. 需要支持包括csv、parquet、orc等多种格式的数据文件。
+3. 能够支持doris中所有的类型,其中包括hll和bitmap类型。同时,bitmap类型需要考虑支持全局字典,以实现string类型的精确去重
+4. 能够支持排序和预聚合
+5. 支持分区分桶逻辑
+6. 支持生成base表和rollup表的数据
+7. 能够支持生成doris的存储格式
+
+### 实现方案
+
+参考[pr-2856](https://github.com/apache/incubator-doris/pull/2856), 整的方案将按照如下的框架实现:
+
+```
+SparkLoadJob:
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadPendingTask    |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadLodingTask     |
+                 v                         |
+         +-------+-------+                 |
+         |  COMMITTED    |-----------------|
+         +-------+-------+                 |
+				 |                         |
+                 v                         v  
+         +-------+-------+         +-------+-------+     
+         |   FINISHED    |         |   CANCELLED   |
+         +-------+-------+         +-------+-------+
+				 |                         Λ
+                 +-------------------------+
+```
+
+整个流程大体如下:
+
+1. 用户的sql语句会被解析成LoadStmt,并且里面带一个is_spark_load的属性,为true
+2. LoadStmt会生成SparkLoadJob进行执行
+3. 在SparkLoadJob阶段会生成SparkLoadPendingTask,完成spark etl作业的提交
+4. 在SparkLoadLodingTask会向table涉及到的BE发送TPushReq
+5. 在BE中需要基于EngineBatchLoadTask完成数据的下载和导入
+
+spark导入的语句:
+这块主要考虑用户习惯,导入语句格式上尽量保持跟broker导入语句相似。下面是一个方案:
+
+```
+		LOAD LABEL example_db.label1
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+		NEGATIVE
+        INTO TABLE `my_table`
+		PARTITION (p1, p2)
+		COLUMNS TERMINATED BY ","
+		columns(k1,k2,k3,v1,v2)
+		set (
+			v3 = v1 + v2,
+			k4 = hll_hash(k2)
+		)
+		where k1 > 20
+        )
+		with spark
+        PROPERTIES
+        (
+        "cluster_type" = "yarn",
+        "yarn_resourcemanager_address" = "xxx.tc:8032",
+        "max_filter_ratio" = "0.1"
+        );
+```
+
+#### SparkLoadPendingTask
+
+该Task主要实现spark任务的提交。 
+
+1. 作业提交
+
+为此实现一个SparkApplication类,用于完成spark作业的提交.
+
+```
+class SparkApplication {
+public:
+	// 提交spark etl作业
+	// 返回appId
+	String submitApp(TBrokerScanRangeParams params);
+
+	// 取消作业,用于支持用户cancel导入作业
+	bool cancelApp(String appId);
+
+	// 获取作业状态,用于判断是否已经完成
+	JobStatus getJobStatus(String jobId);
+};
+
+```
+
+SparkApplication采用基于InProcessLauncher,并且采用cluster模式,来实现在FE中提交spark etl app。主要逻辑如下:
+```
+AbstractLauncher launcher = new InProcessLauncher();
+launcher.setAppResource(appResource)
+        .setMainClass(mainClass) // 这个地方使用导入作业的label
+        .setMaster(master) // 这个地方使用cluster
+CountDownLatch countDownLatch;
+Listener handleListeners = new SparkAppHandle.Listener() {
+    @Override
+    public void stateChanged(SparkAppHandle handle) {
+        if (handle.getState().isFinal()) {
+            countDownLatch.countDown();
+        }
+
+        @Override
+        public void infoChanged(SparkAppHandle handle) {}
+    };
+
+countDownLatch = new CountDownLatch(1);
+
+SparkAppHandle handle = launcher.startApplication(handleListeners);
+boolean regularExit = countDownLatch.await(20, TimeUnit.MINUTES);
+if (!regularExit)
+    handle.kill();
+```
+
+
+其中SparkAppHandle可以用来操控Spark App作业,其接口主要如下:
+```
+void	addListener(SparkAppHandle.Listener l);
+void	disconnect();
+String	getAppId();
+SparkAppHandle.State	getState();
+void	kill();
+void	stop();
+```
+
+2. 参数
+
+	参数分为两大类:
+
+- spark相关的参数,列表可以参考:http://spark.apache.org/docs/latest/running-on-yarn.html。最主要的参数罗列如下:
+	```
+		spark.yarn.am.memory
+		spark.yarn.am.cores
+		spark.executor.instances
+		spark.executor.memory
+		spark.executor.cores
+		spark.yarn.stagingDir
+		spark.yarn.queue
+	```
+- 业务参数,包括表的schema信息、表的rollup信息、用户导入语句的信息,参考broker load,主要参数结构如下:
+
+```
+DataDescriptions:用户导入语句信息,用于获取列映射关系、column from path、where predicate、文件类型等等参数。需要对DataDescriptions分析之后,得到类似TBrokerScanRangeParams的结构体,传给spark etl作业。
+
+Partition信息和bucket信息:包括分区和分桶列和现有的partition信息。具体就是:TOlapTablePartitionParam、TOlapTableIndexSchema、TOlapTableSchemaParam等
+
+base表和rollup表的树形层级关系:优化现有的rollup数据生成方案,实现基于最小覆盖parent rollup来计算rollup的方式,优化rollup计算。 
+class IndexTree {
+	int indexId;
+	int parentIndexId;
+	List<int> childIndexId;
+};
+```
+针对精确去重场景下的bitmap类型的列需要特别处理,因为bitmap类型的列可能需要构建全局词典和利用全局词典进行数据转化的步骤,因此业务相关的参数中还需要提供一个bitmap列的信息List<BitmapArg>,定义如下:
+
+```
+class BitmapArg {
+String columnName; // bitmap列名
+String globalDictPath; // 全局字典路径
+};
+
+```
+现在,全局字典构建方案还没有确定,一种方案https://github.com/wangbo/open_mark/issues/2。不管怎么样,这里主要关心产生的全局字典的路径。对于具体的构建步骤可以等实现全局字典的时候再设计(如果采用基于hive的方案,可以在提交spark etl app之前,进行词典构建任务)。
 
 Review comment:
   可以先简单搞一下,后续这块等全局词典设计确认之后再改,也比较好改。

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangpinghuang commented on a change in pull request #2939: spark etl design

Posted by GitBox <gi...@apache.org>.
kangpinghuang commented on a change in pull request #2939: spark etl design
URL: https://github.com/apache/incubator-doris/pull/2939#discussion_r382359984
 
 

 ##########
 File path: docs/documentation/cn/internal/spark_etl.md
 ##########
 @@ -0,0 +1,323 @@
+# Doris spark导入ETL逻辑设计
+
+## 背景
+
+Doris为了解决初次迁移,大量数据迁移doris的问题,引入了spark导入,用于提升数据导入的速度。在spark导入中,需要利用spark进行ETL计算、分区、分桶、文件格式生成等逻辑。下面分别讲述具体的实现设计。
+
+## 名词解释
+
+* FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
+* BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
+
+## 设计
+
+### 目标
+
+在Spark导入中,需要达到以下目标:
+
+1. 需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;
+2. 需要支持包括csv、parquet、orc等多种格式的数据文件。
+3. 能够支持doris中所有的类型,其中包括hll和bitmap类型。同时,bitmap类型需要考虑支持全局字典,以实现string类型的精确去重
+4. 能够支持排序和预聚合
+5. 支持分区分桶逻辑
+6. 支持生成base表和rollup表的数据
+7. 能够支持生成doris的存储格式
+
+### 实现方案
+
+参考[pr-2865](https://github.com/apache/incubator-doris/pull/2856), 整的方案将按照如下的框架实现:
+
+```
+SparkLoadJob:
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadPendingTask    |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadLodingTas      |
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] crazyleeyang commented on a change in pull request #2939: spark etl design

Posted by GitBox <gi...@apache.org>.
crazyleeyang commented on a change in pull request #2939: spark etl design
URL: https://github.com/apache/incubator-doris/pull/2939#discussion_r381042007
 
 

 ##########
 File path: docs/documentation/cn/internal/spark_etl.md
 ##########
 @@ -0,0 +1,323 @@
+# Doris spark导入ETL逻辑设计
+
+## 背景
+
+Doris为了解决初次迁移,大量数据迁移doris的问题,引入了spark导入,用于提升数据导入的速度。在spark导入中,需要利用spark进行ETL计算、分区、分桶、文件格式生成等逻辑。下面分别讲述具体的实现设计。
+
+## 名词解释
+
+* FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
+* BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
+
+## 设计
+
+### 目标
+
+在Spark导入中,需要达到以下目标:
+
+1. 需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;
+2. 需要支持包括csv、parquet、orc等多种格式的数据文件。
+3. 能够支持doris中所有的类型,其中包括hll和bitmap类型。同时,bitmap类型需要考虑支持全局字典,以实现string类型的精确去重
+4. 能够支持排序和预聚合
+5. 支持分区分桶逻辑
+6. 支持生成base表和rollup表的数据
+7. 能够支持生成doris的存储格式
+
+### 实现方案
+
+参考[pr-2865](https://github.com/apache/incubator-doris/pull/2856), 整的方案将按照如下的框架实现:
+
+```
+SparkLoadJob:
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadPendingTask    |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadLodingTas      |
 
 Review comment:
   SparkLoadLodingTas 应该是 SparkLoadLodingTask吧

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangpinghuang commented on a change in pull request #2939: spark etl design

Posted by GitBox <gi...@apache.org>.
kangpinghuang commented on a change in pull request #2939: spark etl design
URL: https://github.com/apache/incubator-doris/pull/2939#discussion_r382359959
 
 

 ##########
 File path: docs/documentation/cn/internal/spark_etl.md
 ##########
 @@ -0,0 +1,323 @@
+# Doris spark导入ETL逻辑设计
+
+## 背景
+
+Doris为了解决初次迁移,大量数据迁移doris的问题,引入了spark导入,用于提升数据导入的速度。在spark导入中,需要利用spark进行ETL计算、分区、分桶、文件格式生成等逻辑。下面分别讲述具体的实现设计。
+
+## 名词解释
+
+* FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
+* BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
+
+## 设计
+
+### 目标
+
+在Spark导入中,需要达到以下目标:
+
+1. 需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;
+2. 需要支持包括csv、parquet、orc等多种格式的数据文件。
+3. 能够支持doris中所有的类型,其中包括hll和bitmap类型。同时,bitmap类型需要考虑支持全局字典,以实现string类型的精确去重
+4. 能够支持排序和预聚合
+5. 支持分区分桶逻辑
+6. 支持生成base表和rollup表的数据
+7. 能够支持生成doris的存储格式
+
+### 实现方案
+
+参考[pr-2865](https://github.com/apache/incubator-doris/pull/2856), 整的方案将按照如下的框架实现:
 
 Review comment:
   done

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wangbo commented on a change in pull request #2939: spark etl design

Posted by GitBox <gi...@apache.org>.
wangbo commented on a change in pull request #2939: spark etl design
URL: https://github.com/apache/incubator-doris/pull/2939#discussion_r381064301
 
 

 ##########
 File path: docs/documentation/cn/internal/spark_etl.md
 ##########
 @@ -0,0 +1,342 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris spark导入ETL逻辑设计
+
+## 背景
+
+Doris为了解决初次迁移,大量数据迁移doris的问题,引入了spark导入,用于提升数据导入的速度。在spark导入中,需要利用spark进行ETL计算、分区、分桶、文件格式生成等逻辑。下面分别讲述具体的实现设计。
+
+## 名词解释
+
+* FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
+* BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
+
+## 设计
+
+### 目标
+
+在Spark导入中,需要达到以下目标:
+
+1. 需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;
+2. 需要支持包括csv、parquet、orc等多种格式的数据文件。
+3. 能够支持doris中所有的类型,其中包括hll和bitmap类型。同时,bitmap类型需要考虑支持全局字典,以实现string类型的精确去重
+4. 能够支持排序和预聚合
+5. 支持分区分桶逻辑
+6. 支持生成base表和rollup表的数据
+7. 能够支持生成doris的存储格式
+
+### 实现方案
+
+参考[pr-2865](https://github.com/apache/incubator-doris/pull/2856), 整的方案将按照如下的框架实现:
+
+```
+SparkLoadJob:
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadPendingTask    |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadLodingTas      |
+                 v                         |
+         +-------+-------+                 |
+         |  COMMITTED    |-----------------|
+         +-------+-------+                 |
+				 |                         |
+                 v                         v  
+         +-------+-------+         +-------+-------+     
+         |   FINISHED    |         |   CANCELLED   |
+         +-------+-------+         +-------+-------+
+				 |                         Λ
+                 +-------------------------+
+```
+
+整个流程大体如下:
+
+1. 用户的sql语句会被解析成LoadStmt,并且里面带一个is_spark_load的属性,为true
+2. LoadStmt会生成SparkLoadJob进行执行
+3. 在SparkLoadJob阶段会生成SparkLoadPendingTask,完成spark etl作业的提交
+4. 在SparkLoadLodingTask会向table涉及到的BE发送TPushReq
+5. 在BE中需要基于EngineBatchLoadTask完成数据的下载和导入
+
+spark导入的语句:
+这块主要考虑用户习惯,导入语句格式上尽量保持跟broker导入语句相似。下面是一个方案:
+
+```
+		LOAD LABEL example_db.label1
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+		NEGATIVE
+        INTO TABLE `my_table`
+		PARTITION (p1, p2)
+		COLUMNS TERMINATED BY ","
+		columns(k1,k2,k3,v1,v2)
+		set (
+			v3 = v1 + v2,
+			k4 = hll_hash(k2)
+		)
+		where k1 > 20
+        )
+		with spark
+        PROPERTIES
+        (
+        "cluster_type" = "yarn",
+        "yarn_resourcemanager_address" = "xxx.tc:8032",
+        "max_filter_ratio" = "0.1"
+        );
+```
+
+#### SparkLoadPendingTask
+
+该Task主要实现spark任务的提交。 
+
+1. 作业提交
+
+为此实现一个SparkApplication类,用于完成spark作业的提交.
+
+```
+class SparkApplication {
+public:
+	// 提交spark etl作业
+	// 返回appId
+	String submitApp(TBrokerScanRangeParams params);
+
+	// 取消作业,用于支持用户cancel导入作业
+	bool cancelApp(String appId);
+
+	// 获取作业状态,用于判断是否已经完成
+	JobStatus getJobStatus(String jobId);
+};
+
+```
+
+SparkApplication采用基于InProcessLauncher,并且采用cluster模式,来实现在FE中提交spark etl app。主要逻辑如下:
+```
+AbstractLauncher launcher = new InProcessLauncher();
+launcher.setAppResource(appResource)
+        .setMainClass(mainClass) // 这个地方使用导入作业的label
+        .setMaster(master) // 这个地方使用cluster
+CountDownLatch countDownLatch;
+Listener handleListeners = new SparkAppHandle.Listener() {
+    @Override
+    public void stateChanged(SparkAppHandle handle) {
+        if (handle.getState().isFinal()) {
+            countDownLatch.countDown();
+        }
+
+        @Override
+        public void infoChanged(SparkAppHandle handle) {}
+    };
+
+countDownLatch = new CountDownLatch(1);
+
+SparkAppHandle handle = launcher.startApplication(handleListeners);
+boolean regularExit = countDownLatch.await(20, TimeUnit.MINUTES);
+if (!regularExit)
+    handle.kill();
+```
+
+
+其中SparkAppHandle可以用来操控Spark App作业,其接口主要如下:
+```
+void	addListener(SparkAppHandle.Listener l);
+void	disconnect();
+String	getAppId();
+SparkAppHandle.State	getState();
+void	kill();
+void	stop();
+```
+
+2. 参数
+
+	参数分为两大类:
+
+- spark相关的参数,列表可以参考:http://spark.apache.org/docs/latest/running-on-yarn.html。最主要的参数罗列如下:
+	```
+		spark.yarn.am.memory
+		spark.yarn.am.cores
+		spark.executor.instances
+		spark.executor.memory
+		spark.executor.cores
+		spark.yarn.stagingDir
+		spark.yarn.queue
+	```
+- 业务参数,包括表的schema信息、表的rollup信息、用户导入语句的信息,参考broker load,主要参数结构如下:
+
+```
+DataDescriptions:用户导入语句信息,用于获取列映射关系、column from path、where predicate、文件类型等等参数。需要对DataDescriptions分析之后,得到类似TBrokerScanRangeParams的结构体,传给spark etl作业。
+
+Partition信息和bucket信息:包括分区和分桶列和现有的partition信息。具体就是:TOlapTablePartitionParam、TOlapTableIndexSchema、TOlapTableSchemaParam等
+
+base表和rollup表的树形层级关系:优化现有的rollup数据生成方案,实现基于最小覆盖parent rollup来计算rollup的方式,优化rollup计算。 
+class IndexTree {
+	int indexId;
+	int parentIndexId;
+	List<int> childIndexId;
+};
+```
+针对精确去重场景下的bitmap类型的列需要特别处理,因为bitmap类型的列可能需要构建全局词典和利用全局词典进行数据转化的步骤,因此业务相关的参数中还需要提供一个bitmap列的信息List<BitmapArg>,定义如下:
+
+```
+class BitmapArg {
+String columnName; // bitmap列名
+String globalDictPath; // 全局字典路径
+};
+
+```
+现在,全局字典构建方案还没有确定,一种方案https://github.com/wangbo/open_mark/issues/2。不管怎么样,这里主要关心产生的全局字典的路径。对于具体的构建步骤可以等实现全局字典的时候再设计(如果采用基于hive的方案,可以在提交spark etl app之前,进行词典构建任务)。
+
+
+参考:
+- http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
+- http://spark.apache.org/docs/latest/api/java/org/apache/spark/launcher/SparkLauncher.html#startApplication-org.apache.spark.launcher.SparkAppHandle.Listener...-
+- http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
+- https://www.cnblogs.com/yy3b2007com/p/10247239.html
+
+#### spark etl作业
+
+spark etl中主要考虑实现以下逻辑:
+
+1. doris类型和spark类型的映射
+
+spark支持的类型:https://spark.apache.org/docs/latest/sql-reference.html#data-types
+
+除了hll类型和bitmap列类型之外,spark支持所有的doris类型。在spark 2.x中不支持用户自定义类型(UDT),为了实现hll和bitmap类型的支持,这两种类型,可以使用Spark的StringType来进行存储,并且实现对应的udf和udaf函数。
 
 Review comment:
   1 spark预计算的时候,对于bitmap类型,内存中就用roarbitmap的类库就可以了。这个类库有专门的序列化接口,支持跨语言传数据,bitmap算好之后,序列化到文件中就可以了。
   2 这udf和udaf是用来干啥的,计算hll和bitmap?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] wangbo commented on a change in pull request #2939: spark etl design

Posted by GitBox <gi...@apache.org>.
wangbo commented on a change in pull request #2939: spark etl design
URL: https://github.com/apache/incubator-doris/pull/2939#discussion_r381090187
 
 

 ##########
 File path: docs/documentation/cn/internal/spark_etl.md
 ##########
 @@ -0,0 +1,342 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris spark导入ETL逻辑设计
+
+## 背景
+
+Doris为了解决初次迁移,大量数据迁移doris的问题,引入了spark导入,用于提升数据导入的速度。在spark导入中,需要利用spark进行ETL计算、分区、分桶、文件格式生成等逻辑。下面分别讲述具体的实现设计。
+
+## 名词解释
+
+* FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
+* BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
+
+## 设计
+
+### 目标
+
+在Spark导入中,需要达到以下目标:
+
+1. 需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;
+2. 需要支持包括csv、parquet、orc等多种格式的数据文件。
+3. 能够支持doris中所有的类型,其中包括hll和bitmap类型。同时,bitmap类型需要考虑支持全局字典,以实现string类型的精确去重
+4. 能够支持排序和预聚合
+5. 支持分区分桶逻辑
+6. 支持生成base表和rollup表的数据
+7. 能够支持生成doris的存储格式
+
+### 实现方案
+
+参考[pr-2856](https://github.com/apache/incubator-doris/pull/2856), 整的方案将按照如下的框架实现:
+
+```
+SparkLoadJob:
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadPendingTask    |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadLodingTask     |
+                 v                         |
+         +-------+-------+                 |
+         |  COMMITTED    |-----------------|
+         +-------+-------+                 |
+				 |                         |
+                 v                         v  
+         +-------+-------+         +-------+-------+     
+         |   FINISHED    |         |   CANCELLED   |
+         +-------+-------+         +-------+-------+
+				 |                         Λ
+                 +-------------------------+
+```
+
+整个流程大体如下:
+
+1. 用户的sql语句会被解析成LoadStmt,并且里面带一个is_spark_load的属性,为true
+2. LoadStmt会生成SparkLoadJob进行执行
+3. 在SparkLoadJob阶段会生成SparkLoadPendingTask,完成spark etl作业的提交
+4. 在SparkLoadLodingTask会向table涉及到的BE发送TPushReq
+5. 在BE中需要基于EngineBatchLoadTask完成数据的下载和导入
+
+spark导入的语句:
+这块主要考虑用户习惯,导入语句格式上尽量保持跟broker导入语句相似。下面是一个方案:
+
+```
+		LOAD LABEL example_db.label1
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+		NEGATIVE
+        INTO TABLE `my_table`
+		PARTITION (p1, p2)
+		COLUMNS TERMINATED BY ","
+		columns(k1,k2,k3,v1,v2)
+		set (
+			v3 = v1 + v2,
+			k4 = hll_hash(k2)
+		)
+		where k1 > 20
+        )
+		with spark
+        PROPERTIES
+        (
+        "cluster_type" = "yarn",
+        "yarn_resourcemanager_address" = "xxx.tc:8032",
+        "max_filter_ratio" = "0.1"
+        );
+```
+
+#### SparkLoadPendingTask
+
+该Task主要实现spark任务的提交。 
+
+1. 作业提交
+
+为此实现一个SparkApplication类,用于完成spark作业的提交.
+
+```
+class SparkApplication {
+public:
+	// 提交spark etl作业
+	// 返回appId
+	String submitApp(TBrokerScanRangeParams params);
+
+	// 取消作业,用于支持用户cancel导入作业
+	bool cancelApp(String appId);
+
+	// 获取作业状态,用于判断是否已经完成
+	JobStatus getJobStatus(String jobId);
+};
+
+```
+
+SparkApplication采用基于InProcessLauncher,并且采用cluster模式,来实现在FE中提交spark etl app。主要逻辑如下:
+```
+AbstractLauncher launcher = new InProcessLauncher();
+launcher.setAppResource(appResource)
+        .setMainClass(mainClass) // 这个地方使用导入作业的label
+        .setMaster(master) // 这个地方使用cluster
+CountDownLatch countDownLatch;
+Listener handleListeners = new SparkAppHandle.Listener() {
+    @Override
+    public void stateChanged(SparkAppHandle handle) {
+        if (handle.getState().isFinal()) {
+            countDownLatch.countDown();
+        }
+
+        @Override
+        public void infoChanged(SparkAppHandle handle) {}
+    };
+
+countDownLatch = new CountDownLatch(1);
+
+SparkAppHandle handle = launcher.startApplication(handleListeners);
+boolean regularExit = countDownLatch.await(20, TimeUnit.MINUTES);
+if (!regularExit)
+    handle.kill();
+```
+
+
+其中SparkAppHandle可以用来操控Spark App作业,其接口主要如下:
+```
+void	addListener(SparkAppHandle.Listener l);
+void	disconnect();
+String	getAppId();
+SparkAppHandle.State	getState();
+void	kill();
+void	stop();
+```
+
+2. 参数
+
+	参数分为两大类:
+
+- spark相关的参数,列表可以参考:http://spark.apache.org/docs/latest/running-on-yarn.html。最主要的参数罗列如下:
+	```
+		spark.yarn.am.memory
+		spark.yarn.am.cores
+		spark.executor.instances
+		spark.executor.memory
+		spark.executor.cores
+		spark.yarn.stagingDir
+		spark.yarn.queue
+	```
+- 业务参数,包括表的schema信息、表的rollup信息、用户导入语句的信息,参考broker load,主要参数结构如下:
+
+```
+DataDescriptions:用户导入语句信息,用于获取列映射关系、column from path、where predicate、文件类型等等参数。需要对DataDescriptions分析之后,得到类似TBrokerScanRangeParams的结构体,传给spark etl作业。
+
+Partition信息和bucket信息:包括分区和分桶列和现有的partition信息。具体就是:TOlapTablePartitionParam、TOlapTableIndexSchema、TOlapTableSchemaParam等
+
+base表和rollup表的树形层级关系:优化现有的rollup数据生成方案,实现基于最小覆盖parent rollup来计算rollup的方式,优化rollup计算。 
+class IndexTree {
+	int indexId;
+	int parentIndexId;
+	List<int> childIndexId;
+};
+```
+针对精确去重场景下的bitmap类型的列需要特别处理,因为bitmap类型的列可能需要构建全局词典和利用全局词典进行数据转化的步骤,因此业务相关的参数中还需要提供一个bitmap列的信息List<BitmapArg>,定义如下:
+
+```
+class BitmapArg {
+String columnName; // bitmap列名
+String globalDictPath; // 全局字典路径
+};
+
+```
+现在,全局字典构建方案还没有确定,一种方案https://github.com/wangbo/open_mark/issues/2。不管怎么样,这里主要关心产生的全局字典的路径。对于具体的构建步骤可以等实现全局字典的时候再设计(如果采用基于hive的方案,可以在提交spark etl app之前,进行词典构建任务)。
 
 Review comment:
   目前方案预期spark作业的输入应该是一张hive表,spark作业只需要知道表名就可以了

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangpinghuang commented on a change in pull request #2939: spark etl design

Posted by GitBox <gi...@apache.org>.
kangpinghuang commented on a change in pull request #2939: spark etl design
URL: https://github.com/apache/incubator-doris/pull/2939#discussion_r382360310
 
 

 ##########
 File path: docs/documentation/cn/internal/spark_etl.md
 ##########
 @@ -0,0 +1,342 @@
+<!-- 
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Doris spark导入ETL逻辑设计
+
+## 背景
+
+Doris为了解决初次迁移,大量数据迁移doris的问题,引入了spark导入,用于提升数据导入的速度。在spark导入中,需要利用spark进行ETL计算、分区、分桶、文件格式生成等逻辑。下面分别讲述具体的实现设计。
+
+## 名词解释
+
+* FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
+* BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
+
+## 设计
+
+### 目标
+
+在Spark导入中,需要达到以下目标:
+
+1. 需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;
+2. 需要支持包括csv、parquet、orc等多种格式的数据文件。
+3. 能够支持doris中所有的类型,其中包括hll和bitmap类型。同时,bitmap类型需要考虑支持全局字典,以实现string类型的精确去重
+4. 能够支持排序和预聚合
+5. 支持分区分桶逻辑
+6. 支持生成base表和rollup表的数据
+7. 能够支持生成doris的存储格式
+
+### 实现方案
+
+参考[pr-2865](https://github.com/apache/incubator-doris/pull/2856), 整的方案将按照如下的框架实现:
+
+```
+SparkLoadJob:
+         +-------+-------+
+         |    PENDING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadPendingTask    |
+                 v                         |
+         +-------+-------+                 |
+         |    LOADING    |-----------------|
+         +-------+-------+                 |
+				 | SparkLoadLodingTas      |
+                 v                         |
+         +-------+-------+                 |
+         |  COMMITTED    |-----------------|
+         +-------+-------+                 |
+				 |                         |
+                 v                         v  
+         +-------+-------+         +-------+-------+     
+         |   FINISHED    |         |   CANCELLED   |
+         +-------+-------+         +-------+-------+
+				 |                         Λ
+                 +-------------------------+
+```
+
+整个流程大体如下:
+
+1. 用户的sql语句会被解析成LoadStmt,并且里面带一个is_spark_load的属性,为true
+2. LoadStmt会生成SparkLoadJob进行执行
+3. 在SparkLoadJob阶段会生成SparkLoadPendingTask,完成spark etl作业的提交
+4. 在SparkLoadLodingTask会向table涉及到的BE发送TPushReq
+5. 在BE中需要基于EngineBatchLoadTask完成数据的下载和导入
+
+spark导入的语句:
+这块主要考虑用户习惯,导入语句格式上尽量保持跟broker导入语句相似。下面是一个方案:
+
+```
+		LOAD LABEL example_db.label1
+        (
+        DATA INFILE("hdfs://hdfs_host:hdfs_port/user/palo/data/input/file")
+		NEGATIVE
+        INTO TABLE `my_table`
+		PARTITION (p1, p2)
+		COLUMNS TERMINATED BY ","
+		columns(k1,k2,k3,v1,v2)
+		set (
+			v3 = v1 + v2,
+			k4 = hll_hash(k2)
+		)
+		where k1 > 20
+        )
+		with spark
+        PROPERTIES
+        (
+        "cluster_type" = "yarn",
+        "yarn_resourcemanager_address" = "xxx.tc:8032",
+        "max_filter_ratio" = "0.1"
+        );
+```
+
+#### SparkLoadPendingTask
+
+该Task主要实现spark任务的提交。 
+
+1. 作业提交
+
+为此实现一个SparkApplication类,用于完成spark作业的提交.
+
+```
+class SparkApplication {
+public:
+	// 提交spark etl作业
+	// 返回appId
+	String submitApp(TBrokerScanRangeParams params);
+
+	// 取消作业,用于支持用户cancel导入作业
+	bool cancelApp(String appId);
+
+	// 获取作业状态,用于判断是否已经完成
+	JobStatus getJobStatus(String jobId);
+};
+
+```
+
+SparkApplication采用基于InProcessLauncher,并且采用cluster模式,来实现在FE中提交spark etl app。主要逻辑如下:
+```
+AbstractLauncher launcher = new InProcessLauncher();
+launcher.setAppResource(appResource)
+        .setMainClass(mainClass) // 这个地方使用导入作业的label
+        .setMaster(master) // 这个地方使用cluster
+CountDownLatch countDownLatch;
+Listener handleListeners = new SparkAppHandle.Listener() {
+    @Override
+    public void stateChanged(SparkAppHandle handle) {
+        if (handle.getState().isFinal()) {
+            countDownLatch.countDown();
+        }
+
+        @Override
+        public void infoChanged(SparkAppHandle handle) {}
+    };
+
+countDownLatch = new CountDownLatch(1);
+
+SparkAppHandle handle = launcher.startApplication(handleListeners);
+boolean regularExit = countDownLatch.await(20, TimeUnit.MINUTES);
+if (!regularExit)
+    handle.kill();
+```
+
+
+其中SparkAppHandle可以用来操控Spark App作业,其接口主要如下:
+```
+void	addListener(SparkAppHandle.Listener l);
+void	disconnect();
+String	getAppId();
+SparkAppHandle.State	getState();
+void	kill();
+void	stop();
+```
+
+2. 参数
+
+	参数分为两大类:
+
+- spark相关的参数,列表可以参考:http://spark.apache.org/docs/latest/running-on-yarn.html。最主要的参数罗列如下:
+	```
+		spark.yarn.am.memory
+		spark.yarn.am.cores
+		spark.executor.instances
+		spark.executor.memory
+		spark.executor.cores
+		spark.yarn.stagingDir
+		spark.yarn.queue
+	```
+- 业务参数,包括表的schema信息、表的rollup信息、用户导入语句的信息,参考broker load,主要参数结构如下:
+
+```
+DataDescriptions:用户导入语句信息,用于获取列映射关系、column from path、where predicate、文件类型等等参数。需要对DataDescriptions分析之后,得到类似TBrokerScanRangeParams的结构体,传给spark etl作业。
+
+Partition信息和bucket信息:包括分区和分桶列和现有的partition信息。具体就是:TOlapTablePartitionParam、TOlapTableIndexSchema、TOlapTableSchemaParam等
+
+base表和rollup表的树形层级关系:优化现有的rollup数据生成方案,实现基于最小覆盖parent rollup来计算rollup的方式,优化rollup计算。 
+class IndexTree {
+	int indexId;
+	int parentIndexId;
+	List<int> childIndexId;
+};
+```
+针对精确去重场景下的bitmap类型的列需要特别处理,因为bitmap类型的列可能需要构建全局词典和利用全局词典进行数据转化的步骤,因此业务相关的参数中还需要提供一个bitmap列的信息List<BitmapArg>,定义如下:
+
+```
+class BitmapArg {
+String columnName; // bitmap列名
+String globalDictPath; // 全局字典路径
+};
+
+```
+现在,全局字典构建方案还没有确定,一种方案https://github.com/wangbo/open_mark/issues/2。不管怎么样,这里主要关心产生的全局字典的路径。对于具体的构建步骤可以等实现全局字典的时候再设计(如果采用基于hive的方案,可以在提交spark etl app之前,进行词典构建任务)。
+
+
+参考:
+- http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
+- http://spark.apache.org/docs/latest/api/java/org/apache/spark/launcher/SparkLauncher.html#startApplication-org.apache.spark.launcher.SparkAppHandle.Listener...-
+- http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/launcher/package-summary.html
+- https://www.cnblogs.com/yy3b2007com/p/10247239.html
+
+#### spark etl作业
+
+spark etl中主要考虑实现以下逻辑:
+
+1. doris类型和spark类型的映射
+
+spark支持的类型:https://spark.apache.org/docs/latest/sql-reference.html#data-types
+
+除了hll类型和bitmap列类型之外,spark支持所有的doris类型。在spark 2.x中不支持用户自定义类型(UDT),为了实现hll和bitmap类型的支持,这两种类型,可以使用Spark的StringType来进行存储,并且实现对应的udf和udaf函数。
 
 Review comment:
   恩,计算是用roarbitmap实现来实现序列化和反序列化,但是需要一个类型来保存这个值

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] crazyleeyang commented on a change in pull request #2939: spark etl design

Posted by GitBox <gi...@apache.org>.
crazyleeyang commented on a change in pull request #2939: spark etl design
URL: https://github.com/apache/incubator-doris/pull/2939#discussion_r381041474
 
 

 ##########
 File path: docs/documentation/cn/internal/spark_etl.md
 ##########
 @@ -0,0 +1,323 @@
+# Doris spark导入ETL逻辑设计
+
+## 背景
+
+Doris为了解决初次迁移,大量数据迁移doris的问题,引入了spark导入,用于提升数据导入的速度。在spark导入中,需要利用spark进行ETL计算、分区、分桶、文件格式生成等逻辑。下面分别讲述具体的实现设计。
+
+## 名词解释
+
+* FE:Frontend,即 Palo 的前端节点。主要负责接收和返回客户端请求、元数据以及集群管理、查询计划生成等工作。
+* BE:Backend,即 Palo 的后端节点。主要负责数据存储与管理、查询计划执行等工作。
+
+## 设计
+
+### 目标
+
+在Spark导入中,需要达到以下目标:
+
+1. 需要考虑支持多种spark部署模式,设计上需要兼容多种部署方式,可以考虑先实现yarn集群的部署模式;
+2. 需要支持包括csv、parquet、orc等多种格式的数据文件。
+3. 能够支持doris中所有的类型,其中包括hll和bitmap类型。同时,bitmap类型需要考虑支持全局字典,以实现string类型的精确去重
+4. 能够支持排序和预聚合
+5. 支持分区分桶逻辑
+6. 支持生成base表和rollup表的数据
+7. 能够支持生成doris的存储格式
+
+### 实现方案
+
+参考[pr-2865](https://github.com/apache/incubator-doris/pull/2856), 整的方案将按照如下的框架实现:
 
 Review comment:
   参考[pr-2865] 应该是 参考[pr-2856]

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org


[GitHub] [incubator-doris] kangpinghuang closed pull request #2939: spark etl design

Posted by GitBox <gi...@apache.org>.
kangpinghuang closed pull request #2939:
URL: https://github.com/apache/incubator-doris/pull/2939


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org