You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:58 UTC

[kylin] 11/11: KYLIN-4342 Improve code smell

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5731f43fcf350247e76fd7e36b0980d5cf9fc912
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Thu May 28 23:26:20 2020 +0800

    KYLIN-4342 Improve code smell
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  55 +++---
 .../kylin/job/constant/ExecutableConstants.java    |  12 +-
 .../bitmap/BitmapIntersectValueAggFunc.java        |  10 +-
 .../kylin/measure/bitmap/BitmapMeasureType.java    |   6 +-
 .../apache/kylin/metadata/model/FunctionDesc.java  |   4 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  53 +++---
 .../java/org/apache/kylin/engine/mr/IInput.java    |  24 ++-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  30 +--
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   8 +-
 .../kylin/engine/mr/common/BatchConstants.java     |   8 +-
 .../mr/steps/BuildGlobalHiveDictPartBuildJob.java  |  45 +++--
 .../steps/BuildGlobalHiveDictPartBuildMapper.java  |  13 +-
 ...va => BuildGlobalHiveDictPartBuildReducer.java} |  20 +-
 ...ava => BuildGlobalHiveDictPartPartitioner.java} |  17 +-
 ....java => BuildGlobalHiveDictTotalBuildJob.java} |  17 +-
 .../steps/BuildGlobalHiveDictTotalBuildMapper.java |  93 +++++----
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |  49 +++--
 .../localmeta/cube_desc/ci_inner_join_cube.json    |   3 +-
 kubernetes/README.md                               |  12 +-
 .../kylin/source/hive/CreateMrHiveDictStep.java    |  60 ++----
 .../apache/kylin/source/hive/HiveInputBase.java    | 210 +++++++++++----------
 .../apache/kylin/source/hive/MRHiveDictUtil.java   | 140 +++++++++-----
 22 files changed, 476 insertions(+), 413 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f7f73ac..3429963 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.common;
 
@@ -162,7 +162,6 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     /**
-     *
      * @param propertyKeys the collection of the properties; if null will return all properties
      * @return properties which contained in propertyKeys
      */
@@ -581,21 +580,28 @@ public abstract class KylinConfigBase implements Serializable {
 
 
     // ============================================================================
-    // mr-hive dict
+    // Hive Global dictionary by mr/hive
     // ============================================================================
     public String[] getMrHiveDictColumns() {
         String columnStr = getMrHiveDictColumnsStr();
-        if (!columnStr.equals("")) {
+        if (!StringUtils.isEmpty(columnStr)) {
             return columnStr.split(",");
         }
         return new String[0];
     }
 
+    /**
+     * @return the hdfs path for Hive Global dictionary table
+     */
+    public String getHiveDatabaseDir() {
+        return this.getOptional("kylin.source.hive.databasedir", "");
+    }
+
     public String[] getMrHiveDictColumnsExcludeRefColumns() {
         String[] excludeRefCols = null;
         String[] hiveDictColumns = getMrHiveDictColumns();
         Map<String, String> refCols = getMrHiveDictRefColumns();
-        if(Objects.nonNull(hiveDictColumns) && hiveDictColumns.length>0) {
+        if (Objects.nonNull(hiveDictColumns) && hiveDictColumns.length > 0) {
             excludeRefCols = Arrays.stream(hiveDictColumns).filter(x -> !refCols.containsKey(x)).toArray(String[]::new);
         }
         return excludeRefCols;
@@ -603,40 +609,40 @@ public abstract class KylinConfigBase implements Serializable {
 
     /**
      * set kylin.dictionary.mr-hive.columns in Cube level config , value are the columns which want to use MR/Hive to build global dict ,
-     * Format, tableAliasName_ColumnName, multiple columns separated by commas,eg KYLIN_SALES_BUYER_ID,KYLIN_SALES_SELLER_ID
-     * @return  if mr-hive dict not enabled, return "";
-     *          else return {TABLE_NAME}_{COLUMN_NAME1},{TABLE_NAME}_{COLUMN_NAME2}"
+     * Format, tableAliasName_ColumnName, multiple columns separated by comma,eg KYLIN_SALES_BUYER_ID,KYLIN_SALES_SELLER_ID
+     *
+     * @return if mr-hive dict not enabled, return "";
+     * else return {TABLE_NAME}_{COLUMN_NAME1},{TABLE_NAME}_{COLUMN_NAME2}"
      */
     private String getMrHiveDictColumnsStr() {
         return getOptional("kylin.dictionary.mr-hive.columns", "");
     }
 
     /**
-     * @return  The global dic reduce num per column. Default 2  per column.
+     * @return The global dic reduce num per column. Default 2  per column.
      */
     public Integer[] getMrHiveDictColumnsReduceNumExcludeRefCols() {
         String[] excludeRefCols = getMrHiveDictColumnsExcludeRefColumns();
 
-        if(Objects.nonNull(excludeRefCols) && excludeRefCols.length>0) {
+        if (Objects.nonNull(excludeRefCols) && excludeRefCols.length > 0) {
             String[] arr = null;
             Map<String, Integer> colNum = new HashMap<>();
             Integer[] reduceNumArr = new Integer[excludeRefCols.length];
             String[] columnReduceNum = getMrHiveDictColumnsReduceNumStr().split(",");
 
-            //change set columnReduceNum to map struct
             try {
-                for(int i=0;i<columnReduceNum.length;i++){
-                    if(!StringUtils.isBlank(columnReduceNum[i])) {
+                for (int i = 0; i < columnReduceNum.length; i++) {
+                    if (!StringUtils.isBlank(columnReduceNum[i])) {
                         arr = columnReduceNum[i].split(":");
                         colNum.put(arr[0], Integer.parseInt(arr[1]));
                     }
                 }
-            }catch (Exception e){
+            } catch (Exception e) {
                 logger.error("set kylin.dictionary.mr-hive.columns.reduce.num error {} , the value should like colAilasName:reduceNum,colAilasName:reduceNum", getMrHiveDictColumnsReduceNumStr());
             }
 
             for (int i = 0; i < excludeRefCols.length; i++) {
-                reduceNumArr[i] = colNum.containsKey(excludeRefCols[i])?colNum.get(excludeRefCols[i]): DEFAULT_MR_HIVE_GLOBAL_DICT_REDUCE_NUM_PER_COLUMN;
+                reduceNumArr[i] = colNum.containsKey(excludeRefCols[i]) ? colNum.get(excludeRefCols[i]) : DEFAULT_MR_HIVE_GLOBAL_DICT_REDUCE_NUM_PER_COLUMN;
             }
 
             Arrays.asList(reduceNumArr).stream().forEach(x -> {
@@ -646,7 +652,7 @@ public abstract class KylinConfigBase implements Serializable {
             });
 
             return reduceNumArr;
-        }else {
+        } else {
             return null;
         }
     }
@@ -654,15 +660,13 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * Set kylin.dictionary.mr-hive.columns.reduce.num in Cube level config , value are the reduce number for global dict columns which are set in kylin.dictionary.mr-hive.columns.
      * Format, tableAliasName_ColumnName:number, multiple columns separated by commas,eg KYLIN_SALES_BUYER_ID:5,KYLIN_SALES_SELLER_ID:3
-     * @return
      */
     private String getMrHiveDictColumnsReduceNumStr() {
         return getOptional("kylin.dictionary.mr-hive.columns.reduce.num", "");
     }
 
     /**
-     * MR/Hive global domain dic (reuse dict from other cube's MR/Hive global dic column)
-     * @return
+     * MR/Hive global domain dictionary (reuse dict from other cube's MR/Hive global dic column)
      */
     public Map<String, String> getMrHiveDictRefColumns() {
         Map<String, String> result = new HashMap<>();
@@ -670,7 +674,7 @@ public abstract class KylinConfigBase implements Serializable {
         if (!StringUtils.isEmpty(columnStr)) {
             String[] pairs = columnStr.split(",");
             for (String pair : pairs) {
-                String [] infos = pair.split(":");
+                String[] infos = pair.split(":");
                 result.put(infos[0], infos[1]);
             }
         }
@@ -685,8 +689,8 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.dictionary.mr-hive.table.suffix", "_global_dict");
     }
 
-    public String getMrHiveDictIntermediateTTableSuffix() {
-        return getOptional("kylin.dictionary.mr-hive.intermediate.table.suffix", "__group_by");
+    public String getMrHiveDistinctValueTableSuffix() {
+        return getOptional("kylin.dictionary.mr-hive.intermediate.table.suffix", "__distinct_value");
     }
 
     // ============================================================================
@@ -1100,9 +1104,6 @@ public abstract class KylinConfigBase implements Serializable {
         return this.getOptional("kylin.source.hive.database-for-flat-table", DEFAULT);
     }
 
-    public String getHiveDatabaseDir() {
-        return this.getOptional("kylin.source.hive.databasedir", "");
-    }
 
     public String getFlatTableStorageFormat() {
         return this.getOptional("kylin.source.hive.flat-table-storage-format", "SEQUENCEFILE");
@@ -2326,7 +2327,7 @@ public abstract class KylinConfigBase implements Serializable {
         return getPropertiesByPrefix("kylin.metrics.");
     }
 
-    public int printSampleEventRatio(){
+    public int printSampleEventRatio() {
         String val = getOptional("kylin.metrics.kafka-sample-ratio", "10000");
         return Integer.parseInt(val);
     }
@@ -2558,7 +2559,7 @@ public abstract class KylinConfigBase implements Serializable {
         return (getOptional("kylin.stream.event.timezone", ""));
     }
 
-    public boolean isAutoResubmitDiscardJob(){
+    public boolean isAutoResubmitDiscardJob() {
         return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
     }
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 8d4de09..576f4bf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -84,12 +84,12 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_STREAMING_BUILD_BASE_CUBOID = "Build Base Cuboid Data For Streaming Job";
     public static final String STEP_NAME_STREAMING_SAVE_DICTS = "Save Cube Dictionaries";
 
-    // MR - Hive Dict
-    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Build Global Dict - extract distinct value from data";
-    public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = "Build Global Dict - parallel part build";
-    public static final String STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL = "Build Global Dict - parallel total build";
-    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Build Global Dict - merge to dict table";
-    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Global Dict - replace intermediate table";
+    // Hive Global Dictionary built by MR
+    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Build Hive Global Dict - extract distinct value";
+    public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = "Build Hive Global Dict - parallel part build";
+    public static final String STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL = "Build Hive Global Dict - parallel total build";
+    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Build Hive Global Dict - merge to dict table";
+    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Hive Global Dict - replace intermediate table";
 
     public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java
index 7ec21b5..2ab4313 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.measure.bitmap;
 
 import java.util.List;
@@ -22,10 +22,7 @@ import java.util.List;
 import org.apache.kylin.measure.ParamAsMeasureCount;
 
 /**
- * BitmapIntersectDistinctCountAggFunc is an UDAF used for calculating the intersection of two or more bitmaps
- * Usage:   intersect_count(columnToCount, columnToFilter, filterList)
- * Example: intersect_count(uuid, event, array['A', 'B', 'C']), meaning find the count of uuid in all A/B/C 3 bitmaps
- *          requires an bitmap count distinct measure of uuid, and an dimension of event
+ *
  */
 public class BitmapIntersectValueAggFunc implements ParamAsMeasureCount {
 
@@ -50,5 +47,4 @@ public class BitmapIntersectValueAggFunc implements ParamAsMeasureCount {
     public static String result(RetentionPartialResult result) {
         return result.valueResult();
     }
-}
-
+}
\ No newline at end of file
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index 70a64ea..9d95584 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -112,8 +112,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
                 int id;
                 TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(0);
                 if (needDictionaryColumn(measureDesc.getFunction()) && dictionaryMap.containsKey(literalCol)) {
-                        Dictionary<String> dictionary = dictionaryMap.get(literalCol);
-                        id = dictionary.getIdFromValue(values[0]);
+                    Dictionary<String> dictionary = dictionaryMap.get(literalCol);
+                    id = dictionary.getIdFromValue(values[0]);
                 } else {
                     id = Integer.parseInt(values[0]);
                 }
@@ -153,6 +153,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
     private boolean needDictionaryColumn(FunctionDesc functionDesc) {
         DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType();
         if (functionDesc.isMrDict()) {
+            // If isMrDict set to true, it means related column has been
+            // encoded in previous step by Hive Global Dictionary
             return false;
         }
         if (dataType.isIntegerFamily() && !dataType.isBigInt()) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 93b4064..c4d002b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -88,6 +88,10 @@ public class FunctionDesc implements Serializable {
     private DataType returnDataType;
     private MeasureType<?> measureType;
     private boolean isDimensionAsMetric = false;
+
+    /**
+     * The flag of Hive Global Dictionary for COUNT_DISTINCT
+     */
     private boolean isMrDict = false;
 
     public boolean isMrDict() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 8ec7d36..47f709d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -6,20 +6,21 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr;
 
 import java.util.List;
 import java.util.Objects;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidUtil;
@@ -59,24 +60,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
         inputSide.addStepPhase1_CreateFlatTable(result);
 
-        // build global dict
-        KylinConfig dictConfig = seg.getConfig();
-        String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
-
-        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
-                && !"".equals(mrHiveDictColumns[0])) {
-
-            //parallel part build
-            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
-
-            //parallel total build
-            result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
-        }
-
-        //merge global dic and replace flat table
-        if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){
-            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
-        }
+        // Build global dictionary in distributed way
+        buildHiveGlobalDictionaryByMR(result, jobId);
 
         // Phase 2: Build Dictionary
         result.addTask(createFactDistinctColumnsStep(jobId));
@@ -106,7 +91,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
         inputSide.addStepPhase4_Cleanup(result);
         outputSide.addStepPhase4_Cleanup(result);
-        
+
         // Set the task priority if specified
         result.setPriorityBasedOnPriorityOffset(priorityOffset);
         result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));
@@ -216,6 +201,30 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return ndCuboidStep;
     }
 
+    /**
+     * Build hive global dictionary by MR and encode corresponding column into integer for flat table
+     */
+    protected void buildHiveGlobalDictionaryByMR(final CubingJob result, String jobId) {
+        KylinConfig dictConfig = seg.getConfig();
+        String[] mrHiveDictColumnExcludeRef = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+        String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
+
+        if (Objects.nonNull(mrHiveDictColumnExcludeRef) && mrHiveDictColumnExcludeRef.length > 0
+                && !"".equals(mrHiveDictColumnExcludeRef[0])) {
+
+            // 1. parallel part build
+            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
+
+            // 2. parallel total build
+            result.addTask(createBuildGlobalHiveDictTotalBuildJob(jobId));
+        }
+
+        // Merge new dictionary entry into global dictionary and replace/encode flat table
+        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 && !"".equals(mrHiveDictColumns[0])) {
+            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+        }
+    }
+
     protected Class<? extends AbstractHadoopJob> getNDCuboidJob() {
         return NDCuboidJob.class;
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
index 9fdb300..2775cb7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
@@ -24,26 +24,38 @@ import org.apache.kylin.metadata.model.ISegment;
 
 public interface IInput {
 
-    /** Return a helper to participate in batch cubing job flow. */
+    /**
+     * Return a helper to participate in batch cubing job flow.
+     */
     public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
 
-    /** Return a helper to participate in batch cubing merge job flow. */
+    /**
+     * Return a helper to participate in batch cubing merge job flow.
+     */
     public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
 
     public interface IBatchCubingInputSide {
-        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
+        /**
+         * Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc
+         */
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
 
-        /** Add step that replace flat table global column value by global dic*/
+        /**
+         * An optional step that replace/encode flat table with Hive Global Dictionary
+         */
         public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow);
 
-        /** Add step that does necessary clean up, like delete the intermediate flat table */
+        /**
+         * Add step that does necessary clean up, like delete the intermediate flat table
+         */
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
     public interface IBatchMergeInputSide {
 
-        /** Add step that executes before merge dictionary and before merge cube. */
+        /**
+         * Add step that executes before merge dictionary and before merge cube.
+         */
         public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
 
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index a597279..479db86 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr;
 
@@ -37,7 +37,7 @@ import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDicTotalBuildJob;
+import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictTotalBuildJob;
 import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictPartBuildJob;
 import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
@@ -155,7 +155,7 @@ public class JobBuilderSupport {
     }
 
     public MapReduceExecutable createCalculateStatsFromBaseCuboid(String inputPath, String outputPath,
-            CuboidModeEnum cuboidMode) {
+                                                                  CuboidModeEnum cuboidMode) {
         MapReduceExecutable result = new MapReduceExecutable();
         result.setName(ExecutableConstants.STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID);
         result.setMapReduceJobClass(CalculateStatsFromBaseCuboidJob.class);
@@ -224,10 +224,10 @@ public class JobBuilderSupport {
         return result;
     }
 
-    public MapReduceExecutable createBuildGlobalHiveDicTotalBuildJob(String jobId) {
+    public MapReduceExecutable createBuildGlobalHiveDictTotalBuildJob(String jobId) {
         MapReduceExecutable result = new MapReduceExecutable();
         result.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL);
-        result.setMapReduceJobClass(BuildGlobalHiveDicTotalBuildJob.class);
+        result.setMapReduceJobClass(BuildGlobalHiveDictTotalBuildJob.class);
         StringBuilder cmd = new StringBuilder();
         appendMapReduceParameters(cmd);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
@@ -384,26 +384,26 @@ public class JobBuilderSupport {
     }
 
     public String getBuildGlobalHiveDicTotalBuildJobInputPath(String jobId) {
-        return getBuildGlobalDictionaryBasePath(jobId)+"/part_sort";
+        return getBuildGlobalDictionaryBasePath(jobId) + "/part_sort";
     }
 
     public String getBuildGlobalDictionaryMaxDistinctCountPath(String jobId) {
         KylinConfig conf = seg.getConfig();
         String dbDir = conf.getHiveDatabaseDir();
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
-        String tableName = flatDesc.getTableName()+conf.getMrHiveDictIntermediateTTableSuffix();
-        String outPut = dbDir+"/"+tableName+"/dict_column="+BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE;
+        String tableName = flatDesc.getTableName() + conf.getMrHiveDistinctValueTableSuffix();
+        String outPut = dbDir + "/" + tableName + "/dict_column=" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE;
         return outPut;
     }
 
     public String getBuildGlobalDictionaryPartReduceStatsPathV2(String jobId) {
-        return getBuildGlobalDictionaryBasePath(jobId)+ "/reduce_stats";
+        return getBuildGlobalDictionaryBasePath(jobId) + "/reduce_stats";
     }
 
-    public String getBuildGlobalDictionaryTotalOutput(KylinConfig config){
+    public String getBuildGlobalDictionaryTotalOutput(KylinConfig config) {
         String dbDir = config.getHiveDatabaseDir();
-        String tableName = EngineFactory.getJoinedFlatTableDesc(seg).getTableName()+config.getMrHiveDictTableSuffix();
-        String path = dbDir+"/"+tableName;
+        String tableName = EngineFactory.getJoinedFlatTableDesc(seg).getTableName() + config.getMrHiveDictTableSuffix();
+        String path = dbDir + "/" + tableName;
         return path;
     }
 
@@ -509,7 +509,7 @@ public class JobBuilderSupport {
         List<FileStatus> outputs = Lists.newArrayList();
         scanFiles(input, fs, outputs);
         long size = 0L;
-        for (FileStatus stat: outputs) {
+        for (FileStatus stat : outputs) {
             size += stat.getLen();
         }
         return size;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index aa377ed..baf415f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -73,7 +73,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
         measureCodec = new BufferedMeasureCodec(measureDescList);
 
         kvBuilder = new KeyValueBuilder(intermediateTableDesc);
-        checkMrDictClolumn();
+        checkHiveGlobalDictionaryColumn();
     }
 
     public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment,
@@ -92,7 +92,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
         measureCodec = new BufferedMeasureCodec(measureDescList);
 
         kvBuilder = new KeyValueBuilder(intermediateTableDesc);
-        checkMrDictClolumn();
+        checkHiveGlobalDictionaryColumn();
     }
 
     public byte[] buildKey(String[] flatRow) {
@@ -121,7 +121,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
         }
     }
 
-    private void checkMrDictClolumn(){
+    private void checkHiveGlobalDictionaryColumn(){
         Set<String> mrDictColumnSet = new HashSet<>();
         if (kylinConfig.getMrHiveDictColumns() != null) {
             Collections.addAll(mrDictColumnSet, kylinConfig.getMrHiveDictColumns());
@@ -133,7 +133,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
                 TblColRef colRef = functionDesc.getParameter().getColRefs().get(0);
                 if (mrDictColumnSet.contains(JoinedFlatTable.colName(colRef, true))) {
                     functionDesc.setMrDict(true);
-                    logger.info("setMrDict for {}", colRef);
+                    logger.info("Enable hive global dictionary for {}", colRef);
                     measure.setFunction(functionDesc);
                 }
             }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 6031f3c..f8ab007 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr.common;
 
@@ -72,7 +72,7 @@ public interface BatchConstants {
     String CFG_MR_SPARK_JOB = "mr.spark.job";
     String CFG_SPARK_META_URL = "spark.meta.url";
     String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir";
-    String CFG_GLOBAL_DICT_STATS_PARTITION_VALUE="KYLIN_MAX_DISTINCT_COUNT";
+    String CFG_GLOBAL_DICT_STATS_PARTITION_VALUE = "KYLIN_MAX_DISTINCT_COUNT";
 
 
     String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
index 07b0824..c51cd11 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -47,7 +48,7 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
     @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
-        String[] dicColsArr=null;
+        String[] dicColsArr = null;
 
         try {
             options.addOption(OPTION_JOB_NAME);
@@ -78,7 +79,7 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
             setJobClasspath(job, cube.getConfig());
 
             //FileInputFormat.setInputPaths(job, input);
-            setInputput(job, dicColsArr, getInputPath(config, segment));
+            setInput(job, dicColsArr, getInputPath(config, segment));
 
             // make each reducer output to respective dir
             setOutput(job, dicColsArr, getOptionValue(OPTION_OUTPUT_PATH));
@@ -94,15 +95,14 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
             job.setOutputValueClass(Text.class);
 
             job.setMapperClass(BuildGlobalHiveDictPartBuildMapper.class);
-
-            job.setPartitionerClass(BuildGlobalHiveDicPartPartitioner.class);
-            job.setReducerClass(BuildGlobalHiveDicPartBuildReducer.class);
+            job.setPartitionerClass(BuildGlobalHiveDictPartPartitioner.class);
+            job.setReducerClass(BuildGlobalHiveDictPartBuildReducer.class);
 
             // prevent to create zero-sized default output
             LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
 
-            //delete output
-            Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            // delete output
+            Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             deletePath(job.getConfiguration(), baseOutputPath);
 
             attachSegmentMetadataWithDict(segment, job.getConfiguration());
@@ -113,44 +113,41 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
         }
     }
 
-    private void setOutput(Job job, String[] dicColsArry, String outputBase){
+    private void setOutput(Job job, String[] dicColsArr, String outputBase) {
         // make each reducer output to respective dir
-        //eg: /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort
-        for(int i=0;i<dicColsArry.length;i++){
+        // eg: /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort
+        for (int i = 0; i < dicColsArr.length; i++) {
             MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, LongWritable.class, Text.class);
         }
-        Path outputPath=new Path(outputBase);
+        Path outputPath = new Path(outputBase);
         FileOutputFormat.setOutputPath(job, outputPath);
     }
 
-    private void setInputput(Job job, String[] dicColsArray, String inputBase) throws IOException {
-        StringBuffer paths=new StringBuffer();
+    private void setInput(Job job, String[] dicColsArray, String inputBase) throws IOException {
+        StringBuffer paths = new StringBuffer();
         // make each reducer output to respective dir
-        for(String col:dicColsArray){
+        for (String col : dicColsArray) {
             paths.append(inputBase).append("/dict_column=").append(col).append(",");
         }
-
         paths.delete(paths.length() - 1, paths.length());
         FileInputFormat.setInputPaths(job, paths.toString());
-
     }
 
-    private void setReduceNum(Job job, KylinConfig config){
+    private void setReduceNum(Job job, KylinConfig config) {
         Integer[] reduceNumArr = config.getMrHiveDictColumnsReduceNumExcludeRefCols();
         int totalReduceNum = 0;
-        for(Integer num:reduceNumArr){
-            totalReduceNum +=num;
+        for (Integer num : reduceNumArr) {
+            totalReduceNum += num;
         }
         logger.info("BuildGlobalHiveDictPartBuildJob total reduce num is {}", totalReduceNum);
         job.setNumReduceTasks(totalReduceNum);
     }
 
-    private String getInputPath(KylinConfig config, CubeSegment segment){
+    private String getInputPath(KylinConfig config, CubeSegment segment) {
         String dbDir = config.getHiveDatabaseDir();
-        String tableName = EngineFactory.getJoinedFlatTableDesc(segment).getTableName()+config.getMrHiveDictIntermediateTTableSuffix();
-        String input = dbDir+"/"+tableName;
-        logger.info("part build base input path:"+input);
+        String tableName = EngineFactory.getJoinedFlatTableDesc(segment).getTableName() + config.getMrHiveDistinctValueTableSuffix();
+        String input = dbDir + "/" + tableName;
+        logger.info("part build base input path:" + input);
         return input;
     }
-
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
index 54708f3..76c73f9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -62,12 +63,12 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp
         String colName = name.split("=")[1];
         logger.info("this map build col name :{}", colName);
 
-        for(int i=0;i<dicCols.length;i++){
-            if(dicCols[i].equalsIgnoreCase(colName)){
-                colIndex=i;
+        for (int i = 0; i < dicCols.length; i++) {
+            if (dicCols[i].equalsIgnoreCase(colName)) {
+                colIndex = i;
             }
         }
-        if(colIndex<0 || colIndex>127){
+        if (colIndex < 0 || colIndex > 127) {
             logger.error("kylin.dictionary.mr-hive.columns colIndex :{} error ", colIndex);
             logger.error("kylin.dictionary.mr-hive.columns set error,mr-hive columns's count should less than 128");
         }
@@ -77,7 +78,7 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp
 
     @Override
     public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
-        count ++;
+        count++;
         writeFieldValue(context, key.toString());
     }
 
@@ -94,7 +95,7 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp
         tmpbuf.put(valueBytes);
         outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
         context.write(outputKey, NullWritable.get());
-        if(count<10){
+        if (count < 10) {
             logger.info("colIndex:{},input key:{}", colIndex, value);
         }
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java
similarity index 86%
rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java
index 8cdd8f1..54cd4b9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java
@@ -14,11 +14,12 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -31,11 +32,11 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongWritable, LongWritable, Text> {
+public class BuildGlobalHiveDictPartBuildReducer extends KylinReducer<Text, LongWritable, LongWritable, Text> {
 
-    private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDicPartBuildReducer.class);
+    private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictPartBuildReducer.class);
 
-    private Long count=0L;
+    private Long count = 0L;
     private MultipleOutputs mos;
     private String[] dicCols;
     private String colName;
@@ -58,16 +59,16 @@ public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongW
             throws IOException, InterruptedException {
         count++;
         byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
-        if(count==1){
-            colIndex = key.getBytes()[0];//col index
+        if (count == 1) {
+            colIndex = key.getBytes()[0];
             colName = dicCols[colIndex];
         }
 
-        if(count<10){
+        if (count < 10) {
             logger.info("key:{}, temp dict num :{}, colIndex:{}, colName:{}", key.toString(), count, colIndex, colName);
         }
 
-        mos.write(colIndex+"", new LongWritable(count), new Text(keyBytes), "part_sort/"+colIndex);
+        mos.write(colIndex + "", new LongWritable(count), new Text(keyBytes), "part_sort/" + colIndex);
     }
 
     @Override
@@ -77,7 +78,6 @@ public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongW
         String partition = conf.get(MRJobConfig.TASK_PARTITION);
         mos.write(colIndex + "", new LongWritable(count), new Text(partition), "reduce_stats/" + colIndex);
         mos.close();
-        logger.info("Reduce partition num {} finish, this reduce done item count is {}" , partition, count);
+        logger.info("Reduce partition num {} finish, this reduce done item count is {}", partition, count);
     }
-
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java
similarity index 79%
rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java
index 97ad4f4..8858e20 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
@@ -28,7 +29,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 
-public class BuildGlobalHiveDicPartPartitioner extends Partitioner<Text, NullWritable> implements Configurable {
+public class BuildGlobalHiveDictPartPartitioner extends Partitioner<Text, NullWritable> implements Configurable {
     private Configuration conf;
 
     private Integer[] reduceNumArr;
@@ -49,21 +50,19 @@ public class BuildGlobalHiveDicPartPartitioner extends Partitioner<Text, NullWri
 
     @Override
     public int getPartition(Text key, NullWritable value, int numReduceTasks) {
-        //get first byte, the first byte value is the dic col index ,start from 0
+        // get first byte, the first byte value is the dic col index ,start from 0
         int colIndex = key.getBytes()[0];
         int colReduceNum = reduceNumArr[colIndex];
 
         int colReduceNumOffset = 0;
-        for (int i=0;i<colIndex;i++){
-            colReduceNumOffset += reduceNumArr[i] ;
+        for (int i = 0; i < colIndex; i++) {
+            colReduceNumOffset += reduceNumArr[i];
         }
 
-        //Calculate reduce number , reduce num = (value.hash % colReduceNum) + colReduceNumOffset
+        // Calculate reduce number , reduce num = (value.hash % colReduceNum) + colReduceNumOffset
         byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
-        int hashCode = new Text(keyBytes).hashCode() &  0x7FFFFFFF ;
-        int reduceNo = hashCode % colReduceNum + colReduceNumOffset;
-
-        return reduceNo;
+        int hashCode = new Text(keyBytes).hashCode() & 0x7FFFFFFF;
+        return hashCode % colReduceNum + colReduceNumOffset;
     }
 
     @Override
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java
similarity index 92%
rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java
index acdbb07..20bdfc7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -40,8 +41,8 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
-    protected static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDicTotalBuildJob.class);
+public class BuildGlobalHiveDictTotalBuildJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictTotalBuildJob.class);
 
     @Override
     public int run(String[] args) throws Exception {
@@ -77,7 +78,7 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
             job.getConfiguration().set("last.max.dic.value.path", getOptionValue(OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT));
             job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", false);
 
-            job.setJarByClass(BuildGlobalHiveDicTotalBuildJob.class);
+            job.setJarByClass(BuildGlobalHiveDictTotalBuildJob.class);
 
             setJobClasspath(job, cube.getConfig());
 
@@ -95,8 +96,8 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
             // prevent to create zero-sized default output
             LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
 
-            //delete output
-            Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            // delete output
+            Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             deletePath(job.getConfiguration(), baseOutputPath);
 
             attachSegmentMetadataWithDict(segment, job.getConfiguration());
@@ -107,10 +108,10 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
         }
     }
 
-    private void setOutput(Job job, String[] dicColsArry, String outputBase){
+    private void setOutput(Job job, String[] dicColsArr, String outputBase) {
         // make each reducer output to respective dir
         ///user/prod_kylin/tmp/kylin2/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/bs_order_scene_day_new_cube_clone/dict_column=DM_ES_REPORT_ORDER_VIEW0420_DRIVER_ID/part_sort
-        for(int i=0;i<dicColsArry.length;i++){
+        for (int i = 0; i < dicColsArr.length; i++) {
             MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, Text.class, LongWritable.class);
         }
         Path outputPath = new Path(outputBase);
@@ -120,7 +121,7 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
     private void setInput(Job job, String input) throws IOException {
         Path path = new Path(input);
         FileSystem fs = path.getFileSystem(job.getConfiguration());
-        if(!fs.exists(path)){
+        if (!fs.exists(path)) {
             fs.mkdirs(path);
         }
         FileInputFormat.setInputPaths(job, getOptionValue(OPTION_INPUT_PATH));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
index b2252c0..8af341c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
@@ -21,10 +21,12 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -40,7 +42,7 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMapper<KEYIN, Text, Text, LongWritable> {
+public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, KEYOUT> extends KylinMapper<KEYIN, Text, Text, LongWritable> {
     private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictTotalBuildMapper.class);
 
     private MultipleOutputs mos;
@@ -65,26 +67,26 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
 
         String statPath = conf.get("partition.statistics.path");
 
-        //get the input file name ,the file name format by colIndex-part-partitionNum, eg: 1-part-000019
+        // get the input file name ,the file name format by colIndex-part-partitionNum, eg: 1-part-000019
         FileSplit fileSplit = (FileSplit) context.getInputSplit();
         String[] arr = fileSplit.getPath().getName().split("-");
         int partitionNum = Integer.parseInt(arr[2]);
         colIndex = Integer.parseInt(arr[0]);
         colName = cols[colIndex];
-        logger.info("Input fileName:{},colIndex:{},colName:{},partitionNum:{}", fileSplit.getPath().getName(), colIndex, colName, partitionNum);
+        logger.info("Input fileName:{}, colIndex:{}, colName:{}, partitionNum:{}", fileSplit.getPath().getName(), colIndex, colName, partitionNum);
 
         //last max dic value per column
         String lastMaxValuePath = conf.get("last.max.dic.value.path");
-        logger.info("last.max.dic.value.path:"+lastMaxValuePath);
+        logger.info("last.max.dic.value.path:" + lastMaxValuePath);
         long lastMaxDictValue = this.getLastMaxDicValue(conf, lastMaxValuePath);
-        logger.info("last.max.dic.value.path:"+lastMaxValuePath+",value="+lastMaxDictValue);
+        logger.info("last.max.dic.value.path:" + lastMaxValuePath + ",value=" + lastMaxDictValue);
 
-        //Calculate the starting position of this file, the starting position of this file = sum (count) of all previous numbers + last max dic value of the column
-        Map<Integer, TreeMap<Integer, Long>> allStats = getPartitionsCount(conf, statPath);//<colIndex,<reduceNum,count>>
-        TreeMap<Integer, Long> partitionStats =allStats.get(colIndex);
-        if(partitionNum!=0) {
+        // Calculate the starting position of this file, the starting position of this file = sum (count) of all previous numbers + last max dic value of the column
+        Map<Integer, TreeMap<Integer, Long>> allStats = getPartitionsCount(conf, statPath); //<colIndex,<reduceNum,count>>
+        TreeMap<Integer, Long> partitionStats = allStats.get(colIndex);
+        if (partitionNum != 0) {
             SortedMap<Integer, Long> subStat = partitionStats.subMap(0, true, partitionNum, false);
-            subStat.forEach((k, v)->{
+            subStat.forEach((k, v) -> {
                 logger.info("Split num:{} and it's count:{}", k, v);
                 start += v;
             });
@@ -96,7 +98,7 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
     @Override
     public void doMap(KEYIN key, Text record, Context context) throws IOException, InterruptedException {
         long inkey = Long.parseLong(key.toString());
-        mos.write(colIndex+"", record, new LongWritable(start + inkey), "dict_column="+colName+"/"+colIndex);
+        mos.write(colIndex + "", record, new LongWritable(start + inkey), "dict_column=" + colName + "/" + colIndex);
     }
 
     @Override
@@ -106,7 +108,7 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
 
     private Map<Integer, TreeMap<Integer, Long>> getPartitionsCount(Configuration conf, String partitionStatPath) throws IOException {
         StringBuffer sb = new StringBuffer();
-        String temp=null;
+        String temp = null;
 
         String[] fileNameArr = null;
         String[] statsArr = null;
@@ -121,14 +123,14 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
         if (fs.exists(path) && fs.isDirectory(path)) {
             for (FileStatus status : fs.listStatus(path)) {
                 //fileNameArr[0] is globaldict colIndex
-                fileNameArr=status.getPath().getName().split("-");
+                fileNameArr = status.getPath().getName().split("-");
                 colStats = allStats.get(Integer.parseInt(fileNameArr[0]));
-                if(colStats==null){
+                if (colStats == null) {
                     colStats = new TreeMap<>();
                 }
-                temp=cat(status.getPath(), fs);
+                temp = cat(status.getPath(), fs);
                 logger.info("partitionStatPath:{},content:{}", partitionStatPath, temp);
-                if(temp!=null){
+                if (temp != null) {
                     statsArr = temp.split("\t");
                     colStats.put(Integer.parseInt(statsArr[1]), Long.parseLong(statsArr[0]));
                     allStats.put(Integer.parseInt(fileNameArr[0]), colStats);
@@ -136,8 +138,8 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
             }
         }
 
-        allStats.forEach((k, v)->{
-            v.forEach((k1, v1)->{
+        allStats.forEach((k, v) -> {
+            v.forEach((k1, v1) -> {
                 logger.info("allStats.colIndex:{},this split num:{},this split num's count:{}", k, k1, v1);
             });
         });
@@ -148,21 +150,21 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
     private String cat(Path remotePath, FileSystem fs) throws IOException {
         FSDataInputStream in = null;
         BufferedReader buffer = null;
-        StringBuffer stat= new StringBuffer();
+        StringBuffer stat = new StringBuffer();
         try {
-            in= fs.open(remotePath);
-            buffer= new BufferedReader(new InputStreamReader(in, "UTF-8")) ;
+            in = fs.open(remotePath);
+            buffer = new BufferedReader(new InputStreamReader(in, "UTF-8"));
             String line = null;
             while ((line = buffer.readLine()) != null) {
                 stat.append(line);
             }
         } catch (IOException e) {
             e.printStackTrace();
-        }finally {
-            if(buffer!=null) {
+        } finally {
+            if (buffer != null) {
                 buffer.close();
             }
-            if(in!=null) {
+            if (in != null) {
                 in.close();
             }
         }
@@ -170,15 +172,12 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
     }
 
     /**
-     *
-     * @param conf
      * @param lastMaxDicValuePath eg: /user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000
-     *        remotePath content is dict colum stats info of per column: dic column name,extract distinct value count,last max dic value
+     *                            remotePath content is dict column stats info of per column: dic column name,extract distinct value count,last max dic value
      * @return this colIndex's last max dic value
-     * @throws IOException
      */
     private long getLastMaxDicValue(Configuration conf, String lastMaxDicValuePath) throws IOException {
-        StringBuffer sb=new StringBuffer();
+        StringBuffer sb = new StringBuffer();
         Map<Integer, Long> map = null;
         Path path = new Path(lastMaxDicValuePath);
         FileSystem fs = path.getFileSystem(conf);
@@ -187,39 +186,35 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
                 logger.info("start buildMaxCountMap :");
                 map = buildMaxCountMap(status.getPath(), fs);
                 logger.info("end buildMaxCountMap :");
-
             }
         }
-        if(map == null){
+        if (map == null) {
             return 0L;
-        }else{
-            return  map.get(colIndex)==null?0L:map.get(colIndex);
+        } else {
+            return map.get(colIndex) == null ? 0L : map.get(colIndex);
         }
     }
 
     /**
-     *
      * @param remotePath , eg: /user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000
-     *        remotePath content is dict colum stats info of per column: dic column name,extract distinct value count,last max dic value
-     * @param fs
+     *                   remotePath content is dict column stats info of per column: dic column name,extract distinct value count,last max dic value
      * @return Map<>,key is colIndex, value is last max dict value
-     * @throws IOException
      */
-    private  Map<Integer, Long> buildMaxCountMap(Path remotePath, FileSystem fs) throws IOException {
+    private Map<Integer, Long> buildMaxCountMap(Path remotePath, FileSystem fs) throws IOException {
         FSDataInputStream in = null;
         BufferedReader buffer = null;
-        String[] arr=null;
-        Map<Integer, Long> map= new HashMap();
+        String[] arr = null;
+        Map<Integer, Long> map = new HashMap<>();
         try {
-            in= fs.open(remotePath);
-            buffer= new BufferedReader(new InputStreamReader(in, "UTF-8")) ;
+            in = fs.open(remotePath);
+            buffer = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
             String line = null;
             while ((line = buffer.readLine()) != null) {
                 arr = line.split(",");
-                logger.info("line="+line+",arr.length:"+arr.length);
-                if(arr.length==3) {
+                logger.info("line=" + line + ",arr.length:" + arr.length);
+                if (arr.length == 3) {
                     for (int i = 0; i < cols.length; i++) {
-                        if(cols[i].equalsIgnoreCase(arr[0])) {
+                        if (cols[i].equalsIgnoreCase(arr[0])) {
                             map.put(i, Long.parseLong(arr[2]));
                             logger.info("col.{}.maxValue={}", cols[i], Long.parseLong(arr[2]));
                             break;
@@ -229,15 +224,15 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
             }
         } catch (IOException e) {
             e.printStackTrace();
-        }finally {
-            if(buffer!=null) {
+        } finally {
+            if (buffer != null) {
                 buffer.close();
             }
-            if(in!=null) {
+            if (in != null) {
                 in.close();
             }
         }
-        logger.info("BuildMaxCountMap map="+map);
+        logger.info("BuildMaxCountMap map=" + map);
         return map;
     }
 }
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 9309a3d..7d6a367 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.spark;
 
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ *
  */
 public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
 
@@ -48,7 +49,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
     public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
         this(newSegment, submitter, 0);
     }
-    
+
     public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) {
         super(newSegment, submitter, priorityOffset);
         this.inputSide = SparkUtil.getBatchCubingInputSide(seg);
@@ -65,22 +66,8 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
         inputSide.addStepPhase1_CreateFlatTable(result);
 
-        // build global dict
-        KylinConfig dictConfig = seg.getConfig();
-        String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
-
-        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
-                && !"".equals(mrHiveDictColumns[0])) {
-            //parallel part build
-            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
-            //parallel total build
-            result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
-        }
-
-        // merge global dic and replace flat table
-        if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){
-            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
-        }
+        // Build global dictionary in distributed way
+        buildHiveGlobalDictionaryByMR(result, jobId);
 
         // Phase 2: Build Dictionary
         if (seg.getConfig().isSparkFactDistinctEnable()) {
@@ -202,7 +189,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
 
 
     public void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable,
-            final String jobId, final String cuboidRootPath) {
+                                  final String jobId, final String cuboidRootPath) {
         final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
         final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
@@ -228,4 +215,28 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
         param.put("path", getDumpMetadataPath(jobId));
         return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
     }
+
+    /**
+     * Build hive global dictionary by MR and encode corresponding column into integer for flat table
+     */
+    protected void buildHiveGlobalDictionaryByMR(final CubingJob result, String jobId) {
+        KylinConfig dictConfig = seg.getConfig();
+        String[] mrHiveDictColumnExcludeRef = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+        String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
+
+        if (Objects.nonNull(mrHiveDictColumnExcludeRef) && mrHiveDictColumnExcludeRef.length > 0
+                && !"".equals(mrHiveDictColumnExcludeRef[0])) {
+
+            // 1. parallel part build
+            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
+
+            // 2. parallel total build
+            result.addTask(createBuildGlobalHiveDictTotalBuildJob(jobId));
+        }
+
+        // merge global dic and replace flat table
+        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 && !"".equals(mrHiveDictColumns[0])) {
+            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+        }
+    }
 }
diff --git a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
index fce237e..763cf90 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
@@ -595,7 +595,8 @@
   "override_kylin_properties": {
     "kylin.cube.algorithm": "LAYER",
     "kylin.dictionary.shrunken-from-global-enabled": "false",
-    "kylin.dictionary.mr-hive.columns": "TEST_KYLIN_FACT_TEST_COUNT_DISTINCT_BITMAP"
+    "kylin.dictionary.mr-hive.columns": "TEST_KYLIN_FACT_TEST_COUNT_DISTINCT_BITMAP",
+    "kylin.source.hive.databasedir" : "/apps/hive/warehouse"
   },
   "partition_date_start": 0
 }
diff --git a/kubernetes/README.md b/kubernetes/README.md
index 964c612..2d0ecc6 100644
--- a/kubernetes/README.md
+++ b/kubernetes/README.md
@@ -1,4 +1,4 @@
-## Backgroud
+## Background
 Kubernetes is a portable, extensible, open-source platform for managing containerized workloads and services, that facilitates 
 both declarative configuration and automation. It has a large, rapidly growing ecosystem. Kubernetes services, support, 
 and tools are widely available.
@@ -11,7 +11,7 @@ cluster, will reduce cost of maintenance and extension.
   Please update your configuration file here. 
 - **template**
   This directory provided two deployment templates, one for **quick-start** purpose, another for **production/distributed** deployment.
-  1. Quick-start template is for one node deployment with an **ALL** kylin instance.
+  1. Quick-start template is for one node deployment with an **ALL** kylin instance for test or PoC purpose.
   2. Production template is for multi-nodes deployment with a few of **job**/**query** kylin instances; and some other service 
   like **memcached** and **filebeat** will help to satisfy log collection/query cache/session sharing demand.
 - **docker**
@@ -21,16 +21,16 @@ cluster, will reduce cost of maintenance and extension.
   This is a complete example by applying production template in a CDH 5.7 hadoop env with step by step guide.  
  
 ### Note 
-1. CuratorScheduler is used as default JobScheduler because it is more flexible.
+1. **CuratorScheduler** is used as default JobScheduler because it is more flexible.
 2. Spark building require use `cluster` as deployMode. If you forget it, your spark application will never submitted successfully because Hadoop cluster can not resolve hostname of Pod (Spark Driver).
 3. To modify `/etc/hosts` in Pod, please check this : https://kubernetes.io/docs/concepts/services-networking/add-entries-to-pod-etc-hosts-with-host-aliases/ . 
-4. To build you own kylin-client docker image, please don't forget to download and put following jars into KYLIN_HOME/tomcat/lib to enable tomcat session sharing.
+4. To build you own kylin-client docker image, please don't forget to download and put following jars into `KYLIN_HOME/tomcat/lib` to enable tomcat session sharing.
     - https://repo1.maven.org/maven2/de/javakaffee/msm/memcached-session-manager-tc7/2.1.1/
     - https://repo1.maven.org/maven2/de/javakaffee/msm/memcached-session-manager/2.1.1/
 5. If you have difficulty in configure filebeat, please check this https://www.elastic.co/guide/en/beats/filebeat/current/index.html .
 6. External query cache is enabled by default, if you are interested in detail, you may check http://kylin.apache.org/blog/2019/07/30/detailed-analysis-of-refine-query-cache/ .
-7. All configuration files is separated from Docker image, please use configMap or secret. Compared to configMap, secrets is more recommended for security reason.
-8. Some verified kylin-client image will be published to DockerHub, here is the link https://hub.docker.com/r/apachekylin/kylin-client . You may consider contributed your Dockerfile to kylin's repo if you are interested.
+7. All configuration files is separated from Docker image, please use **configMap** or **secret**. Compared to **configMap**, **secret** is more recommended for security reason.
+8. Some verified kylin-client image will be published to DockerHub, here is the link https://hub.docker.com/r/apachekylin/kylin-client . You may consider contributed your `Dockerfile` to kylin's repo if you are interested.
  
 ### Reference 
 - JIRA ticket: https://issues.apache.org/jira/browse/KYLIN-4447
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
index 305cdae..8538622 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
@@ -58,8 +58,9 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
     private static final String GET_SQL = "\" Get Max Dict Value Sql : \"";
 
     protected void createMrHiveDict(KylinConfig config, DistributedLock lock) throws Exception {
-        logger.info("start to run createMrHiveDict {}", getId());
+        logger.info("Start to run createMrHiveDict {}", getId());
         try {
+            // Step 1: Apply for lock if required
             if (getIsLock()) {
                 getLock(lock);
             }
@@ -72,12 +73,13 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
             if (sql != null && sql.length() > 0) {
                 hiveCmdBuilder.addStatement(sql);
             }
-            Map<String, String> maxDictValMap = deserilizeForMap(getMaxDictStatementMap());
-            Map<String, String> dictSqlMap = deserilizeForMap(getCreateTableStatementMap());
+            Map<String, String> maxDictValMap = deserializeForMap(getMaxDictStatementMap());
+            Map<String, String> dictSqlMap = deserializeForMap(getCreateTableStatementMap());
 
-            if (dictSqlMap != null && dictSqlMap.size() > 0) {
+            // Step 2: Execute HQL
+            if (!dictSqlMap.isEmpty()) {
                 IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-                if (maxDictValMap != null && maxDictValMap.size() > 0) {
+                if (!maxDictValMap.isEmpty()) {
                     if (maxDictValMap.size() == dictSqlMap.size()) {
                         maxDictValMap.forEach((columnName, maxDictValSql) -> {
                             int max = 0;
@@ -111,7 +113,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
 
             final String cmd = hiveCmdBuilder.toString();
 
-            stepLogger.log("MR/Hive dict, cmd: " + cmd);
+            stepLogger.log("Build Hive Global Dictionary by: " + cmd);
 
             CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = manager.getCube(getCubeName());
@@ -123,9 +125,9 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
                 if (response.getFirst() != 0) {
                     throw new RuntimeException("Failed to create MR/Hive dict, error code " + response.getFirst());
                 }
-                getManager().addJobInfo(getId(), stepLogger.getInfo());
             }
 
+            // Step 3: Release lock if required
             if (getIsUnlock()) {
                 unLock(lock);
             }
@@ -153,20 +155,10 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
                 lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
             }
 
-            String preHdfsShell = getPreHdfsShell();
-            if (Objects.nonNull(preHdfsShell) && !"".equalsIgnoreCase(preHdfsShell)) {
-                doRetry(preHdfsShell, config);
-            }
-
             createMrHiveDict(config, lock);
 
-            String postfixHdfsCmd = getPostfixHdfsShell();
-            if (Objects.nonNull(postfixHdfsCmd) && !"".equalsIgnoreCase(postfixHdfsCmd)) {
-                doRetry(postfixHdfsCmd, config);
-            }
-
             if (isDiscarded()) {
-                if (getIsLock()) {
+                if (getIsLock() && lock != null) {
                     unLock(lock);
                 }
                 return new ExecuteResult(ExecuteResult.State.DISCARDED, stepLogger.getBufferedLog());
@@ -224,44 +216,28 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
     }
 
     public void setCreateTableStatementMap(Map<String, String> dictSqlMap) {
-        setParam("HiveRedistributeDataMap", serilizeToMap(dictSqlMap));
+        setParam("DictSqlMap", serializeMap(dictSqlMap));
     }
 
     public String getCreateTableStatementMap() {
-        return getParam("HiveRedistributeDataMap");
+        return getParam("DictSqlMap");
     }
 
     public void setMaxDictStatementMap(Map<String, String> maxDictValMap) {
-        setParam("DictMaxMap", serilizeToMap(maxDictValMap));
+        setParam("DictMaxMap", serializeMap(maxDictValMap));
     }
 
     public String getMaxDictStatementMap() {
         return getParam("DictMaxMap");
     }
 
-    public String getPreHdfsShell() {
-        return getParam("preHdfsCmd");
-    }
-
-    public void setPrefixHdfsShell(String cmd) {
-        setParam("preHdfsCmd", cmd);
-    }
-
-    public String getPostfixHdfsShell() {
-        return getParam("postfixHdfsCmd");
-    }
-
-    public void setPostfixHdfsShell(String cmd) {
-        setParam("postfixHdfsCmd", cmd);
-    }
-
     public void setIsLock(Boolean isLock) {
         setParam("isLock", String.valueOf(isLock));
     }
 
     public boolean getIsLock() {
         String isLock = getParam("isLock");
-        return Strings.isNullOrEmpty(isLock) ? false : Boolean.parseBoolean(isLock);
+        return !Strings.isNullOrEmpty(isLock) && Boolean.parseBoolean(isLock);
     }
 
     public void setJobFlowJobId(String jobId) {
@@ -278,7 +254,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
 
     public boolean getIsUnlock() {
         String isUnLock = getParam("isUnLock");
-        return Strings.isNullOrEmpty(isUnLock) ? false : Boolean.parseBoolean(isUnLock);
+        return !Strings.isNullOrEmpty(isUnLock) && Boolean.parseBoolean(isUnLock);
     }
 
     public void setLockPathName(String pathName) {
@@ -368,7 +344,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
                                 }
                             }
                         }
-                        isLocked = true;//get lock fail,will try again
+                        isLocked = true; //get lock fail,will try again
                     }
                 }
                 // wait 1 min and try again
@@ -402,7 +378,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
         }
     }
 
-    private static String serilizeToMap(Map<String, String> map) {
+    private static String serializeMap(Map<String, String> map) {
         JSONArray result = new JSONArray();
         if (map != null && map.size() > 0) {
             map.forEach((key, value) -> {
@@ -418,7 +394,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
         return result.toString();
     }
 
-    private static Map<String, String> deserilizeForMap(String mapStr) {
+    private static Map<String, String> deserializeForMap(String mapStr) {
         Map<String, String> result = new HashMap<>();
         if (mapStr != null) {
             try {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 49e3f8d..c60a2ce 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.source.hive;
 
@@ -43,7 +43,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.SparkCreatingFlatTable;
 import org.apache.kylin.engine.spark.SparkExecutable;
@@ -97,17 +96,12 @@ public class HiveInputBase {
             // create flat table first
             addStepPhase1_DoCreateFlatTable(jobFlow);
 
-            // create global dict
-            KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
+            // create hive global dictionary
+            KylinConfig dictConfig = flatDesc.getSegment().getConfig();
             String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
             if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
                     && !"".equals(mrHiveDictColumns[0])) {
-                String globalDictDatabase = dictConfig.getMrHiveDictDB();
-                if (null == globalDictDatabase) {
-                    throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
-                }
-                String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
-                addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable);
+                addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns);
             }
 
             // then count and redistribute
@@ -129,80 +123,75 @@ public class HiveInputBase {
 
         @Override
         public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) {
-            KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
+            KylinConfig dictConfig = flatDesc.getSegment().getConfig();
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            String globalDictTable = MRHiveDictUtil.globalDictTableName(flatDesc, cubeName);
+            String globalDictDatabase = dictConfig.getMrHiveDictDB();
+
             String[] mrHiveDictColumnsExcludeRefCols = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
             Map<String, String> dictRef = dictConfig.getMrHiveDictRefColumns();
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
 
-            String globalDictDatabase = dictConfig.getMrHiveDictDB();
-            if (null == globalDictDatabase) {
-                throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
-            }
-            String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
-            if(Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && mrHiveDictColumnsExcludeRefCols.length > 0) {
-                //merge to dict table step
+            if (Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && mrHiveDictColumnsExcludeRefCols.length > 0) {
                 jobFlow.addTask(createHiveGlobalDictMergeGlobalDict(flatDesc, hiveInitStatements, cubeName, mrHiveDictColumnsExcludeRefCols, globalDictDatabase, globalDictTable));
-
                 for (String item : mrHiveDictColumnsExcludeRefCols) {
                     dictRef.put(item, "");
                 }
             }
 
-            //replace step
-            if(dictRef.size()>0) {
+            // replace step
+            if (!dictRef.isEmpty()) {
                 jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, hiveInitStatements, cubeName,
                         dictRef, flatTableDatabase, globalDictDatabase, globalDictTable, dictConfig.getMrHiveDictTableSuffix(), jobFlow.getId()));
             }
-
         }
 
-        protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow,
-                String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) {
+        /**
+         * 1. Create three related tables
+         * 2. Insert distinct value into distinct value table
+         * 3. Calculate statistics for dictionary
+         */
+        protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow, String[] mrHiveDictColumns) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
 
-            //Crete tables for global dict and extract distinct value
             jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, hiveInitStatements, cubeName,
-                    mrHiveDictColumns, globalDictDatabase, globalDictTable, jobFlow.getId()));
+                    mrHiveDictColumns, jobFlow.getId()));
 
         }
 
-        protected static AbstractExecutable createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc,
-                String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
-                String globalDictDatabase, String globalDictTable, String jobId) {
-            // Firstly, determine if the global dict hive table of cube is exists.
-            String createGlobalDictTableHql = "CREATE TABLE IF NOT EXISTS " + globalDictDatabase + "." + globalDictTable
-                    + "\n" + "( dict_key STRING COMMENT '', \n" + "dict_val INT COMMENT '' \n" + ") \n"
-                    + "COMMENT '' \n" + "PARTITIONED BY (dict_column string) \n" + " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n" + "STORED AS TEXTFILE; \n";
-
-            final String dropDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(flatDesc);
-            final String createDictIntermediateTableHql = MRHiveDictUtil.generateCreateTableStatement(flatDesc);
-            final String groupByTable = flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
-            final String globalDictIntermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
-            final String dropGlobalDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(globalDictIntermediateTable);
-            final String createGlobalDictIntermediateTableHql = MRHiveDictUtil.generateCreateGlobalDicIntermediateTableStatement(globalDictIntermediateTable);
-
-            String maxAndDistinctCountSql = "INSERT OVERWRITE TABLE  " + groupByTable + " PARTITION (DICT_COLUMN = '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') "
-                    + "\n" + "SELECT  CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) "
-                    + "\n" + "FROM ("
-                    + "\n" + "    SELECT  dict_column,count(1) total_distinct_val FROM "
-                    + "\n" + groupByTable + " where DICT_COLUMN != '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "' group by dict_column) tc "
-                    + "\n" + "LEFT JOIN (\n"
-                    + "\n" + "    SELECT  dict_column,if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val FROM "
-                    + "\n" + globalDictDatabase + "." + globalDictTable + " group by dict_column) tm "
-                    + "\n" + "ON  tc.dict_column = tm.dict_column;";
+        protected static AbstractExecutable createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc, String hiveInitStatements,
+                                                                              String cubeName, String[] mrHiveDictColumns, String jobId) {
+            KylinConfig cfg = flatDesc.getSegment().getConfig();
+            String globalDictTable = MRHiveDictUtil.globalDictTableName(flatDesc, cubeName);
+            String globalDictDatabase = cfg.getMrHiveDictDB();
+            final String distinctValueTable = MRHiveDictUtil.distinctValueTable(flatDesc);
+            final String segmentLevelDictTableName = MRHiveDictUtil.segmentLevelDictTableName(flatDesc);
+
+            final String createGlobalDictTableHql = MRHiveDictUtil.generateDictionaryDdl(globalDictDatabase, globalDictTable);
+            final String dropDistinctValueTableHql = MRHiveDictUtil.generateDropTableStatement(distinctValueTable);
+            final String createDistinctValueTableHql = MRHiveDictUtil.generateDistinctValueTableStatement(flatDesc);
+            final String dropSegmentLevelDictTableHql = MRHiveDictUtil.generateDropTableStatement(segmentLevelDictTableName);
+            final String createSegmentLevelDictTableHql = MRHiveDictUtil.generateDictTableStatement(segmentLevelDictTableName);
+
+            String maxAndDistinctCountSql = MRHiveDictUtil.generateDictStatisticsSql(distinctValueTable, globalDictTable, globalDictDatabase);
 
             StringBuilder insertDataToDictIntermediateTableSql = new StringBuilder();
             for (String dictColumn : mrHiveDictColumns) {
                 insertDataToDictIntermediateTableSql
                         .append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn, globalDictDatabase, globalDictTable));
             }
-            String set = "set hive.exec.compress.output=false;set hive.mapred.mode=unstrict;";
+            String setParametersHql = "set hive.exec.compress.output=false;set hive.mapred.mode=unstrict;";
             CreateMrHiveDictStep step = new CreateMrHiveDictStep();
             step.setInitStatement(hiveInitStatements);
-            step.setCreateTableStatement(set + createGlobalDictTableHql + dropDictIntermediateTableHql
-                    + createDictIntermediateTableHql + dropGlobalDictIntermediateTableHql + createGlobalDictIntermediateTableHql + insertDataToDictIntermediateTableSql.toString() + maxAndDistinctCountSql);
+            step.setCreateTableStatement(setParametersHql
+                    + createGlobalDictTableHql
+                    + dropDistinctValueTableHql
+                    + createDistinctValueTableHql
+                    + dropSegmentLevelDictTableHql
+                    + createSegmentLevelDictTableHql
+                    + insertDataToDictIntermediateTableSql.toString()
+                    + maxAndDistinctCountSql);
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
             step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL);
             step.setIsLock(true);
@@ -212,20 +201,27 @@ public class HiveInputBase {
             return step;
         }
 
+        /**
+         * In the previous step, data of hive global dictionary is prepared by MR,
+         * so now it is time for create partition for Segment Dictionary Table
+         * and merge into Hive Global Dictionary Table.
+         */
         protected static AbstractExecutable createHiveGlobalDictMergeGlobalDict(IJoinedFlatTableDesc flatDesc,
                                                                                 String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
                                                                                 String globalDictDatabase, String globalDictTable) {
 
-            String globalDictItermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
+            String globalDictIntermediateTable = MRHiveDictUtil.segmentLevelDictTableName(flatDesc);
+            StringBuilder addPartitionHql = new StringBuilder();
 
-            StringBuffer addPartition = new StringBuffer();
-            Map<String, String> maxDictValMap = new HashMap<>();
             Map<String, String> dictHqlMap = new HashMap<>();
             for (String dictColumn : mrHiveDictColumns) {
                 try {
-                    addPartition.append("alter table ").append(globalDictItermediateTable)
-                            .append(" add  IF NOT EXISTS partition (dict_column='").append(dictColumn)
-                            .append("');").append(" \n");
+                    addPartitionHql.append("ALTER TABLE ")
+                            .append(globalDictIntermediateTable)
+                            .append(" ADD IF NOT EXISTS PARTITION (dict_column='")
+                            .append(dictColumn)
+                            .append("');")
+                            .append(" \n");
 
                     String dictHql = "INSERT OVERWRITE TABLE " + globalDictDatabase + "." + globalDictTable + " \n"
                             + "PARTITION (dict_column = '" + dictColumn + "') \n"
@@ -233,7 +229,7 @@ public class HiveInputBase {
                             + globalDictDatabase + "." + globalDictTable + " \n" + "WHERE dict_column = '" + dictColumn
                             + "' \n" + flatDesc.getDataModel().getConfig().getHiveUnionStyle() + " \n"
                             + "SELECT dict_key, dict_val FROM "
-                            + globalDictItermediateTable + " \n" + " WHERE dict_column = '" + dictColumn + "' ;\n";
+                            + globalDictIntermediateTable + " \n" + " WHERE dict_column = '" + dictColumn + "' ;\n";
                     dictHqlMap.put(dictColumn, dictHql);
                 } catch (Exception e) {
                     logger.error("", e);
@@ -241,9 +237,8 @@ public class HiveInputBase {
             }
             String hiveInitStatementForUnstrict = "set hive.mapred.mode=unstrict;";
             CreateMrHiveDictStep step = new CreateMrHiveDictStep();
-            step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict + addPartition);
+            step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict + addPartitionHql);
             step.setCreateTableStatementMap(dictHqlMap);
-            step.setMaxDictStatementMap(maxDictValMap);
             step.setIsLock(false);
             step.setIsUnLock(false);
             step.setLockPathName(cubeName);
@@ -252,58 +247,75 @@ public class HiveInputBase {
             return step;
         }
 
+        /**
+         * Use Hive Global Dictionary to replace/encode flat table
+         *
+         * @param mrHiveDictColumns a Map which key is and vale is .
+         */
         protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc, String hiveInitStatements, String cubeName, Map<String, String> mrHiveDictColumns, String flatTableDatabase, String globalDictDatabase, String globalDictTable, String dictSuffix, String jobId) {
             Map<String, String> dictHqlMap = new HashMap<>();
-            StringBuilder addPartition = new StringBuilder();
             for (String dictColumn : mrHiveDictColumns.keySet()) {
-                StringBuilder dictHql = new StringBuilder();
+                StringBuilder insertOverwriteHql = new StringBuilder();
                 TblColRef dictColumnRef = null;
 
                 String flatTable = flatTableDatabase + "." + flatDesc.getTableName();
-                // replace the flat table's dict column value
-                dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n");
+                insertOverwriteHql.append("INSERT OVERWRITE TABLE ").append(flatTable).append(" \n");
                 try {
-                    dictHql.append("SELECT \n");
-                    Integer flatTableColumnSize = flatDesc.getAllColumns().size();
+                    insertOverwriteHql.append("SELECT \n");
+                    int flatTableColumnSize = flatDesc.getAllColumns().size();
                     for (int i = 0; i < flatTableColumnSize; i++) {
                         TblColRef tblColRef = flatDesc.getAllColumns().get(i);
+                        String colName = JoinedFlatTable.colName(tblColRef, flatDesc.useAlias());
+
                         if (i > 0) {
-                            dictHql.append(",");
+                            insertOverwriteHql.append(",");
                         }
-                        if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
-                            dictHql.append("b.dict_val \n");
+
+                        if (colName.equalsIgnoreCase(dictColumn)) {
+                            // Note: replace original value into encoded integer
+                            insertOverwriteHql.append("b.dict_val \n");
                             dictColumnRef = tblColRef;
                         } else {
-                            dictHql.append("a." + JoinedFlatTable.colName(tblColRef) + " \n");
+                            // Note: keep its original value
+                            insertOverwriteHql.append("a.")
+                                    .append(JoinedFlatTable.colName(tblColRef))
+                                    .append(" \n");
                         }
                     }
 
                     if (!Strings.isNullOrEmpty(mrHiveDictColumns.get(dictColumn))) {
-                        String[] cubePartion = mrHiveDictColumns.get(dictColumn).split("\\.");
-
-                        String refGlobalDictTable = cubePartion[0] + dictSuffix;
-                        String refDictColumn = cubePartion[1];
-
-                        dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n"
-                                + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + refGlobalDictTable
-                                + " WHERE dict_column = '" + refDictColumn + "' \n" + ") b \n" + " ON a."
-                                + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;");
-                        dictHqlMap.put(dictColumn, dictHql.toString());
-                    }else {
-                        dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n"
-                                + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable
-                                + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n" + " ON a."
-                                + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;");
+                        // Note: reuse previous hive global dictionary
+                        String[] tableColumn = mrHiveDictColumns.get(dictColumn).split("\\.");
+
+                        String refGlobalDictTable = tableColumn[0] + dictSuffix;
+                        String refDictColumn = tableColumn[1];
+
+                        insertOverwriteHql
+                                .append("FROM ").append(flatTable).append(" a \nLEFT OUTER JOIN \n (")
+                                .append("SELECT dict_key, dict_val FROM ")
+                                .append(globalDictDatabase).append(".").append(refGlobalDictTable)
+                                .append(" WHERE dict_column = '").append(refDictColumn).append("') b \n")
+                                .append("ON a.").append(JoinedFlatTable.colName(dictColumnRef)).append(" = b.dict_key;");
+                        dictHqlMap.put(dictColumn, insertOverwriteHql.toString());
+                    } else {
+                        // Note: use hive global dictionary built by current cube
+                        insertOverwriteHql
+                                .append("FROM ").append(flatTable).append(" a \nLEFT OUTER JOIN \n (")
+                                .append("SELECT dict_key, dict_val FROM ")
+                                .append(globalDictDatabase).append(".").append(globalDictTable)
+                                .append(" WHERE dict_column = '").append(dictColumn).append("') b \n")
+                                .append("ON a.").append(JoinedFlatTable.colName(dictColumnRef)).append(" = b.dict_key;");
                     }
-                    dictHqlMap.put(dictColumn, dictHql.toString());
+                    dictHqlMap.put(dictColumn, insertOverwriteHql.toString());
                 } catch (Exception e) {
                     logger.error("", e);
                 }
             }
-            String set = "set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;";
+            String setParameterHal = "set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;";
             CreateMrHiveDictStep step = new CreateMrHiveDictStep();
-            step.setInitStatement(hiveInitStatements + set + addPartition);
+            step.setInitStatement(hiveInitStatements + setParameterHal);
             step.setCreateTableStatementMap(dictHqlMap);
+
             step.setIsUnLock(true);
             step.setLockPathName(cubeName);
             step.setJobFlowJobId(jobId);
@@ -389,7 +401,7 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
-            String cubeName, IJoinedFlatTableDesc flatDesc) {
+                                                                String cubeName, IJoinedFlatTableDesc flatDesc) {
         //from hive to hive
         final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
         final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
@@ -404,7 +416,7 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable createFlatHiveTableByLivyStep(String hiveInitStatements, String jobWorkingDir,
-            String cubeName, IJoinedFlatTableDesc flatDesc) {
+                                                                      String cubeName, IJoinedFlatTableDesc flatDesc) {
         //from hive to hive
         final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
         final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
@@ -419,7 +431,7 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable createFlatHiveTableBySparkSql(String hiveInitStatements,
-            String jobWorkingDir, String cubeName, IJoinedFlatTableDesc flatDesc) {
+                                                                      String jobWorkingDir, String cubeName, IJoinedFlatTableDesc flatDesc) {
         final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
         final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc,
                 jobWorkingDir);
@@ -472,7 +484,7 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
-            IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
+                                                                            IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
         step.setInitStatement(hiveInitStatements);
         step.setIntermediateTable(flatDesc.getTableName());
@@ -483,7 +495,7 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable createRedistributeFlatHiveTableByLivyStep(String hiveInitStatements,
-            String cubeName, IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
+                                                                                  String cubeName, IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         RedistributeFlatHiveTableByLivyStep step = new RedistributeFlatHiveTableByLivyStep();
         step.setInitStatement(hiveInitStatements);
         step.setIntermediateTable(flatDesc.getTableName());
@@ -493,8 +505,8 @@ public class HiveInputBase {
         return step;
     }
 
-    protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
-            String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables, String uuid) {
+    protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, String jobWorkingDir, IJoinedFlatTableDesc flatDesc,
+                                                                             List<String> intermediateTables, String uuid) {
         ShellExecutable step = new ShellExecutable();
         step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
 
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index 85cd855..573ecd3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -27,6 +27,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.livy.LivyRestBuilder;
 import org.apache.kylin.common.livy.LivyRestExecutor;
 import org.apache.kylin.common.livy.LivyTypeEnum;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -41,6 +42,30 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+/**
+ * <pre>
+ * Hold some constant/enum/statement for Hive Global Dictionary.
+ *
+ * There are two different temporary tables which help to build Hive Global Dictionary.
+ * They should be deleted at the final step of building job.
+ *   1. Distinct Value Table (Temporary table)
+ *      TableName: ${FlatTable}_${DistinctValueSuffix}
+ *      Schema: One normal column, for original column value; with another partition column.
+ *   @see #distinctValueTable
+ *
+ *   2. Segment Level Dictionary Table (Temporary table)
+ *      TableName: ${FlatTable}_${DictTableSuffix}
+ *      Schema: Two normal columns, first for original column value, second for is its encoded integer;
+ *          also with another partition column
+ *   @see #segmentLevelDictTableName
+ *
+ * After that, Hive Global Dictionary itself is stored in a third hive table.
+ *   3. Hive Global Dictionary Table
+ *      TableName: ${CubeName}_${DictTableSuffix}
+ *      Schema: Two columns, first for original column value, second is its encoded integer; also with another partition column
+ *   @see #globalDictTableName
+ * </pre>
+ */
 public class MRHiveDictUtil {
     private static final Logger logger = LoggerFactory.getLogger(MRHiveDictUtil.class);
     protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
@@ -59,12 +84,27 @@ public class MRHiveDictUtil {
         }
     }
 
-    public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
-        StringBuilder ddl = new StringBuilder();
-        String table = flatDesc.getTableName()
-                + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
-        ddl.append("DROP TABLE IF EXISTS " + table + ";").append(" \n");
-        return ddl.toString();
+    public static String distinctValueTable(IJoinedFlatTableDesc flatDesc) {
+        return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDistinctValueTableSuffix();
+    }
+
+    public static String segmentLevelDictTableName(IJoinedFlatTableDesc flatDesc) {
+        return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix();
+    }
+
+    public static String globalDictTableName(IJoinedFlatTableDesc flatDesc, String cubeName) {
+        return cubeName + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix();
+    }
+
+    public static String generateDictionaryDdl(String db, String tbl) {
+        return "CREATE TABLE IF NOT EXISTS " + db + "." + tbl + "\n"
+                + " ( dict_key STRING COMMENT '', \n"
+                + "   dict_val INT COMMENT '' \n"
+                + ") \n"
+                + "COMMENT 'Hive Global Dictionary' \n"
+                + "PARTITIONED BY (dict_column string) \n"
+                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n"
+                + "STORED AS TEXTFILE; \n";
     }
 
     public static String generateDropTableStatement(String tableName) {
@@ -73,14 +113,14 @@ public class MRHiveDictUtil {
         return ddl.toString();
     }
 
-    public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc) {
+    public static String generateDistinctValueTableStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder ddl = new StringBuilder();
         String table = flatDesc.getTableName()
-                + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
+                + flatDesc.getSegment().getConfig().getMrHiveDistinctValueTableSuffix();
 
         ddl.append("CREATE TABLE IF NOT EXISTS " + table + " \n");
         ddl.append("( \n ");
-        ddl.append("dict_key" + " " + "STRING" + " COMMENT '' \n");
+        ddl.append("  dict_key" + " " + "STRING" + " COMMENT '' \n");
         ddl.append(") \n");
         ddl.append("COMMENT '' \n");
         ddl.append("PARTITIONED BY (dict_column string) \n");
@@ -89,13 +129,13 @@ public class MRHiveDictUtil {
         return ddl.toString();
     }
 
-    public static String generateCreateGlobalDicIntermediateTableStatement(String globalTableName) {
+    public static String generateDictTableStatement(String globalTableName) {
         StringBuilder ddl = new StringBuilder();
 
         ddl.append("CREATE TABLE IF NOT EXISTS " + globalTableName + " \n");
         ddl.append("( \n ");
-        ddl.append("dict_key" + " " + "STRING" + " COMMENT '' , \n");
-        ddl.append("dict_val" + " " + "STRING" + " COMMENT '' \n");
+        ddl.append("  dict_key" + " " + "STRING" + " COMMENT '' , \n");
+        ddl.append("  dict_val" + " " + "STRING" + " COMMENT '' \n");
         ddl.append(") \n");
         ddl.append("COMMENT '' \n");
         ddl.append("PARTITIONED BY (dict_column string) \n");
@@ -105,12 +145,12 @@ public class MRHiveDictUtil {
         return ddl.toString();
     }
 
+    /**
+     * Fetch distinct value from flat table and insert into distinctValueTable.
+     *
+     * @see #distinctValueTable
+     */
     public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, String dictColumn, String globalDictDatabase, String globalDictTable) {
-        String table = getMRHiveFlatTableGroupBytableName(flatDesc);
-
-        StringBuilder sql = new StringBuilder();
-        sql.append("SELECT a.DICT_KEY FROM (" + "\n");
-
         int index = 0;
         for (TblColRef tblColRef : flatDesc.getAllColumns()) {
             if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
@@ -118,42 +158,56 @@ public class MRHiveDictUtil {
             }
             index++;
         }
-
         if (index == flatDesc.getAllColumns().size()) {
             String msg = "Can not find correct column for " + dictColumn
                     + ", please check 'kylin.dictionary.mr-hive.columns'";
             logger.error(msg);
             throw new IllegalArgumentException(msg);
         }
-        sql.append(" SELECT " + "\n");
-        TblColRef col = flatDesc.getAllColumns().get(index);
-        sql.append(JoinedFlatTable.colName(col) + "  as DICT_KEY \n");
-
-        MRHiveDictUtil.appendJoinStatement(flatDesc, sql);
 
-        //group by
-        sql.append("GROUP BY ");
-        sql.append(JoinedFlatTable.colName(col) + ") a \n");
+        String table = distinctValueTable(flatDesc);
+        StringBuilder sql = new StringBuilder();
+        TblColRef col = flatDesc.getAllColumns().get(index);
 
-        //join
-        sql.append(" LEFT JOIN \n");
-        sql.append("(SELECT  DICT_KEY FROM ");
-        sql.append(globalDictDatabase).append(".").append(globalDictTable);
-        sql.append(" WHERE DICT_COLUMN = '" + dictColumn + "'");
-        sql.append(") b \n");
+        sql.append("SELECT a.DICT_KEY FROM (\n");
+        sql.append("  SELECT " + "\n");
+        sql.append(JoinedFlatTable.colName(col)).append(" as DICT_KEY \n");
+        sql.append("  FROM ").append(flatDesc.getTableName()).append("\n");
+        sql.append("  GROUP BY ");
+        sql.append(JoinedFlatTable.colName(col)).append(") a \n");
+        sql.append("    LEFT JOIN \n");
+        sql.append("  (SELECT DICT_KEY FROM ").append(globalDictDatabase).append(".").append(globalDictTable);
+        sql.append("    WHERE DICT_COLUMN = '").append(dictColumn);
+        sql.append("' ) b \n");
         sql.append("ON a.DICT_KEY = b.DICT_KEY \n");
-        sql.append("WHERE   b.DICT_KEY IS NULL \n");
+        sql.append("WHERE b.DICT_KEY IS NULL \n");
 
-        return "INSERT OVERWRITE TABLE " + table + " \n" + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
-                + sql + ";\n";
+        return "INSERT OVERWRITE TABLE " + table + " \n"
+                + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
+                + sql.toString()
+                + ";\n";
     }
 
-    public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) {
-        sql.append("FROM " + flatDesc.getTableName() + "\n");
+    /**
+     * Calculate and store "columnName,segmentDistinctCount,previousMaxDictId" into specific partition
+     */
+    public static String generateDictStatisticsSql(String distinctValueTable, String globalDictTable, String globalDictDatabase) {
+        return "INSERT OVERWRITE TABLE  " + distinctValueTable + " PARTITION (DICT_COLUMN = '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') "
+                + "\n" + "SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) "
+                + "\n" + "FROM ("
+                + "\n" + "    SELECT dict_column, count(1) total_distinct_val"
+                + "\n" + "    FROM " + globalDictDatabase + "." + distinctValueTable
+                + "\n" + "    WHERE DICT_COLUMN != '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "'"
+                + "\n" + "    GROUP BY dict_column) tc "
+                + "\n" + "LEFT JOIN (\n"
+                + "\n" + "    SELECT dict_column, if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val "
+                + "\n" + "    FROM " + globalDictDatabase + "." + globalDictTable
+                + "\n" + "    GROUP BY dict_column) tm "
+                + "\n" + "ON tc.dict_column = tm.dict_column;";
     }
 
     public static void runLivySqlJob(PatternedLogger stepLogger, KylinConfig config, ImmutableList<String> sqls,
-            ExecutableManager executableManager, String jobId) throws IOException {
+                                     ExecutableManager executableManager, String jobId) throws IOException {
         final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
         livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
         StringBuilder stringBuilder = new StringBuilder();
@@ -171,7 +225,7 @@ public class MRHiveDictUtil {
         executor.execute(livyRestBuilder, stepLogger);
 
         Map<String, String> info = stepLogger.getInfo();
-        //get the flat Hive table size
+        // get the flat Hive table size
         Matcher matcher = HDFS_LOCATION.matcher(args);
         if (matcher.find()) {
             String hiveFlatTableHdfsUrl = matcher.group(1);
@@ -194,14 +248,6 @@ public class MRHiveDictUtil {
         return DictHiveType.MrEphemeralDictLockPath.getName() + cubeName;
     }
 
-    public static String getMRHiveFlatTableGroupBytableName(IJoinedFlatTableDesc flatDesc) {
-        return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
-    }
-
-    public static String getMRHiveFlatTableGlobalDictTableName(IJoinedFlatTableDesc flatDesc) {
-        return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix();
-    }
-
     private static long getFileSize(String hdfsUrl) throws IOException {
         Configuration configuration = new Configuration();
         Path path = new Path(hdfsUrl);