You are viewing a plain-text version of this content; the canonical HTML version is available at the original mailing-list archive link.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:52 UTC
[kylin] 05/11: KYLIN-4366 Build Global Dict by MR/Hive,
Merge to dict table Step implementation
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 3e5058155a32ab4f4867290227ef5ec4bc2c6ba9
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Wed May 6 14:43:08 2020 +0800
KYLIN-4366 Build Global Dict by MR/Hive, Merge to dict table
Step implementation
---
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 5 +-
.../java/org/apache/kylin/engine/mr/IInput.java | 3 +
.../apache/kylin/source/hive/HiveInputBase.java | 82 +++++++++++++---------
.../apache/kylin/source/kafka/KafkaInputBase.java | 5 ++
4 files changed, 62 insertions(+), 33 deletions(-)
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index b62650a..8ec7d36 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -73,7 +73,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
}
- //toDo merge global dic and replace flat table
+ //merge global dic and replace flat table
+ if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){
+ inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+ }
// Phase 2: Build Dictionary
result.addTask(createFactDistinctColumnsStep(jobId));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
index 758b081..9fdb300 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
@@ -34,6 +34,9 @@ public interface IInput {
/** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+ /** Add step that replace flat table global column value by global dic*/
+ public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow);
+
/** Add step that does necessary clean up, like delete the intermediate flat table */
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 65b8dc6..8bad023 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -126,6 +126,30 @@ public class HiveInputBase {
addStepPhase1_DoMaterializeLookupTable(jobFlow);
}
+ @Override
+ public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) {
+ KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
+ final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+ String[] mrHiveDictColumnsExcludeRefCols = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+ Map<String, String> dictRef = dictConfig.getMrHiveDictRefColumns();
+ final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+
+ String globalDictDatabase = dictConfig.getMrHiveDictDB();
+ if (null == globalDictDatabase) {
+ throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
+ }
+ String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
+ if(Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && mrHiveDictColumnsExcludeRefCols.length > 0) {
+ //merge to dict table step
+ jobFlow.addTask(createHiveGlobalDictMergeGlobalDict(flatDesc, hiveInitStatements, cubeName, mrHiveDictColumnsExcludeRefCols, globalDictDatabase, globalDictTable));
+
+ for (String item : mrHiveDictColumnsExcludeRefCols) {
+ dictRef.put(item, "");
+ }
+ }
+ //toDo add replace step
+ }
+
protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow,
String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
@@ -180,49 +204,43 @@ public class HiveInputBase {
return step;
}
- protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc,
- String hiveInitStatements, String hdfsWorkingDir, String cubeName, String[] mrHiveDictColumns,
- String flatTableDatabase, String globalDictDatabase, String globalDictTable) {
+ protected static AbstractExecutable createHiveGlobalDictMergeGlobalDict(IJoinedFlatTableDesc flatDesc,
+ String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
+ String globalDictDatabase, String globalDictTable) {
+
+ String globalDictItermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
+
+ StringBuffer addPartition = new StringBuffer();
+ Map<String, String> maxDictValMap = new HashMap<>();
Map<String, String> dictHqlMap = new HashMap<>();
for (String dictColumn : mrHiveDictColumns) {
- StringBuilder dictHql = new StringBuilder();
- TblColRef dictColumnRef = null;
-
- String flatTable = flatTableDatabase + "." + flatDesc.getTableName();
- // replace the flat table's dict column value
- dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n");
try {
- dictHql.append("SELECT \n");
- Integer flatTableColumnSize = flatDesc.getAllColumns().size();
- for (int i = 0; i < flatTableColumnSize; i++) {
- TblColRef tblColRef = flatDesc.getAllColumns().get(i);
- if (i > 0) {
- dictHql.append(",");
- }
- if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
- dictHql.append("b. dict_val \n");
- dictColumnRef = tblColRef;
- } else {
- dictHql.append("a." + JoinedFlatTable.colName(tblColRef) + " \n");
- }
- }
- dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n"
- + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable
- + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n" + " ON a."
- + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;");
- dictHqlMap.put(dictColumn, dictHql.toString());
+ addPartition.append("alter table ").append(globalDictItermediateTable)
+ .append(" add IF NOT EXISTS partition (dict_column='").append(dictColumn)
+ .append("');").append(" \n");
+
+ String dictHql = "INSERT OVERWRITE TABLE " + globalDictDatabase + "." + globalDictTable + " \n"
+ + "PARTITION (dict_column = '" + dictColumn + "') \n"
+ + "SELECT dict_key, dict_val FROM "
+ + globalDictDatabase + "." + globalDictTable + " \n" + "WHERE dict_column = '" + dictColumn
+ + "' \n" + flatDesc.getDataModel().getConfig().getHiveUnionStyle() + " \n"
+ + "SELECT dict_key, dict_val FROM "
+ + globalDictItermediateTable + " \n" + " WHERE dict_column = '" + dictColumn + "' ;\n";
+ dictHqlMap.put(dictColumn, dictHql);
} catch (Exception e) {
logger.error("", e);
}
}
+ String hiveInitStatementForUnstrict = "set hive.mapred.mode=unstrict;";
CreateMrHiveDictStep step = new CreateMrHiveDictStep();
- step.setInitStatement(hiveInitStatements);
+ step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict + addPartition);
step.setCreateTableStatementMap(dictHqlMap);
- step.setIsUnLock(true);
+ step.setMaxDictStatementMap(maxDictValMap);
+ step.setIsLock(false);
+ step.setIsUnLock(false);
step.setLockPathName(cubeName);
-
CubingExecutableUtil.setCubeName(cubeName, step.getParams());
- step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL);
+ step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL);
return step;
}
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
index d9e112c..5ba9082 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
@@ -88,6 +88,11 @@ public class KafkaInputBase {
}
}
+ @Override
+ public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) {
+ //do nothing
+ }
+
protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
return JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), jobFlow.getId());
}