Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:52 UTC

[kylin] 05/11: KYLIN-4366 Build Global Dict by MR/Hive, Merge to dict table Step implementation

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 3e5058155a32ab4f4867290227ef5ec4bc2c6ba9
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Wed May 6 14:43:08 2020 +0800

    KYLIN-4366 Build Global Dict by MR/Hive, Merge to dict table
     Step implementation
---
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  5 +-
 .../java/org/apache/kylin/engine/mr/IInput.java    |  3 +
 .../apache/kylin/source/hive/HiveInputBase.java    | 82 +++++++++++++---------
 .../apache/kylin/source/kafka/KafkaInputBase.java  |  5 ++
 4 files changed, 62 insertions(+), 33 deletions(-)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index b62650a..8ec7d36 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -73,7 +73,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
             result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
         }
 
-        //toDo merge global dic and replace flat table
+        // merge global dict and replace flat table
+        if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){
+            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+        }
 
         // Phase 2: Build Dictionary
         result.addTask(createFactDistinctColumnsStep(jobId));
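
The new guard only schedules the merge/replace phase when MR/Hive dict columns are actually configured. A minimal sketch of that check as a stand-alone helper (hypothetical name, not part of this commit; getMrHiveDictColumns() is the KylinConfig accessor used in the hunk above):

    // A minimal sketch (hypothetical helper, not part of this commit) of the guard
    // used above: the merge/replace phase is scheduled only when at least one
    // non-empty MR/Hive dict column is configured.
    static boolean hasMrHiveDictColumns(org.apache.kylin.common.KylinConfig dictConfig) {
        String[] cols = dictConfig.getMrHiveDictColumns();
        return cols != null && cols.length > 0 && !"".equals(cols[0]);
    }
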
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
index 758b081..9fdb300 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
@@ -34,6 +34,9 @@ public interface IInput {
         /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
 
+        /** Add step that replaces the flat table's global dict column values with values from the global dict */
+        public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow);
+
         /** Add step that does necessary clean up, like delete the intermediate flat table */
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
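
Every input-side implementation of IInput now has to provide addStepPhase_ReplaceFlatTableGlobalColumnValue: the Hive input wires in the real merge step below, while sources that do not go through the MR/Hive global dict flow can supply a no-op, as the KafkaInputBase change at the end of this patch shows.
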
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 65b8dc6..8bad023 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -126,6 +126,30 @@ public class HiveInputBase {
             addStepPhase1_DoMaterializeLookupTable(jobFlow);
         }
 
+        @Override
+        public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) {
+            KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
+            final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            String[] mrHiveDictColumnsExcludeRefCols = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+            Map<String, String> dictRef = dictConfig.getMrHiveDictRefColumns();
+            final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+
+            String globalDictDatabase = dictConfig.getMrHiveDictDB();
+            if (null == globalDictDatabase) {
+                throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
+            }
+            String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
+            if(Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && mrHiveDictColumnsExcludeRefCols.length > 0) {
+                //merge to dict table step
+                jobFlow.addTask(createHiveGlobalDictMergeGlobalDict(flatDesc, hiveInitStatements, cubeName, mrHiveDictColumnsExcludeRefCols, globalDictDatabase, globalDictTable));
+
+                for (String item : mrHiveDictColumnsExcludeRefCols) {
+                    dictRef.put(item, "");
+                }
+            }
+            // TODO: add replace step
+        }
+
         protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow,
                 String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
@@ -180,49 +204,43 @@ public class HiveInputBase {
             return step;
         }
 
-        protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc,
-                String hiveInitStatements, String hdfsWorkingDir, String cubeName, String[] mrHiveDictColumns,
-                String flatTableDatabase, String globalDictDatabase, String globalDictTable) {
+        protected static AbstractExecutable createHiveGlobalDictMergeGlobalDict(IJoinedFlatTableDesc flatDesc,
+                                                                                String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
+                                                                                String globalDictDatabase, String globalDictTable) {
+
+            String globalDictItermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
+
+            StringBuffer addPartition = new StringBuffer();
+            Map<String, String> maxDictValMap = new HashMap<>();
             Map<String, String> dictHqlMap = new HashMap<>();
             for (String dictColumn : mrHiveDictColumns) {
-                StringBuilder dictHql = new StringBuilder();
-                TblColRef dictColumnRef = null;
-
-                String flatTable = flatTableDatabase + "." + flatDesc.getTableName();
-                // replace the flat table's dict column value
-                dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n");
                 try {
-                    dictHql.append("SELECT \n");
-                    Integer flatTableColumnSize = flatDesc.getAllColumns().size();
-                    for (int i = 0; i < flatTableColumnSize; i++) {
-                        TblColRef tblColRef = flatDesc.getAllColumns().get(i);
-                        if (i > 0) {
-                            dictHql.append(",");
-                        }
-                        if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
-                            dictHql.append("b. dict_val \n");
-                            dictColumnRef = tblColRef;
-                        } else {
-                            dictHql.append("a." + JoinedFlatTable.colName(tblColRef) + " \n");
-                        }
-                    }
-                    dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n"
-                            + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable
-                            + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n" + " ON a."
-                            + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;");
-                    dictHqlMap.put(dictColumn, dictHql.toString());
+                    addPartition.append("alter table ").append(globalDictItermediateTable)
+                            .append(" add  IF NOT EXISTS partition (dict_column='").append(dictColumn)
+                            .append("');").append(" \n");
+
+                    String dictHql = "INSERT OVERWRITE TABLE " + globalDictDatabase + "." + globalDictTable + " \n"
+                            + "PARTITION (dict_column = '" + dictColumn + "') \n"
+                            + "SELECT dict_key, dict_val FROM "
+                            + globalDictDatabase + "." + globalDictTable + " \n" + "WHERE dict_column = '" + dictColumn
+                            + "' \n" + flatDesc.getDataModel().getConfig().getHiveUnionStyle() + " \n"
+                            + "SELECT dict_key, dict_val FROM "
+                            + globalDictItermediateTable + " \n" + " WHERE dict_column = '" + dictColumn + "' ;\n";
+                    dictHqlMap.put(dictColumn, dictHql);
                 } catch (Exception e) {
                     logger.error("", e);
                 }
             }
+            String hiveInitStatementForUnstrict = "set hive.mapred.mode=unstrict;";
             CreateMrHiveDictStep step = new CreateMrHiveDictStep();
-            step.setInitStatement(hiveInitStatements);
+            step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict + addPartition);
             step.setCreateTableStatementMap(dictHqlMap);
-            step.setIsUnLock(true);
+            step.setMaxDictStatementMap(maxDictValMap);
+            step.setIsLock(false);
+            step.setIsUnLock(false);
             step.setLockPathName(cubeName);
-
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-            step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL);
+            step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL);
             return step;
         }
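
The step created by createHiveGlobalDictMergeGlobalDict (renamed from createMrHiveGlobalDictReplaceStep) no longer rewrites the flat table; for each dict column it adds the column's partition to the intermediate dict table and overwrites the matching partition of the cube-level dict table with the union of its existing entries and the newly built ones. A rough sketch of the HQL one loop iteration emits, with every identifier a placeholder (the real names come from getMrHiveDictDB(), getMrHiveDictTableSuffix(), MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc), and the union keyword from getHiveUnionStyle()):

    // Illustrative expansion only; all database, table and column names are hypothetical.
    String exampleAddPartition =
            "ALTER TABLE kylin_intermediate_my_cube_global_dict "
            + "ADD IF NOT EXISTS PARTITION (dict_column='USER_ID');";
    String exampleMergeHql =
            "INSERT OVERWRITE TABLE kylin_db.my_cube_global_dict \n"
            + "PARTITION (dict_column = 'USER_ID') \n"
            + "SELECT dict_key, dict_val FROM kylin_db.my_cube_global_dict \n"
            + "WHERE dict_column = 'USER_ID' \n"
            + "UNION \n"
            + "SELECT dict_key, dict_val FROM kylin_intermediate_my_cube_global_dict \n"
            + "WHERE dict_column = 'USER_ID';";

When the configured union style is plain UNION, rows already present in the target partition are deduplicated against the rows coming from the intermediate table; the flat-table replace step itself is still marked as a TODO in addStepPhase_ReplaceFlatTableGlobalColumnValue above.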
 
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
index d9e112c..5ba9082 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
@@ -88,6 +88,11 @@ public class KafkaInputBase {
             }
         }
 
+        @Override
+        public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) {
+            //do nothing
+        }
+
         protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
             return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
         }
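
The Kafka input side only needs the empty override above: the MR/Hive global dict merge is driven from the Hive flat table, so there is no flat-table column replacement to schedule for streaming sources.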