You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:53 UTC

[kylin] 06/11: KYLIN-4367 Build Global Dict by MR/Hive, Replace intermediate table Step implementation

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5addc9da885a606ca1f236261df1c314bdb13e0f
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Wed May 6 14:43:45 2020 +0800

    KYLIN-4367 Build Global Dict by MR/Hive, Replace intermediate
     table Step implementation
---
 .../apache/kylin/source/hive/HiveInputBase.java    | 69 +++++++++++++++++++++-
 1 file changed, 68 insertions(+), 1 deletion(-)

diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 8bad023..624c8f9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -30,6 +30,7 @@ import java.util.Set;
 import java.util.Locale;
 import java.util.Collections;
 
+import org.apache.kylin.shaded.com.google.common.base.Strings;
 import org.apache.kylin.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -147,7 +148,13 @@ public class HiveInputBase {
                     dictRef.put(item, "");
                 }
             }
-            //toDo add replace step
+
+            //replace step
+            if(dictRef.size()>0) {
+                jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, hiveInitStatements, cubeName,
+                        dictRef, flatTableDatabase, globalDictDatabase, globalDictTable, dictConfig.getMrHiveDictTableSuffix(), jobFlow.getId()));
+            }
+
         }
 
         protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow,
@@ -244,6 +251,66 @@ public class HiveInputBase {
             return step;
         }
 
+            // Builds the "replace" step of the MR/Hive global-dictionary flow: for each
+            // dict column, generate an INSERT OVERWRITE that rewrites the flat table,
+            // swapping the raw column value for its encoded dict value via a LEFT OUTER
+            // JOIN against the global dict table in globalDictDatabase.
+            // Params: mrHiveDictColumns maps dict column name -> optional reference
+            // ("<name>.<column>") to another dict table; an empty value means the cube's
+            // own globalDictTable is used.
+            // NOTE(review): the jobId parameter is never used in this body.
+            protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc, String hiveInitStatements, String cubeName, Map<String, String> mrHiveDictColumns, String flatTableDatabase, String globalDictDatabase, String globalDictTable, String dictSuffix, String jobId) {
+            // One generated HQL statement per dict column, keyed by column name.
+            Map<String, String> dictHqlMap = new HashMap<>();
+            // NOTE(review): addPartition is never appended to below, so it contributes
+            // only an empty string to the init statement -- dead variable or missing logic?
+            StringBuilder addPartition = new StringBuilder();
+            for (String dictColumn : mrHiveDictColumns.keySet()) {
+                StringBuilder dictHql = new StringBuilder();
+                // Set when the projection loop below finds the matching flat-table column;
+                // stays null (and later NPEs in colName) if no column matches dictColumn.
+                TblColRef dictColumnRef = null;
+
+                String flatTable = flatTableDatabase + "." + flatDesc.getTableName();
+                // replace the flat table's dict column value
+                dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n");
+                try {
+                    dictHql.append("SELECT \n");
+                    Integer flatTableColumnSize = flatDesc.getAllColumns().size();
+                    // Project every flat-table column: the dict column is taken from the
+                    // joined dict table (alias b), all others pass through from alias a.
+                    for (int i = 0; i < flatTableColumnSize; i++) {
+                        TblColRef tblColRef = flatDesc.getAllColumns().get(i);
+                        if (i > 0) {
+                            dictHql.append(",");
+                        }
+                        if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
+                            dictHql.append("b.dict_val \n");
+                            dictColumnRef = tblColRef;
+                        } else {
+                            dictHql.append("a." + JoinedFlatTable.colName(tblColRef) + " \n");
+                        }
+                    }
+
+                    // Non-empty map value: this column reuses another dictionary. The value
+                    // is split on '.' into "<name>.<column>" -- presumably set by the
+                    // companion build step; TODO confirm the producer's format.
+                    if (!Strings.isNullOrEmpty(mrHiveDictColumns.get(dictColumn))) {
+                        String[] cubePartion = mrHiveDictColumns.get(dictColumn).split("\\.");
+
+                        // Referenced dict table gets the configured MR-Hive dict suffix.
+                        String refGlobalDictTable = cubePartion[0] + dictSuffix;
+                        String refDictColumn = cubePartion[1];
+
+                        dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n"
+                                + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + refGlobalDictTable
+                                + " WHERE dict_column = '" + refDictColumn + "' \n" + ") b \n" + " ON a."
+                                + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;");
+                        dictHqlMap.put(dictColumn, dictHql.toString());
+                    }else {
+                        // Default: join against this cube's own global dict table.
+                        dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n"
+                                + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable
+                                + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n" + " ON a."
+                                + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;");
+                    }
+                    // NOTE(review): for the reference branch above this repeats the put
+                    // with the same key/value -- redundant but harmless.
+                    dictHqlMap.put(dictColumn, dictHql.toString());
+                } catch (Exception e) {
+                    // NOTE(review): failure is only logged (with an empty message); the
+                    // column's replace HQL is silently dropped for this iteration.
+                    logger.error("", e);
+                }
+            }
+            // Session settings required by the generated statements (uncompressed output,
+            // non-strict mode for the unpartitioned-scan join).
+            String set = "set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;";
+            CreateMrHiveDictStep step = new CreateMrHiveDictStep();
+            step.setInitStatement(hiveInitStatements + set + addPartition);
+            step.setCreateTableStatementMap(dictHqlMap);
+            // Marks this step to release the dict lock identified by cubeName --
+            // NOTE(review): confirm exact unlock semantics in CreateMrHiveDictStep.
+            step.setIsUnLock(true);
+            step.setLockPathName(cubeName);
+            // TODO: fix distributed concurrency lock bug
+            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+            step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL);
+            return step;
+        }
+
         protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);