You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:54 UTC

[kylin] 07/11: KYLIN-4347 Spark engine BatchCubingJobBuilder implementation

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 4ebf40316889528ec8b31f840afba328c6aff4fe
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Wed May 6 14:56:54 2020 +0800

    KYLIN-4347 Spark engine BatchCubingJobBuilder implementation
---
 .../engine/spark/SparkBatchCubingJobBuilder2.java      | 18 ++++++++++++++++++
 1 file changed, 18 insertions(+)

diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 20e509c..9309a3d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.spark;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
@@ -64,6 +65,23 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
         inputSide.addStepPhase1_CreateFlatTable(result);
 
+        // build global dict
+        KylinConfig dictConfig = seg.getConfig();
+        String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+
+        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
+                && !"".equals(mrHiveDictColumns[0])) {
+            //parallel part build
+            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
+            //parallel total build
+            result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
+        }
+
+        // merge global dic and replace flat table
+        if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){
+            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+        }
+
         // Phase 2: Build Dictionary
         if (seg.getConfig().isSparkFactDistinctEnable()) {
             result.addTask(createFactDistinctColumnsSparkStep(jobId));