You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:51 UTC

[kylin] 04/11: KYLIN-4344 Build Global Dict by MR/Hive, Extract Fact Table Distinct Columns Step

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit ad954b76a25fb70a14f4c5c034ae58fd78b538f2
Author: wangxiaojing <wa...@didichuxing.com>
AuthorDate: Wed May 6 14:42:19 2020 +0800

    KYLIN-4344 Build Global Dict by MR/Hive, Extract Fact Table
     Distinct Columns Step
---
 .../apache/kylin/source/hive/HiveInputBase.java    | 86 ++++++++--------------
 .../apache/kylin/source/hive/MRHiveDictUtil.java   | 74 ++++++++++++++-----
 2 files changed, 84 insertions(+), 76 deletions(-)

diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index c3f6b6f..65b8dc6 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -42,6 +42,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.SparkCreatingFlatTable;
 import org.apache.kylin.engine.spark.SparkExecutable;
@@ -97,8 +98,9 @@ public class HiveInputBase {
 
             // create global dict
             KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
-            String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
-            if (mrHiveDictColumns.length > 0) {
+            String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+            if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
+                    && !"".equals(mrHiveDictColumns[0])) {
                 String globalDictDatabase = dictConfig.getMrHiveDictDB();
                 if (null == globalDictDatabase) {
                     throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
@@ -128,81 +130,53 @@ public class HiveInputBase {
                 String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) {
             final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
             final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
 
-            jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, hiveInitStatements, jobWorkingDir, cubeName,
-                    mrHiveDictColumns, globalDictDatabase, globalDictTable));
-            jobFlow.addTask(createMrHIveGlobalDictBuildStep(flatDesc, hiveInitStatements, hdfsWorkingDir, cubeName,
-                    mrHiveDictColumns, flatTableDatabase, globalDictDatabase, globalDictTable));
-            jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, hiveInitStatements, hdfsWorkingDir, cubeName,
-                    mrHiveDictColumns, flatTableDatabase, globalDictDatabase, globalDictTable));
+            //Create tables for global dict and extract distinct value
+            jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, hiveInitStatements, cubeName,
+                    mrHiveDictColumns, globalDictDatabase, globalDictTable, jobFlow.getId()));
+
         }
 
         protected static AbstractExecutable createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc,
-                String hiveInitStatements, String jobWorkingDir, String cubeName, String[] mrHiveDictColumns,
-                String globalDictDatabase, String globalDictTable) {
+                String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
+                String globalDictDatabase, String globalDictTable, String jobId) {
             // Firstly, determine if the global dict hive table of cube is exists.
             String createGlobalDictTableHql = "CREATE TABLE IF NOT EXISTS " + globalDictDatabase + "." + globalDictTable
                     + "\n" + "( dict_key STRING COMMENT '', \n" + "dict_val INT COMMENT '' \n" + ") \n"
-                    + "COMMENT '' \n" + "PARTITIONED BY (dict_column string) \n" + "STORED AS TEXTFILE; \n";
+                    + "COMMENT '' \n" + "PARTITIONED BY (dict_column string) \n" + " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n" + "STORED AS TEXTFILE; \n";
 
             final String dropDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(flatDesc);
             final String createDictIntermediateTableHql = MRHiveDictUtil.generateCreateTableStatement(flatDesc);
+            final String groupByTable = flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
+            final String globalDictIntermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
+            final String dropGlobalDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(globalDictIntermediateTable);
+            final String createGlobalDictIntermediateTableHql = MRHiveDictUtil.generateCreateGlobalDicIntermediateTableStatement(globalDictIntermediateTable);
+
+            String maxAndDistinctCountSql = "INSERT OVERWRITE TABLE  " + groupByTable + " PARTITION (DICT_COLUMN = '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') "
+                    + "\n" + "SELECT  CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) "
+                    + "\n" + "FROM ("
+                    + "\n" + "    SELECT  dict_column,count(1) total_distinct_val FROM "
+                    + "\n" + groupByTable + " where DICT_COLUMN != '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "' group by dict_column) tc "
+                    + "\n" + "LEFT JOIN (\n"
+                    + "\n" + "    SELECT  dict_column,if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val FROM "
+                    + "\n" + globalDictDatabase + "." + globalDictTable + " group by dict_column) tm "
+                    + "\n" + "ON  tc.dict_column = tm.dict_column;";
 
             StringBuilder insertDataToDictIntermediateTableSql = new StringBuilder();
             for (String dictColumn : mrHiveDictColumns) {
                 insertDataToDictIntermediateTableSql
-                        .append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn));
+                        .append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn, globalDictDatabase, globalDictTable));
             }
-
+            String set = "set hive.exec.compress.output=false;set hive.mapred.mode=unstrict;";
             CreateMrHiveDictStep step = new CreateMrHiveDictStep();
             step.setInitStatement(hiveInitStatements);
-            step.setCreateTableStatement(createGlobalDictTableHql + dropDictIntermediateTableHql
-                    + createDictIntermediateTableHql + insertDataToDictIntermediateTableSql.toString());
+            step.setCreateTableStatement(set + createGlobalDictTableHql + dropDictIntermediateTableHql
+                    + createDictIntermediateTableHql + dropGlobalDictIntermediateTableHql + createGlobalDictIntermediateTableHql + insertDataToDictIntermediateTableSql.toString() + maxAndDistinctCountSql);
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
             step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL);
-            return step;
-        }
-
-        protected static AbstractExecutable createMrHIveGlobalDictBuildStep(IJoinedFlatTableDesc flatDesc,
-                String hiveInitStatements, String hdfsWorkingDir, String cubeName, String[] mrHiveDictColumns,
-                String flatTableDatabase, String globalDictDatabase, String globalDictTable) {
-            String flatTable = flatTableDatabase + "."
-                    + MRHiveDictUtil.getHiveTableName(flatDesc, MRHiveDictUtil.DictHiveType.GroupBy);
-            Map<String, String> maxDictValMap = new HashMap<>();
-            Map<String, String> dictHqlMap = new HashMap<>();
-
-            for (String dictColumn : mrHiveDictColumns) {
-                // get dict max value
-                String maxDictValHql = "SELECT if(max(dict_val) is null,0,max(dict_val)) as max_dict_val \n" + " FROM "
-                        + globalDictDatabase + "." + globalDictTable + " \n" + " WHERE dict_column = '" + dictColumn
-                        + "' \n";
-                maxDictValMap.put(dictColumn, maxDictValHql);
-                try {
-                    String dictHql = "INSERT OVERWRITE TABLE " + globalDictDatabase + "." + globalDictTable + " \n"
-                            + "PARTITION (dict_column = '" + dictColumn + "') \n" + "SELECT dict_key, dict_val FROM "
-                            + globalDictDatabase + "." + globalDictTable + " \n" + "WHERE dict_column = '" + dictColumn
-                            + "' \n" + flatDesc.getDataModel().getConfig().getHiveUnionStyle()
-                            + "\nSELECT a.dict_key as dict_key, (row_number() over(order by a.dict_key asc)) + (___maxDictVal___) as dict_val \n"
-                            + "FROM \n" + "( \n" + " SELECT dict_key FROM " + flatTable + " WHERE dict_column = '"
-                            + dictColumn + "' AND dict_key is not null \n" + ") a \n" + "LEFT JOIN \n" + "( \n"
-                            + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable
-                            + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n"
-                            + "ON a.dict_key = b.dict_key \n" + "WHERE b.dict_val is null; \n";
-                    dictHqlMap.put(dictColumn, dictHql);
-                } catch (Exception e) {
-                    logger.error("", e);
-                }
-            }
-            String hiveInitStatementForUnstrict = "set hive.mapred.mode=unstrict;";
-            CreateMrHiveDictStep step = new CreateMrHiveDictStep();
-            step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict);
-            step.setCreateTableStatementMap(dictHqlMap);
-            step.setMaxDictStatementMap(maxDictValMap);
             step.setIsLock(true);
+            step.setIsUnLock(false);
             step.setLockPathName(cubeName);
-            CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-            step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL);
             return step;
         }
 
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index 804a183..fd2d103 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -60,14 +60,22 @@ public class MRHiveDictUtil {
 
     public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder ddl = new StringBuilder();
-        String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+        String table = flatDesc.getTableName()
+                + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
         ddl.append("DROP TABLE IF EXISTS " + table + ";").append(" \n");
         return ddl.toString();
     }
 
+    public static String generateDropTableStatement(String tableName) {
+        StringBuilder ddl = new StringBuilder();
+        ddl.append("DROP TABLE IF EXISTS " + tableName + ";").append(" \n");
+        return ddl.toString();
+    }
+
     public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder ddl = new StringBuilder();
-        String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+        String table = flatDesc.getTableName()
+                + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
 
         ddl.append("CREATE TABLE IF NOT EXISTS " + table + " \n");
         ddl.append("( \n ");
@@ -75,16 +83,32 @@ public class MRHiveDictUtil {
         ddl.append(") \n");
         ddl.append("COMMENT '' \n");
         ddl.append("PARTITIONED BY (dict_column string) \n");
-        ddl.append("STORED AS SEQUENCEFILE \n");
+        ddl.append("STORED AS TEXTFILE \n");
+        ddl.append(";").append("\n");
+        return ddl.toString();
+    }
+
+    public static String generateCreateGlobalDicIntermediateTableStatement(String globalTableName) {
+        StringBuilder ddl = new StringBuilder();
+
+        ddl.append("CREATE TABLE IF NOT EXISTS " + globalTableName + " \n");
+        ddl.append("( \n ");
+        ddl.append("dict_key" + " " + "STRING" + " COMMENT '' , \n");
+        ddl.append("dict_val" + " " + "STRING" + " COMMENT '' \n");
+        ddl.append(") \n");
+        ddl.append("COMMENT '' \n");
+        ddl.append("PARTITIONED BY (dict_column string) \n");
+        ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n");
+        ddl.append("STORED AS TEXTFILE \n");
         ddl.append(";").append("\n");
         return ddl.toString();
     }
 
-    public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, String dictColumn) {
-        String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+    public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, String dictColumn, String globalDictDatabase, String globalDictTable) {
+        String table = getMRHiveFlatTableGroupBytableName(flatDesc);
 
         StringBuilder sql = new StringBuilder();
-        sql.append("SELECT" + "\n");
+        sql.append("SELECT a.DICT_KEY FROM (" + "\n");
 
         int index = 0;
         for (TblColRef tblColRef : flatDesc.getAllColumns()) {
@@ -95,32 +119,34 @@ public class MRHiveDictUtil {
         }
 
         if (index == flatDesc.getAllColumns().size()) {
-            String msg = "Can not find correct column for " + dictColumn + ", please check 'kylin.dictionary.mr-hive.columns'";
+            String msg = "Can not find correct column for " + dictColumn
+                    + ", please check 'kylin.dictionary.mr-hive.columns'";
             logger.error(msg);
             throw new IllegalArgumentException(msg);
         }
-
+        sql.append(" SELECT " + "\n");
         TblColRef col = flatDesc.getAllColumns().get(index);
-        sql.append(JoinedFlatTable.colName(col) + " \n");
+        sql.append(JoinedFlatTable.colName(col) + "  as DICT_KEY \n");
 
         MRHiveDictUtil.appendJoinStatement(flatDesc, sql);
 
         //group by
         sql.append("GROUP BY ");
-        sql.append(JoinedFlatTable.colName(col) + " \n");
-
-        return "INSERT OVERWRITE TABLE " + table + " \n"
-                + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
+        sql.append(JoinedFlatTable.colName(col) + ") a \n");
+
+        //join
+        sql.append(" LEFT JOIN \n");
+        sql.append("(SELECT  DICT_KEY FROM ");
+        sql.append(globalDictDatabase).append(".").append(globalDictTable);
+        sql.append(" WHERE DICT_COLUMN = '" + dictColumn + "'");
+        sql.append(") b \n");
+        sql.append("ON a.DICT_KEY = b.DICT_KEY \n");
+        sql.append("WHERE   b.DICT_KEY IS NULL \n");
+
+        return "INSERT OVERWRITE TABLE " + table + " \n" + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
                 + sql + ";\n";
     }
 
-    public static String getHiveTableName(IJoinedFlatTableDesc flatDesc, DictHiveType dictHiveType) {
-        StringBuffer table = new StringBuffer(flatDesc.getTableName());
-        table.append("__");
-        table.append(dictHiveType.getName());
-        return table.toString();
-    }
-
     public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) {
         sql.append("FROM " + flatDesc.getTableName() + "\n");
     }
@@ -155,6 +181,14 @@ public class MRHiveDictUtil {
         executableManager.addJobInfo(jobId, info);
     }
 
+    public static String getMRHiveFlatTableGroupBytableName(IJoinedFlatTableDesc flatDesc) {
+        return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
+    }
+
+    public static String getMRHiveFlatTableGlobalDictTableName(IJoinedFlatTableDesc flatDesc) {
+        return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix();
+    }
+
     private static long getFileSize(String hdfsUrl) throws IOException {
         Configuration configuration = new Configuration();
         Path path = new Path(hdfsUrl);