You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:53 UTC
[kylin] 06/11: KYLIN-4367 Build Global Dict by MR/Hive,
Replace intermediate table Step implementation
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5addc9da885a606ca1f236261df1c314bdb13e0f
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Wed May 6 14:43:45 2020 +0800
KYLIN-4367 Build Global Dict by MR/Hive, Replace intermediate
table Step implementation
---
.../apache/kylin/source/hive/HiveInputBase.java | 69 +++++++++++++++++++++-
1 file changed, 68 insertions(+), 1 deletion(-)
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 8bad023..624c8f9 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -30,6 +30,7 @@ import java.util.Set;
import java.util.Locale;
import java.util.Collections;
+import org.apache.kylin.shaded.com.google.common.base.Strings;
import org.apache.kylin.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -147,7 +148,13 @@ public class HiveInputBase {
dictRef.put(item, "");
}
}
- //toDo add replace step
+
+ //replace step
+ if(dictRef.size()>0) {
+ jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, hiveInitStatements, cubeName,
+ dictRef, flatTableDatabase, globalDictDatabase, globalDictTable, dictConfig.getMrHiveDictTableSuffix(), jobFlow.getId()));
+ }
+
}
protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow,
@@ -244,6 +251,66 @@ public class HiveInputBase {
return step;
}
+    /**
+     * Creates the step that replaces each MR/Hive global-dict column of the flat table with its dict id
+     * (one {@code INSERT OVERWRITE ... LEFT OUTER JOIN} statement per column). A non-empty "TABLE.COLUMN"
+     * value in {@code mrHiveDictColumns} reads ids from {@code TABLE + dictSuffix}, else from globalDictTable.
+     * @throws IllegalStateException if a dict column is not in the flat table or its reference is malformed
+     */
+    protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc, String hiveInitStatements, String cubeName, Map<String, String> mrHiveDictColumns, String flatTableDatabase, String globalDictDatabase, String globalDictTable, String dictSuffix, String jobId) {
+        Map<String, String> dictHqlMap = new HashMap<>();
+        String flatTable = flatTableDatabase + "." + flatDesc.getTableName();
+        for (Map.Entry<String, String> entry : mrHiveDictColumns.entrySet()) {
+            String dictColumn = entry.getKey();
+            StringBuilder dictHql = new StringBuilder();
+            TblColRef dictColumnRef = null;
+            // Rewrite the flat table in place, selecting b.dict_val in place of the dict column's raw value.
+            dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n");
+            dictHql.append("SELECT \n");
+            String separator = "";
+            for (TblColRef tblColRef : flatDesc.getAllColumns()) {
+                dictHql.append(separator);
+                separator = ",";
+                if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
+                    dictHql.append("b.dict_val \n");
+                    dictColumnRef = tblColRef;
+                } else {
+                    dictHql.append("a." + JoinedFlatTable.colName(tblColRef) + " \n");
+                }
+            }
+            if (dictColumnRef == null) {
+                throw new IllegalStateException("MR/Hive dict column " + dictColumn + " not found in flat table " + flatTable);
+            }
+            // Ids come from the shared global dict table unless a "TABLE.COLUMN" reference points elsewhere.
+            String refGlobalDictTable = globalDictTable;
+            String refDictColumn = dictColumn;
+            String dictRef = entry.getValue();
+            if (!Strings.isNullOrEmpty(dictRef)) {
+                String[] refParts = dictRef.split("\\.");
+                if (refParts.length < 2) {
+                    throw new IllegalStateException("Invalid MR/Hive dict reference '" + dictRef + "' for column " + dictColumn + ", expected TABLE.COLUMN");
+                }
+                refGlobalDictTable = refParts[0] + dictSuffix;
+                refDictColumn = refParts[1];
+            }
+            dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n"
+                    + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + refGlobalDictTable
+                    + " WHERE dict_column = '" + refDictColumn + "' \n" + ") b \n" + " ON a."
+                    + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;");
+            dictHqlMap.put(dictColumn, dictHql.toString());
+        }
+        String set = "set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;";
+        CreateMrHiveDictStep step = new CreateMrHiveDictStep();
+        step.setInitStatement(hiveInitStatements + set);
+        step.setCreateTableStatementMap(dictHqlMap);
+        step.setIsUnLock(true);
+        step.setLockPathName(cubeName);
+        // TODO: fix distributed concurrency lock bug
+        CubingExecutableUtil.setCubeName(cubeName, step.getParams());
+        step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL);
+        return step;
+    }
+
protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);