You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:51 UTC
[kylin] 04/11: KYLIN-4344 Build Global Dict by MR/Hive,
Extract Fact Table Distinct Columns Step
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ad954b76a25fb70a14f4c5c034ae58fd78b538f2
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Wed May 6 14:42:19 2020 +0800
KYLIN-4344 Build Global Dict by MR/Hive, Extract Fact Table
Distinct Columns Step
---
.../apache/kylin/source/hive/HiveInputBase.java | 86 ++++++++--------------
.../apache/kylin/source/hive/MRHiveDictUtil.java | 74 ++++++++++++++-----
2 files changed, 84 insertions(+), 76 deletions(-)
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index c3f6b6f..65b8dc6 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -42,6 +42,7 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.IInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.spark.SparkCreatingFlatTable;
import org.apache.kylin.engine.spark.SparkExecutable;
@@ -97,8 +98,9 @@ public class HiveInputBase {
// create global dict
KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
- String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
- if (mrHiveDictColumns.length > 0) {
+ String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+ if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
+ && !"".equals(mrHiveDictColumns[0])) {
String globalDictDatabase = dictConfig.getMrHiveDictDB();
if (null == globalDictDatabase) {
throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
@@ -128,81 +130,53 @@ public class HiveInputBase {
String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
- jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, hiveInitStatements, jobWorkingDir, cubeName,
- mrHiveDictColumns, globalDictDatabase, globalDictTable));
- jobFlow.addTask(createMrHIveGlobalDictBuildStep(flatDesc, hiveInitStatements, hdfsWorkingDir, cubeName,
- mrHiveDictColumns, flatTableDatabase, globalDictDatabase, globalDictTable));
- jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, hiveInitStatements, hdfsWorkingDir, cubeName,
- mrHiveDictColumns, flatTableDatabase, globalDictDatabase, globalDictTable));
+Create tables for global dict and extract distinct value
+ jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, hiveInitStatements, cubeName,
+ mrHiveDictColumns, globalDictDatabase, globalDictTable, jobFlow.getId()));
+
}
protected static AbstractExecutable createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc,
- String hiveInitStatements, String jobWorkingDir, String cubeName, String[] mrHiveDictColumns,
- String globalDictDatabase, String globalDictTable) {
+ String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
+ String globalDictDatabase, String globalDictTable, String jobId) {
// First, determine whether the global dict hive table of the cube exists.
String createGlobalDictTableHql = "CREATE TABLE IF NOT EXISTS " + globalDictDatabase + "." + globalDictTable
+ "\n" + "( dict_key STRING COMMENT '', \n" + "dict_val INT COMMENT '' \n" + ") \n"
- + "COMMENT '' \n" + "PARTITIONED BY (dict_column string) \n" + "STORED AS TEXTFILE; \n";
+ + "COMMENT '' \n" + "PARTITIONED BY (dict_column string) \n" + " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n" + "STORED AS TEXTFILE; \n";
final String dropDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(flatDesc);
final String createDictIntermediateTableHql = MRHiveDictUtil.generateCreateTableStatement(flatDesc);
+ final String groupByTable = flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
+ final String globalDictIntermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
+ final String dropGlobalDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(globalDictIntermediateTable);
+ final String createGlobalDictIntermediateTableHql = MRHiveDictUtil.generateCreateGlobalDicIntermediateTableStatement(globalDictIntermediateTable);
+
+ String maxAndDistinctCountSql = "INSERT OVERWRITE TABLE " + groupByTable + " PARTITION (DICT_COLUMN = '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') "
+ + "\n" + "SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) "
+ + "\n" + "FROM ("
+ + "\n" + " SELECT dict_column,count(1) total_distinct_val FROM "
+ + "\n" + groupByTable + " where DICT_COLUMN != '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "' group by dict_column) tc "
+ + "\n" + "LEFT JOIN (\n"
+ + "\n" + " SELECT dict_column,if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val FROM "
+ + "\n" + globalDictDatabase + "." + globalDictTable + " group by dict_column) tm "
+ + "\n" + "ON tc.dict_column = tm.dict_column;";
StringBuilder insertDataToDictIntermediateTableSql = new StringBuilder();
for (String dictColumn : mrHiveDictColumns) {
insertDataToDictIntermediateTableSql
- .append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn));
+ .append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn, globalDictDatabase, globalDictTable));
}
-
+ String set = "set hive.exec.compress.output=false;set hive.mapred.mode=unstrict;";
CreateMrHiveDictStep step = new CreateMrHiveDictStep();
step.setInitStatement(hiveInitStatements);
- step.setCreateTableStatement(createGlobalDictTableHql + dropDictIntermediateTableHql
- + createDictIntermediateTableHql + insertDataToDictIntermediateTableSql.toString());
+ step.setCreateTableStatement(set + createGlobalDictTableHql + dropDictIntermediateTableHql
+ + createDictIntermediateTableHql + dropGlobalDictIntermediateTableHql + createGlobalDictIntermediateTableHql + insertDataToDictIntermediateTableSql.toString() + maxAndDistinctCountSql);
CubingExecutableUtil.setCubeName(cubeName, step.getParams());
step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL);
- return step;
- }
-
- protected static AbstractExecutable createMrHIveGlobalDictBuildStep(IJoinedFlatTableDesc flatDesc,
- String hiveInitStatements, String hdfsWorkingDir, String cubeName, String[] mrHiveDictColumns,
- String flatTableDatabase, String globalDictDatabase, String globalDictTable) {
- String flatTable = flatTableDatabase + "."
- + MRHiveDictUtil.getHiveTableName(flatDesc, MRHiveDictUtil.DictHiveType.GroupBy);
- Map<String, String> maxDictValMap = new HashMap<>();
- Map<String, String> dictHqlMap = new HashMap<>();
-
- for (String dictColumn : mrHiveDictColumns) {
- // get dict max value
- String maxDictValHql = "SELECT if(max(dict_val) is null,0,max(dict_val)) as max_dict_val \n" + " FROM "
- + globalDictDatabase + "." + globalDictTable + " \n" + " WHERE dict_column = '" + dictColumn
- + "' \n";
- maxDictValMap.put(dictColumn, maxDictValHql);
- try {
- String dictHql = "INSERT OVERWRITE TABLE " + globalDictDatabase + "." + globalDictTable + " \n"
- + "PARTITION (dict_column = '" + dictColumn + "') \n" + "SELECT dict_key, dict_val FROM "
- + globalDictDatabase + "." + globalDictTable + " \n" + "WHERE dict_column = '" + dictColumn
- + "' \n" + flatDesc.getDataModel().getConfig().getHiveUnionStyle()
- + "\nSELECT a.dict_key as dict_key, (row_number() over(order by a.dict_key asc)) + (___maxDictVal___) as dict_val \n"
- + "FROM \n" + "( \n" + " SELECT dict_key FROM " + flatTable + " WHERE dict_column = '"
- + dictColumn + "' AND dict_key is not null \n" + ") a \n" + "LEFT JOIN \n" + "( \n"
- + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable
- + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n"
- + "ON a.dict_key = b.dict_key \n" + "WHERE b.dict_val is null; \n";
- dictHqlMap.put(dictColumn, dictHql);
- } catch (Exception e) {
- logger.error("", e);
- }
- }
- String hiveInitStatementForUnstrict = "set hive.mapred.mode=unstrict;";
- CreateMrHiveDictStep step = new CreateMrHiveDictStep();
- step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict);
- step.setCreateTableStatementMap(dictHqlMap);
- step.setMaxDictStatementMap(maxDictValMap);
step.setIsLock(true);
+ step.setIsUnLock(false);
step.setLockPathName(cubeName);
- CubingExecutableUtil.setCubeName(cubeName, step.getParams());
- step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL);
return step;
}
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index 804a183..fd2d103 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -60,14 +60,22 @@ public class MRHiveDictUtil {
public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
StringBuilder ddl = new StringBuilder();
- String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+ String table = flatDesc.getTableName()
+ + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
ddl.append("DROP TABLE IF EXISTS " + table + ";").append(" \n");
return ddl.toString();
}
+ public static String generateDropTableStatement(String tableName) {
+ StringBuilder ddl = new StringBuilder();
+ ddl.append("DROP TABLE IF EXISTS " + tableName + ";").append(" \n");
+ return ddl.toString();
+ }
+
public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc) {
StringBuilder ddl = new StringBuilder();
- String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+ String table = flatDesc.getTableName()
+ + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
ddl.append("CREATE TABLE IF NOT EXISTS " + table + " \n");
ddl.append("( \n ");
@@ -75,16 +83,32 @@ public class MRHiveDictUtil {
ddl.append(") \n");
ddl.append("COMMENT '' \n");
ddl.append("PARTITIONED BY (dict_column string) \n");
- ddl.append("STORED AS SEQUENCEFILE \n");
+ ddl.append("STORED AS TEXTFILE \n");
+ ddl.append(";").append("\n");
+ return ddl.toString();
+ }
+
+ public static String generateCreateGlobalDicIntermediateTableStatement(String globalTableName) {
+ StringBuilder ddl = new StringBuilder();
+
+ ddl.append("CREATE TABLE IF NOT EXISTS " + globalTableName + " \n");
+ ddl.append("( \n ");
+ ddl.append("dict_key" + " " + "STRING" + " COMMENT '' , \n");
+ ddl.append("dict_val" + " " + "STRING" + " COMMENT '' \n");
+ ddl.append(") \n");
+ ddl.append("COMMENT '' \n");
+ ddl.append("PARTITIONED BY (dict_column string) \n");
+ ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n");
+ ddl.append("STORED AS TEXTFILE \n");
ddl.append(";").append("\n");
return ddl.toString();
}
- public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, String dictColumn) {
- String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+ public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, String dictColumn, String globalDictDatabase, String globalDictTable) {
+ String table = getMRHiveFlatTableGroupBytableName(flatDesc);
StringBuilder sql = new StringBuilder();
- sql.append("SELECT" + "\n");
+ sql.append("SELECT a.DICT_KEY FROM (" + "\n");
int index = 0;
for (TblColRef tblColRef : flatDesc.getAllColumns()) {
@@ -95,32 +119,34 @@ public class MRHiveDictUtil {
}
if (index == flatDesc.getAllColumns().size()) {
- String msg = "Can not find correct column for " + dictColumn + ", please check 'kylin.dictionary.mr-hive.columns'";
+ String msg = "Can not find correct column for " + dictColumn
+ + ", please check 'kylin.dictionary.mr-hive.columns'";
logger.error(msg);
throw new IllegalArgumentException(msg);
}
-
+ sql.append(" SELECT " + "\n");
TblColRef col = flatDesc.getAllColumns().get(index);
- sql.append(JoinedFlatTable.colName(col) + " \n");
+ sql.append(JoinedFlatTable.colName(col) + " as DICT_KEY \n");
MRHiveDictUtil.appendJoinStatement(flatDesc, sql);
//group by
sql.append("GROUP BY ");
- sql.append(JoinedFlatTable.colName(col) + " \n");
-
- return "INSERT OVERWRITE TABLE " + table + " \n"
- + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
+ sql.append(JoinedFlatTable.colName(col) + ") a \n");
+
+ //join
+ sql.append(" LEFT JOIN \n");
+ sql.append("(SELECT DICT_KEY FROM ");
+ sql.append(globalDictDatabase).append(".").append(globalDictTable);
+ sql.append(" WHERE DICT_COLUMN = '" + dictColumn + "'");
+ sql.append(") b \n");
+ sql.append("ON a.DICT_KEY = b.DICT_KEY \n");
+ sql.append("WHERE b.DICT_KEY IS NULL \n");
+
+ return "INSERT OVERWRITE TABLE " + table + " \n" + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
+ sql + ";\n";
}
- public static String getHiveTableName(IJoinedFlatTableDesc flatDesc, DictHiveType dictHiveType) {
- StringBuffer table = new StringBuffer(flatDesc.getTableName());
- table.append("__");
- table.append(dictHiveType.getName());
- return table.toString();
- }
-
public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) {
sql.append("FROM " + flatDesc.getTableName() + "\n");
}
@@ -155,6 +181,14 @@ public class MRHiveDictUtil {
executableManager.addJobInfo(jobId, info);
}
+ public static String getMRHiveFlatTableGroupBytableName(IJoinedFlatTableDesc flatDesc) {
+ return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
+ }
+
+ public static String getMRHiveFlatTableGlobalDictTableName(IJoinedFlatTableDesc flatDesc) {
+ return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix();
+ }
+
private static long getFileSize(String hdfsUrl) throws IOException {
Configuration configuration = new Configuration();
Path path = new Path(hdfsUrl);