You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:54 UTC
[kylin] 07/11: KYLIN-4347 Spark engine BatchCubingJobBuilder
implementation
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 4ebf40316889528ec8b31f840afba328c6aff4fe
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Wed May 6 14:56:54 2020 +0800
KYLIN-4347 Spark engine BatchCubingJobBuilder implementation
---
.../engine/spark/SparkBatchCubingJobBuilder2.java | 18 ++++++++++++++++++
1 file changed, 18 insertions(+)
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 20e509c..9309a3d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
@@ -64,6 +65,23 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
// Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
inputSide.addStepPhase1_CreateFlatTable(result);
+ // build global dict
+ KylinConfig dictConfig = seg.getConfig();
+ String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+
+ if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
+ && !"".equals(mrHiveDictColumns[0])) {
+ //parallel part build
+ result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
+ //parallel total build
+ result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
+ }
+
+ // merge global dict and replace flat table
+ if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){
+ inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+ }
+
// Phase 2: Build Dictionary
if (seg.getConfig().isSparkFactDistinctEnable()) {
result.addTask(createFactDistinctColumnsSparkStep(jobId));