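You are viewing a plain-text version of this content. The canonical version is the original message in the commits@kylin.apache.org mailing-list archive (its hyperlink was lost in this plain-text rendering).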
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/05/30 10:25:58 UTC
[kylin] 11/11: KYLIN-4342 Improve code smell
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5731f43fcf350247e76fd7e36b0980d5cf9fc912
Author: XiaoxiangYu <hi...@126.com>
AuthorDate: Thu May 28 23:26:20 2020 +0800
KYLIN-4342 Improve code smell
---
.../org/apache/kylin/common/KylinConfigBase.java | 55 +++---
.../kylin/job/constant/ExecutableConstants.java | 12 +-
.../bitmap/BitmapIntersectValueAggFunc.java | 10 +-
.../kylin/measure/bitmap/BitmapMeasureType.java | 6 +-
.../apache/kylin/metadata/model/FunctionDesc.java | 4 +
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 53 +++---
.../java/org/apache/kylin/engine/mr/IInput.java | 24 ++-
.../apache/kylin/engine/mr/JobBuilderSupport.java | 30 +--
.../kylin/engine/mr/common/BaseCuboidBuilder.java | 8 +-
.../kylin/engine/mr/common/BatchConstants.java | 8 +-
.../mr/steps/BuildGlobalHiveDictPartBuildJob.java | 45 +++--
.../steps/BuildGlobalHiveDictPartBuildMapper.java | 13 +-
...va => BuildGlobalHiveDictPartBuildReducer.java} | 20 +-
...ava => BuildGlobalHiveDictPartPartitioner.java} | 17 +-
....java => BuildGlobalHiveDictTotalBuildJob.java} | 17 +-
.../steps/BuildGlobalHiveDictTotalBuildMapper.java | 93 +++++----
.../engine/spark/SparkBatchCubingJobBuilder2.java | 49 +++--
.../localmeta/cube_desc/ci_inner_join_cube.json | 3 +-
kubernetes/README.md | 12 +-
.../kylin/source/hive/CreateMrHiveDictStep.java | 60 ++----
.../apache/kylin/source/hive/HiveInputBase.java | 210 +++++++++++----------
.../apache/kylin/source/hive/MRHiveDictUtil.java | 140 +++++++++-----
22 files changed, 476 insertions(+), 413 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f7f73ac..3429963 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.common;
@@ -162,7 +162,6 @@ public abstract class KylinConfigBase implements Serializable {
}
/**
- *
* @param propertyKeys the collection of the properties; if null will return all properties
* @return properties which contained in propertyKeys
*/
@@ -581,21 +580,28 @@ public abstract class KylinConfigBase implements Serializable {
// ============================================================================
- // mr-hive dict
+ // Hive Global dictionary by mr/hive
// ============================================================================
public String[] getMrHiveDictColumns() {
String columnStr = getMrHiveDictColumnsStr();
- if (!columnStr.equals("")) {
+ if (!StringUtils.isEmpty(columnStr)) {
return columnStr.split(",");
}
return new String[0];
}
+ /**
+ * @return the hdfs path for Hive Global dictionary table
+ */
+ public String getHiveDatabaseDir() {
+ return this.getOptional("kylin.source.hive.databasedir", "");
+ }
+
public String[] getMrHiveDictColumnsExcludeRefColumns() {
String[] excludeRefCols = null;
String[] hiveDictColumns = getMrHiveDictColumns();
Map<String, String> refCols = getMrHiveDictRefColumns();
- if(Objects.nonNull(hiveDictColumns) && hiveDictColumns.length>0) {
+ if (Objects.nonNull(hiveDictColumns) && hiveDictColumns.length > 0) {
excludeRefCols = Arrays.stream(hiveDictColumns).filter(x -> !refCols.containsKey(x)).toArray(String[]::new);
}
return excludeRefCols;
@@ -603,40 +609,40 @@ public abstract class KylinConfigBase implements Serializable {
/**
* set kylin.dictionary.mr-hive.columns in Cube level config , value are the columns which want to use MR/Hive to build global dict ,
- * Format, tableAliasName_ColumnName, multiple columns separated by commas,eg KYLIN_SALES_BUYER_ID,KYLIN_SALES_SELLER_ID
- * @return if mr-hive dict not enabled, return "";
- * else return {TABLE_NAME}_{COLUMN_NAME1},{TABLE_NAME}_{COLUMN_NAME2}"
+ * Format, tableAliasName_ColumnName, multiple columns separated by comma,eg KYLIN_SALES_BUYER_ID,KYLIN_SALES_SELLER_ID
+ *
+ * @return if mr-hive dict not enabled, return "";
+ * else return {TABLE_NAME}_{COLUMN_NAME1},{TABLE_NAME}_{COLUMN_NAME2}"
*/
private String getMrHiveDictColumnsStr() {
return getOptional("kylin.dictionary.mr-hive.columns", "");
}
/**
- * @return The global dic reduce num per column. Default 2 per column.
+ * @return The global dic reduce num per column. Default 2 per column.
*/
public Integer[] getMrHiveDictColumnsReduceNumExcludeRefCols() {
String[] excludeRefCols = getMrHiveDictColumnsExcludeRefColumns();
- if(Objects.nonNull(excludeRefCols) && excludeRefCols.length>0) {
+ if (Objects.nonNull(excludeRefCols) && excludeRefCols.length > 0) {
String[] arr = null;
Map<String, Integer> colNum = new HashMap<>();
Integer[] reduceNumArr = new Integer[excludeRefCols.length];
String[] columnReduceNum = getMrHiveDictColumnsReduceNumStr().split(",");
- //change set columnReduceNum to map struct
try {
- for(int i=0;i<columnReduceNum.length;i++){
- if(!StringUtils.isBlank(columnReduceNum[i])) {
+ for (int i = 0; i < columnReduceNum.length; i++) {
+ if (!StringUtils.isBlank(columnReduceNum[i])) {
arr = columnReduceNum[i].split(":");
colNum.put(arr[0], Integer.parseInt(arr[1]));
}
}
- }catch (Exception e){
+ } catch (Exception e) {
logger.error("set kylin.dictionary.mr-hive.columns.reduce.num error {} , the value should like colAilasName:reduceNum,colAilasName:reduceNum", getMrHiveDictColumnsReduceNumStr());
}
for (int i = 0; i < excludeRefCols.length; i++) {
- reduceNumArr[i] = colNum.containsKey(excludeRefCols[i])?colNum.get(excludeRefCols[i]): DEFAULT_MR_HIVE_GLOBAL_DICT_REDUCE_NUM_PER_COLUMN;
+ reduceNumArr[i] = colNum.containsKey(excludeRefCols[i]) ? colNum.get(excludeRefCols[i]) : DEFAULT_MR_HIVE_GLOBAL_DICT_REDUCE_NUM_PER_COLUMN;
}
Arrays.asList(reduceNumArr).stream().forEach(x -> {
@@ -646,7 +652,7 @@ public abstract class KylinConfigBase implements Serializable {
});
return reduceNumArr;
- }else {
+ } else {
return null;
}
}
@@ -654,15 +660,13 @@ public abstract class KylinConfigBase implements Serializable {
/**
* Set kylin.dictionary.mr-hive.columns.reduce.num in Cube level config , value are the reduce number for global dict columns which are set in kylin.dictionary.mr-hive.columns.
* Format, tableAliasName_ColumnName:number, multiple columns separated by commas,eg KYLIN_SALES_BUYER_ID:5,KYLIN_SALES_SELLER_ID:3
- * @return
*/
private String getMrHiveDictColumnsReduceNumStr() {
return getOptional("kylin.dictionary.mr-hive.columns.reduce.num", "");
}
/**
- * MR/Hive global domain dic (reuse dict from other cube's MR/Hive global dic column)
- * @return
+ * MR/Hive global domain dictionary (reuse dict from other cube's MR/Hive global dic column)
*/
public Map<String, String> getMrHiveDictRefColumns() {
Map<String, String> result = new HashMap<>();
@@ -670,7 +674,7 @@ public abstract class KylinConfigBase implements Serializable {
if (!StringUtils.isEmpty(columnStr)) {
String[] pairs = columnStr.split(",");
for (String pair : pairs) {
- String [] infos = pair.split(":");
+ String[] infos = pair.split(":");
result.put(infos[0], infos[1]);
}
}
@@ -685,8 +689,8 @@ public abstract class KylinConfigBase implements Serializable {
return getOptional("kylin.dictionary.mr-hive.table.suffix", "_global_dict");
}
- public String getMrHiveDictIntermediateTTableSuffix() {
- return getOptional("kylin.dictionary.mr-hive.intermediate.table.suffix", "__group_by");
+ public String getMrHiveDistinctValueTableSuffix() {
+ return getOptional("kylin.dictionary.mr-hive.intermediate.table.suffix", "__distinct_value");
}
// ============================================================================
@@ -1100,9 +1104,6 @@ public abstract class KylinConfigBase implements Serializable {
return this.getOptional("kylin.source.hive.database-for-flat-table", DEFAULT);
}
- public String getHiveDatabaseDir() {
- return this.getOptional("kylin.source.hive.databasedir", "");
- }
public String getFlatTableStorageFormat() {
return this.getOptional("kylin.source.hive.flat-table-storage-format", "SEQUENCEFILE");
@@ -2326,7 +2327,7 @@ public abstract class KylinConfigBase implements Serializable {
return getPropertiesByPrefix("kylin.metrics.");
}
- public int printSampleEventRatio(){
+ public int printSampleEventRatio() {
String val = getOptional("kylin.metrics.kafka-sample-ratio", "10000");
return Integer.parseInt(val);
}
@@ -2558,7 +2559,7 @@ public abstract class KylinConfigBase implements Serializable {
return (getOptional("kylin.stream.event.timezone", ""));
}
- public boolean isAutoResubmitDiscardJob(){
+ public boolean isAutoResubmitDiscardJob() {
return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 8d4de09..576f4bf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -84,12 +84,12 @@ public final class ExecutableConstants {
public static final String STEP_NAME_STREAMING_BUILD_BASE_CUBOID = "Build Base Cuboid Data For Streaming Job";
public static final String STEP_NAME_STREAMING_SAVE_DICTS = "Save Cube Dictionaries";
- // MR - Hive Dict
- public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Build Global Dict - extract distinct value from data";
- public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = "Build Global Dict - parallel part build";
- public static final String STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL = "Build Global Dict - parallel total build";
- public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Build Global Dict - merge to dict table";
- public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Global Dict - replace intermediate table";
+ // Hive Global Dictionary built by MR
+ public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Build Hive Global Dict - extract distinct value";
+ public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = "Build Hive Global Dict - parallel part build";
+ public static final String STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL = "Build Hive Global Dict - parallel total build";
+ public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Build Hive Global Dict - merge to dict table";
+ public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Hive Global Dict - replace intermediate table";
public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java
index 7ec21b5..2ab4313 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.measure.bitmap;
import java.util.List;
@@ -22,10 +22,7 @@ import java.util.List;
import org.apache.kylin.measure.ParamAsMeasureCount;
/**
- * BitmapIntersectDistinctCountAggFunc is an UDAF used for calculating the intersection of two or more bitmaps
- * Usage: intersect_count(columnToCount, columnToFilter, filterList)
- * Example: intersect_count(uuid, event, array['A', 'B', 'C']), meaning find the count of uuid in all A/B/C 3 bitmaps
- * requires an bitmap count distinct measure of uuid, and an dimension of event
+ *
*/
public class BitmapIntersectValueAggFunc implements ParamAsMeasureCount {
@@ -50,5 +47,4 @@ public class BitmapIntersectValueAggFunc implements ParamAsMeasureCount {
public static String result(RetentionPartialResult result) {
return result.valueResult();
}
-}
-
+}
\ No newline at end of file
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index 70a64ea..9d95584 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -112,8 +112,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
int id;
TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(0);
if (needDictionaryColumn(measureDesc.getFunction()) && dictionaryMap.containsKey(literalCol)) {
- Dictionary<String> dictionary = dictionaryMap.get(literalCol);
- id = dictionary.getIdFromValue(values[0]);
+ Dictionary<String> dictionary = dictionaryMap.get(literalCol);
+ id = dictionary.getIdFromValue(values[0]);
} else {
id = Integer.parseInt(values[0]);
}
@@ -153,6 +153,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
private boolean needDictionaryColumn(FunctionDesc functionDesc) {
DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType();
if (functionDesc.isMrDict()) {
+ // If isMrDict set to true, it means related column has been
+ // encoded in previous step by Hive Global Dictionary
return false;
}
if (dataType.isIntegerFamily() && !dataType.isBigInt()) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 93b4064..c4d002b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -88,6 +88,10 @@ public class FunctionDesc implements Serializable {
private DataType returnDataType;
private MeasureType<?> measureType;
private boolean isDimensionAsMetric = false;
+
+ /**
+ * The flag of Hive Global Dictionary for COUNT_DISTINCT
+ */
private boolean isMrDict = false;
public boolean isMrDict() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 8ec7d36..47f709d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -6,20 +6,21 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.engine.mr;
import java.util.List;
import java.util.Objects;
+
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.cuboid.CuboidUtil;
@@ -59,24 +60,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
// Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
inputSide.addStepPhase1_CreateFlatTable(result);
- // build global dict
- KylinConfig dictConfig = seg.getConfig();
- String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
-
- if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
- && !"".equals(mrHiveDictColumns[0])) {
-
- //parallel part build
- result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
-
- //parallel total build
- result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
- }
-
- //merge global dic and replace flat table
- if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){
- inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
- }
+ // Build global dictionary in distributed way
+ buildHiveGlobalDictionaryByMR(result, jobId);
// Phase 2: Build Dictionary
result.addTask(createFactDistinctColumnsStep(jobId));
@@ -106,7 +91,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
inputSide.addStepPhase4_Cleanup(result);
outputSide.addStepPhase4_Cleanup(result);
-
+
// Set the task priority if specified
result.setPriorityBasedOnPriorityOffset(priorityOffset);
result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));
@@ -216,6 +201,30 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return ndCuboidStep;
}
+ /**
+ * Build hive global dictionary by MR and encode corresponding column into integer for flat table
+ */
+ protected void buildHiveGlobalDictionaryByMR(final CubingJob result, String jobId) {
+ KylinConfig dictConfig = seg.getConfig();
+ String[] mrHiveDictColumnExcludeRef = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+ String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
+
+ if (Objects.nonNull(mrHiveDictColumnExcludeRef) && mrHiveDictColumnExcludeRef.length > 0
+ && !"".equals(mrHiveDictColumnExcludeRef[0])) {
+
+ // 1. parallel part build
+ result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
+
+ // 2. parallel total build
+ result.addTask(createBuildGlobalHiveDictTotalBuildJob(jobId));
+ }
+
+ // Merge new dictionary entry into global dictionary and replace/encode flat table
+ if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 && !"".equals(mrHiveDictColumns[0])) {
+ inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+ }
+ }
+
protected Class<? extends AbstractHadoopJob> getNDCuboidJob() {
return NDCuboidJob.class;
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
index 9fdb300..2775cb7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
@@ -24,26 +24,38 @@ import org.apache.kylin.metadata.model.ISegment;
public interface IInput {
- /** Return a helper to participate in batch cubing job flow. */
+ /**
+ * Return a helper to participate in batch cubing job flow.
+ */
public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
- /** Return a helper to participate in batch cubing merge job flow. */
+ /**
+ * Return a helper to participate in batch cubing merge job flow.
+ */
public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
public interface IBatchCubingInputSide {
- /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
+ /**
+ * Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc
+ */
public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
- /** Add step that replace flat table global column value by global dic*/
+ /**
+ * An optional step that replace/encode flat table with Hive Global Dictionary
+ */
public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow);
- /** Add step that does necessary clean up, like delete the intermediate flat table */
+ /**
+ * Add step that does necessary clean up, like delete the intermediate flat table
+ */
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
public interface IBatchMergeInputSide {
- /** Add step that executes before merge dictionary and before merge cube. */
+ /**
+ * Add step that executes before merge dictionary and before merge cube.
+ */
public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index a597279..479db86 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.engine.mr;
@@ -37,7 +37,7 @@ import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDicTotalBuildJob;
+import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictTotalBuildJob;
import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictPartBuildJob;
import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
@@ -155,7 +155,7 @@ public class JobBuilderSupport {
}
public MapReduceExecutable createCalculateStatsFromBaseCuboid(String inputPath, String outputPath,
- CuboidModeEnum cuboidMode) {
+ CuboidModeEnum cuboidMode) {
MapReduceExecutable result = new MapReduceExecutable();
result.setName(ExecutableConstants.STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID);
result.setMapReduceJobClass(CalculateStatsFromBaseCuboidJob.class);
@@ -224,10 +224,10 @@ public class JobBuilderSupport {
return result;
}
- public MapReduceExecutable createBuildGlobalHiveDicTotalBuildJob(String jobId) {
+ public MapReduceExecutable createBuildGlobalHiveDictTotalBuildJob(String jobId) {
MapReduceExecutable result = new MapReduceExecutable();
result.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL);
- result.setMapReduceJobClass(BuildGlobalHiveDicTotalBuildJob.class);
+ result.setMapReduceJobClass(BuildGlobalHiveDictTotalBuildJob.class);
StringBuilder cmd = new StringBuilder();
appendMapReduceParameters(cmd);
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
@@ -384,26 +384,26 @@ public class JobBuilderSupport {
}
public String getBuildGlobalHiveDicTotalBuildJobInputPath(String jobId) {
- return getBuildGlobalDictionaryBasePath(jobId)+"/part_sort";
+ return getBuildGlobalDictionaryBasePath(jobId) + "/part_sort";
}
public String getBuildGlobalDictionaryMaxDistinctCountPath(String jobId) {
KylinConfig conf = seg.getConfig();
String dbDir = conf.getHiveDatabaseDir();
IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
- String tableName = flatDesc.getTableName()+conf.getMrHiveDictIntermediateTTableSuffix();
- String outPut = dbDir+"/"+tableName+"/dict_column="+BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE;
+ String tableName = flatDesc.getTableName() + conf.getMrHiveDistinctValueTableSuffix();
+ String outPut = dbDir + "/" + tableName + "/dict_column=" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE;
return outPut;
}
public String getBuildGlobalDictionaryPartReduceStatsPathV2(String jobId) {
- return getBuildGlobalDictionaryBasePath(jobId)+ "/reduce_stats";
+ return getBuildGlobalDictionaryBasePath(jobId) + "/reduce_stats";
}
- public String getBuildGlobalDictionaryTotalOutput(KylinConfig config){
+ public String getBuildGlobalDictionaryTotalOutput(KylinConfig config) {
String dbDir = config.getHiveDatabaseDir();
- String tableName = EngineFactory.getJoinedFlatTableDesc(seg).getTableName()+config.getMrHiveDictTableSuffix();
- String path = dbDir+"/"+tableName;
+ String tableName = EngineFactory.getJoinedFlatTableDesc(seg).getTableName() + config.getMrHiveDictTableSuffix();
+ String path = dbDir + "/" + tableName;
return path;
}
@@ -509,7 +509,7 @@ public class JobBuilderSupport {
List<FileStatus> outputs = Lists.newArrayList();
scanFiles(input, fs, outputs);
long size = 0L;
- for (FileStatus stat: outputs) {
+ for (FileStatus stat : outputs) {
size += stat.getLen();
}
return size;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index aa377ed..baf415f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -73,7 +73,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
measureCodec = new BufferedMeasureCodec(measureDescList);
kvBuilder = new KeyValueBuilder(intermediateTableDesc);
- checkMrDictClolumn();
+ checkHiveGlobalDictionaryColumn();
}
public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment,
@@ -92,7 +92,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
measureCodec = new BufferedMeasureCodec(measureDescList);
kvBuilder = new KeyValueBuilder(intermediateTableDesc);
- checkMrDictClolumn();
+ checkHiveGlobalDictionaryColumn();
}
public byte[] buildKey(String[] flatRow) {
@@ -121,7 +121,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
}
}
- private void checkMrDictClolumn(){
+ private void checkHiveGlobalDictionaryColumn(){
Set<String> mrDictColumnSet = new HashSet<>();
if (kylinConfig.getMrHiveDictColumns() != null) {
Collections.addAll(mrDictColumnSet, kylinConfig.getMrHiveDictColumns());
@@ -133,7 +133,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
TblColRef colRef = functionDesc.getParameter().getColRefs().get(0);
if (mrDictColumnSet.contains(JoinedFlatTable.colName(colRef, true))) {
functionDesc.setMrDict(true);
- logger.info("setMrDict for {}", colRef);
+ logger.info("Enable hive global dictionary for {}", colRef);
measure.setFunction(functionDesc);
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 6031f3c..f8ab007 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.engine.mr.common;
@@ -72,7 +72,7 @@ public interface BatchConstants {
String CFG_MR_SPARK_JOB = "mr.spark.job";
String CFG_SPARK_META_URL = "spark.meta.url";
String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir";
- String CFG_GLOBAL_DICT_STATS_PARTITION_VALUE="KYLIN_MAX_DISTINCT_COUNT";
+ String CFG_GLOBAL_DICT_STATS_PARTITION_VALUE = "KYLIN_MAX_DISTINCT_COUNT";
String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
index 07b0824..c51cd11 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
@@ -19,6 +19,7 @@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
+
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
@@ -47,7 +48,7 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
- String[] dicColsArr=null;
+ String[] dicColsArr = null;
try {
options.addOption(OPTION_JOB_NAME);
@@ -78,7 +79,7 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
setJobClasspath(job, cube.getConfig());
//FileInputFormat.setInputPaths(job, input);
- setInputput(job, dicColsArr, getInputPath(config, segment));
+ setInput(job, dicColsArr, getInputPath(config, segment));
// make each reducer output to respective dir
setOutput(job, dicColsArr, getOptionValue(OPTION_OUTPUT_PATH));
@@ -94,15 +95,14 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
job.setOutputValueClass(Text.class);
job.setMapperClass(BuildGlobalHiveDictPartBuildMapper.class);
-
- job.setPartitionerClass(BuildGlobalHiveDicPartPartitioner.class);
- job.setReducerClass(BuildGlobalHiveDicPartBuildReducer.class);
+ job.setPartitionerClass(BuildGlobalHiveDictPartPartitioner.class);
+ job.setReducerClass(BuildGlobalHiveDictPartBuildReducer.class);
// prevent to create zero-sized default output
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
- //delete output
- Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH));
+ // delete output
+ Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
deletePath(job.getConfiguration(), baseOutputPath);
attachSegmentMetadataWithDict(segment, job.getConfiguration());
@@ -113,44 +113,41 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
}
}
- private void setOutput(Job job, String[] dicColsArry, String outputBase){
+ private void setOutput(Job job, String[] dicColsArr, String outputBase) {
// make each reducer output to respective dir
- //eg: /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort
- for(int i=0;i<dicColsArry.length;i++){
+ // eg: /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort
+ for (int i = 0; i < dicColsArr.length; i++) {
MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, LongWritable.class, Text.class);
}
- Path outputPath=new Path(outputBase);
+ Path outputPath = new Path(outputBase);
FileOutputFormat.setOutputPath(job, outputPath);
}
- private void setInputput(Job job, String[] dicColsArray, String inputBase) throws IOException {
- StringBuffer paths=new StringBuffer();
+ private void setInput(Job job, String[] dicColsArray, String inputBase) throws IOException {
+ StringBuffer paths = new StringBuffer();
// make each reducer output to respective dir
- for(String col:dicColsArray){
+ for (String col : dicColsArray) {
paths.append(inputBase).append("/dict_column=").append(col).append(",");
}
-
paths.delete(paths.length() - 1, paths.length());
FileInputFormat.setInputPaths(job, paths.toString());
-
}
- private void setReduceNum(Job job, KylinConfig config){
+ private void setReduceNum(Job job, KylinConfig config) {
Integer[] reduceNumArr = config.getMrHiveDictColumnsReduceNumExcludeRefCols();
int totalReduceNum = 0;
- for(Integer num:reduceNumArr){
- totalReduceNum +=num;
+ for (Integer num : reduceNumArr) {
+ totalReduceNum += num;
}
logger.info("BuildGlobalHiveDictPartBuildJob total reduce num is {}", totalReduceNum);
job.setNumReduceTasks(totalReduceNum);
}
- private String getInputPath(KylinConfig config, CubeSegment segment){
+ private String getInputPath(KylinConfig config, CubeSegment segment) {
String dbDir = config.getHiveDatabaseDir();
- String tableName = EngineFactory.getJoinedFlatTableDesc(segment).getTableName()+config.getMrHiveDictIntermediateTTableSuffix();
- String input = dbDir+"/"+tableName;
- logger.info("part build base input path:"+input);
+ String tableName = EngineFactory.getJoinedFlatTableDesc(segment).getTableName() + config.getMrHiveDistinctValueTableSuffix();
+ String input = dbDir + "/" + tableName;
+ logger.info("part build base input path:" + input);
return input;
}
-
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
index 54708f3..76c73f9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
import java.nio.ByteBuffer;
+
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -62,12 +63,12 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp
String colName = name.split("=")[1];
logger.info("this map build col name :{}", colName);
- for(int i=0;i<dicCols.length;i++){
- if(dicCols[i].equalsIgnoreCase(colName)){
- colIndex=i;
+ for (int i = 0; i < dicCols.length; i++) {
+ if (dicCols[i].equalsIgnoreCase(colName)) {
+ colIndex = i;
}
}
- if(colIndex<0 || colIndex>127){
+ if (colIndex < 0 || colIndex > 127) {
logger.error("kylin.dictionary.mr-hive.columns colIndex :{} error ", colIndex);
logger.error("kylin.dictionary.mr-hive.columns set error,mr-hive columns's count should less than 128");
}
@@ -77,7 +78,7 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp
@Override
public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
- count ++;
+ count++;
writeFieldValue(context, key.toString());
}
@@ -94,7 +95,7 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp
tmpbuf.put(valueBytes);
outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
context.write(outputKey, NullWritable.get());
- if(count<10){
+ if (count < 10) {
logger.info("colIndex:{},input key:{}", colIndex, value);
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java
similarity index 86%
rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java
index 8cdd8f1..54cd4b9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java
@@ -14,11 +14,12 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
@@ -31,11 +32,11 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongWritable, LongWritable, Text> {
+public class BuildGlobalHiveDictPartBuildReducer extends KylinReducer<Text, LongWritable, LongWritable, Text> {
- private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDicPartBuildReducer.class);
+ private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictPartBuildReducer.class);
- private Long count=0L;
+ private Long count = 0L;
private MultipleOutputs mos;
private String[] dicCols;
private String colName;
@@ -58,16 +59,16 @@ public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongW
throws IOException, InterruptedException {
count++;
byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
- if(count==1){
- colIndex = key.getBytes()[0];//col index
+ if (count == 1) {
+ colIndex = key.getBytes()[0];
colName = dicCols[colIndex];
}
- if(count<10){
+ if (count < 10) {
logger.info("key:{}, temp dict num :{}, colIndex:{}, colName:{}", key.toString(), count, colIndex, colName);
}
- mos.write(colIndex+"", new LongWritable(count), new Text(keyBytes), "part_sort/"+colIndex);
+ mos.write(colIndex + "", new LongWritable(count), new Text(keyBytes), "part_sort/" + colIndex);
}
@Override
@@ -77,7 +78,6 @@ public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongW
String partition = conf.get(MRJobConfig.TASK_PARTITION);
mos.write(colIndex + "", new LongWritable(count), new Text(partition), "reduce_stats/" + colIndex);
mos.close();
- logger.info("Reduce partition num {} finish, this reduce done item count is {}" , partition, count);
+ logger.info("Reduce partition num {} finish, this reduce done item count is {}", partition, count);
}
-
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java
similarity index 79%
rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java
index 97ad4f4..8858e20 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java
@@ -19,6 +19,7 @@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
+
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
@@ -28,7 +29,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-public class BuildGlobalHiveDicPartPartitioner extends Partitioner<Text, NullWritable> implements Configurable {
+public class BuildGlobalHiveDictPartPartitioner extends Partitioner<Text, NullWritable> implements Configurable {
private Configuration conf;
private Integer[] reduceNumArr;
@@ -49,21 +50,19 @@ public class BuildGlobalHiveDicPartPartitioner extends Partitioner<Text, NullWri
@Override
public int getPartition(Text key, NullWritable value, int numReduceTasks) {
- //get first byte, the first byte value is the dic col index ,start from 0
+ // get first byte, the first byte value is the dic col index ,start from 0
int colIndex = key.getBytes()[0];
int colReduceNum = reduceNumArr[colIndex];
int colReduceNumOffset = 0;
- for (int i=0;i<colIndex;i++){
- colReduceNumOffset += reduceNumArr[i] ;
+ for (int i = 0; i < colIndex; i++) {
+ colReduceNumOffset += reduceNumArr[i];
}
- //Calculate reduce number , reduce num = (value.hash % colReduceNum) + colReduceNumOffset
+ // Calculate reduce number , reduce num = (value.hash % colReduceNum) + colReduceNumOffset
byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
- int hashCode = new Text(keyBytes).hashCode() & 0x7FFFFFFF ;
- int reduceNo = hashCode % colReduceNum + colReduceNumOffset;
-
- return reduceNo;
+ int hashCode = new Text(keyBytes).hashCode() & 0x7FFFFFFF;
+ return hashCode % colReduceNum + colReduceNumOffset;
}
@Override
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java
similarity index 92%
rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java
index acdbb07..20bdfc7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java
@@ -19,6 +19,7 @@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
+
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -40,8 +41,8 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
- protected static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDicTotalBuildJob.class);
+public class BuildGlobalHiveDictTotalBuildJob extends AbstractHadoopJob {
+ protected static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictTotalBuildJob.class);
@Override
public int run(String[] args) throws Exception {
@@ -77,7 +78,7 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
job.getConfiguration().set("last.max.dic.value.path", getOptionValue(OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT));
job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", false);
- job.setJarByClass(BuildGlobalHiveDicTotalBuildJob.class);
+ job.setJarByClass(BuildGlobalHiveDictTotalBuildJob.class);
setJobClasspath(job, cube.getConfig());
@@ -95,8 +96,8 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
// prevent to create zero-sized default output
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
- //delete output
- Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH));
+ // delete output
+ Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
deletePath(job.getConfiguration(), baseOutputPath);
attachSegmentMetadataWithDict(segment, job.getConfiguration());
@@ -107,10 +108,10 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
}
}
- private void setOutput(Job job, String[] dicColsArry, String outputBase){
+ private void setOutput(Job job, String[] dicColsArr, String outputBase) {
// make each reducer output to respective dir
///user/prod_kylin/tmp/kylin2/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/bs_order_scene_day_new_cube_clone/dict_column=DM_ES_REPORT_ORDER_VIEW0420_DRIVER_ID/part_sort
- for(int i=0;i<dicColsArry.length;i++){
+ for (int i = 0; i < dicColsArr.length; i++) {
MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, Text.class, LongWritable.class);
}
Path outputPath = new Path(outputBase);
@@ -120,7 +121,7 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
private void setInput(Job job, String input) throws IOException {
Path path = new Path(input);
FileSystem fs = path.getFileSystem(job.getConfiguration());
- if(!fs.exists(path)){
+ if (!fs.exists(path)) {
fs.mkdirs(path);
}
FileInputFormat.setInputPaths(job, getOptionValue(OPTION_INPUT_PATH));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
index b2252c0..8af341c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
@@ -21,10 +21,12 @@ package org.apache.kylin.engine.mr.steps;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -40,7 +42,7 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMapper<KEYIN, Text, Text, LongWritable> {
+public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, KEYOUT> extends KylinMapper<KEYIN, Text, Text, LongWritable> {
private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictTotalBuildMapper.class);
private MultipleOutputs mos;
@@ -65,26 +67,26 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
String statPath = conf.get("partition.statistics.path");
- //get the input file name ,the file name format by colIndex-part-partitionNum, eg: 1-part-000019
+ // get the input file name ,the file name format by colIndex-part-partitionNum, eg: 1-part-000019
FileSplit fileSplit = (FileSplit) context.getInputSplit();
String[] arr = fileSplit.getPath().getName().split("-");
int partitionNum = Integer.parseInt(arr[2]);
colIndex = Integer.parseInt(arr[0]);
colName = cols[colIndex];
- logger.info("Input fileName:{},colIndex:{},colName:{},partitionNum:{}", fileSplit.getPath().getName(), colIndex, colName, partitionNum);
+ logger.info("Input fileName:{}, colIndex:{}, colName:{}, partitionNum:{}", fileSplit.getPath().getName(), colIndex, colName, partitionNum);
//last max dic value per column
String lastMaxValuePath = conf.get("last.max.dic.value.path");
- logger.info("last.max.dic.value.path:"+lastMaxValuePath);
+ logger.info("last.max.dic.value.path:" + lastMaxValuePath);
long lastMaxDictValue = this.getLastMaxDicValue(conf, lastMaxValuePath);
- logger.info("last.max.dic.value.path:"+lastMaxValuePath+",value="+lastMaxDictValue);
+ logger.info("last.max.dic.value.path:" + lastMaxValuePath + ",value=" + lastMaxDictValue);
- //Calculate the starting position of this file, the starting position of this file = sum (count) of all previous numbers + last max dic value of the column
- Map<Integer, TreeMap<Integer, Long>> allStats = getPartitionsCount(conf, statPath);//<colIndex,<reduceNum,count>>
- TreeMap<Integer, Long> partitionStats =allStats.get(colIndex);
- if(partitionNum!=0) {
+ // Calculate the starting position of this file, the starting position of this file = sum (count) of all previous numbers + last max dic value of the column
+ Map<Integer, TreeMap<Integer, Long>> allStats = getPartitionsCount(conf, statPath); //<colIndex,<reduceNum,count>>
+ TreeMap<Integer, Long> partitionStats = allStats.get(colIndex);
+ if (partitionNum != 0) {
SortedMap<Integer, Long> subStat = partitionStats.subMap(0, true, partitionNum, false);
- subStat.forEach((k, v)->{
+ subStat.forEach((k, v) -> {
logger.info("Split num:{} and it's count:{}", k, v);
start += v;
});
@@ -96,7 +98,7 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
@Override
public void doMap(KEYIN key, Text record, Context context) throws IOException, InterruptedException {
long inkey = Long.parseLong(key.toString());
- mos.write(colIndex+"", record, new LongWritable(start + inkey), "dict_column="+colName+"/"+colIndex);
+ mos.write(colIndex + "", record, new LongWritable(start + inkey), "dict_column=" + colName + "/" + colIndex);
}
@Override
@@ -106,7 +108,7 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
private Map<Integer, TreeMap<Integer, Long>> getPartitionsCount(Configuration conf, String partitionStatPath) throws IOException {
StringBuffer sb = new StringBuffer();
- String temp=null;
+ String temp = null;
String[] fileNameArr = null;
String[] statsArr = null;
@@ -121,14 +123,14 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
if (fs.exists(path) && fs.isDirectory(path)) {
for (FileStatus status : fs.listStatus(path)) {
//fileNameArr[0] is globaldict colIndex
- fileNameArr=status.getPath().getName().split("-");
+ fileNameArr = status.getPath().getName().split("-");
colStats = allStats.get(Integer.parseInt(fileNameArr[0]));
- if(colStats==null){
+ if (colStats == null) {
colStats = new TreeMap<>();
}
- temp=cat(status.getPath(), fs);
+ temp = cat(status.getPath(), fs);
logger.info("partitionStatPath:{},content:{}", partitionStatPath, temp);
- if(temp!=null){
+ if (temp != null) {
statsArr = temp.split("\t");
colStats.put(Integer.parseInt(statsArr[1]), Long.parseLong(statsArr[0]));
allStats.put(Integer.parseInt(fileNameArr[0]), colStats);
@@ -136,8 +138,8 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
}
}
- allStats.forEach((k, v)->{
- v.forEach((k1, v1)->{
+ allStats.forEach((k, v) -> {
+ v.forEach((k1, v1) -> {
logger.info("allStats.colIndex:{},this split num:{},this split num's count:{}", k, k1, v1);
});
});
@@ -148,21 +150,21 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
private String cat(Path remotePath, FileSystem fs) throws IOException {
FSDataInputStream in = null;
BufferedReader buffer = null;
- StringBuffer stat= new StringBuffer();
+ StringBuffer stat = new StringBuffer();
try {
- in= fs.open(remotePath);
- buffer= new BufferedReader(new InputStreamReader(in, "UTF-8")) ;
+ in = fs.open(remotePath);
+ buffer = new BufferedReader(new InputStreamReader(in, "UTF-8"));
String line = null;
while ((line = buffer.readLine()) != null) {
stat.append(line);
}
} catch (IOException e) {
e.printStackTrace();
- }finally {
- if(buffer!=null) {
+ } finally {
+ if (buffer != null) {
buffer.close();
}
- if(in!=null) {
+ if (in != null) {
in.close();
}
}
@@ -170,15 +172,12 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
}
/**
- *
- * @param conf
* @param lastMaxDicValuePath eg: /user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000
- * remotePath content is dict colum stats info of per column: dic column name,extract distinct value count,last max dic value
+ * remotePath content is dict column stats info of per column: dic column name,extract distinct value count,last max dic value
* @return this colIndex's last max dic value
- * @throws IOException
*/
private long getLastMaxDicValue(Configuration conf, String lastMaxDicValuePath) throws IOException {
- StringBuffer sb=new StringBuffer();
+ StringBuffer sb = new StringBuffer();
Map<Integer, Long> map = null;
Path path = new Path(lastMaxDicValuePath);
FileSystem fs = path.getFileSystem(conf);
@@ -187,39 +186,35 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
logger.info("start buildMaxCountMap :");
map = buildMaxCountMap(status.getPath(), fs);
logger.info("end buildMaxCountMap :");
-
}
}
- if(map == null){
+ if (map == null) {
return 0L;
- }else{
- return map.get(colIndex)==null?0L:map.get(colIndex);
+ } else {
+ return map.get(colIndex) == null ? 0L : map.get(colIndex);
}
}
/**
- *
* @param remotePath , eg: /user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000
- * remotePath content is dict colum stats info of per column: dic column name,extract distinct value count,last max dic value
- * @param fs
+ * remotePath content is dict column stats info of per column: dic column name,extract distinct value count,last max dic value
* @return Map<>,key is colIndex, value is last max dict value
- * @throws IOException
*/
- private Map<Integer, Long> buildMaxCountMap(Path remotePath, FileSystem fs) throws IOException {
+ private Map<Integer, Long> buildMaxCountMap(Path remotePath, FileSystem fs) throws IOException {
FSDataInputStream in = null;
BufferedReader buffer = null;
- String[] arr=null;
- Map<Integer, Long> map= new HashMap();
+ String[] arr = null;
+ Map<Integer, Long> map = new HashMap<>();
try {
- in= fs.open(remotePath);
- buffer= new BufferedReader(new InputStreamReader(in, "UTF-8")) ;
+ in = fs.open(remotePath);
+ buffer = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
String line = null;
while ((line = buffer.readLine()) != null) {
arr = line.split(",");
- logger.info("line="+line+",arr.length:"+arr.length);
- if(arr.length==3) {
+ logger.info("line=" + line + ",arr.length:" + arr.length);
+ if (arr.length == 3) {
for (int i = 0; i < cols.length; i++) {
- if(cols[i].equalsIgnoreCase(arr[0])) {
+ if (cols[i].equalsIgnoreCase(arr[0])) {
map.put(i, Long.parseLong(arr[2]));
logger.info("col.{}.maxValue={}", cols[i], Long.parseLong(arr[2]));
break;
@@ -229,15 +224,15 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
}
} catch (IOException e) {
e.printStackTrace();
- }finally {
- if(buffer!=null) {
+ } finally {
+ if (buffer != null) {
buffer.close();
}
- if(in!=null) {
+ if (in != null) {
in.close();
}
}
- logger.info("BuildMaxCountMap map="+map);
+ logger.info("BuildMaxCountMap map=" + map);
return map;
}
}
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 9309a3d..7d6a367 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -14,7 +14,7 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.engine.spark;
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
+ *
*/
public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
@@ -48,7 +49,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
this(newSegment, submitter, 0);
}
-
+
public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) {
super(newSegment, submitter, priorityOffset);
this.inputSide = SparkUtil.getBatchCubingInputSide(seg);
@@ -65,22 +66,8 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
// Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
inputSide.addStepPhase1_CreateFlatTable(result);
- // build global dict
- KylinConfig dictConfig = seg.getConfig();
- String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
-
- if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
- && !"".equals(mrHiveDictColumns[0])) {
- //parallel part build
- result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
- //parallel total build
- result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
- }
-
- // merge global dic and replace flat table
- if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){
- inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
- }
+ // Build global dictionary in distributed way
+ buildHiveGlobalDictionaryByMR(result, jobId);
// Phase 2: Build Dictionary
if (seg.getConfig().isSparkFactDistinctEnable()) {
@@ -202,7 +189,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
public void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable,
- final String jobId, final String cuboidRootPath) {
+ final String jobId, final String cuboidRootPath) {
final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg);
final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId));
sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
@@ -228,4 +215,28 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
param.put("path", getDumpMetadataPath(jobId));
return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString();
}
+
+ /**
+ * Build hive global dictionary by MR and encode corresponding column into integer for flat table
+ */
+ protected void buildHiveGlobalDictionaryByMR(final CubingJob result, String jobId) {
+ KylinConfig dictConfig = seg.getConfig();
+ String[] mrHiveDictColumnExcludeRef = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+ String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
+
+ if (Objects.nonNull(mrHiveDictColumnExcludeRef) && mrHiveDictColumnExcludeRef.length > 0
+ && !"".equals(mrHiveDictColumnExcludeRef[0])) {
+
+ // 1. parallel part build
+ result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
+
+ // 2. parallel total build
+ result.addTask(createBuildGlobalHiveDictTotalBuildJob(jobId));
+ }
+
+ // merge global dic and replace flat table
+ if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 && !"".equals(mrHiveDictColumns[0])) {
+ inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+ }
+ }
}
diff --git a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
index fce237e..763cf90 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
@@ -595,7 +595,8 @@
"override_kylin_properties": {
"kylin.cube.algorithm": "LAYER",
"kylin.dictionary.shrunken-from-global-enabled": "false",
- "kylin.dictionary.mr-hive.columns": "TEST_KYLIN_FACT_TEST_COUNT_DISTINCT_BITMAP"
+ "kylin.dictionary.mr-hive.columns": "TEST_KYLIN_FACT_TEST_COUNT_DISTINCT_BITMAP",
+ "kylin.source.hive.databasedir" : "/apps/hive/warehouse"
},
"partition_date_start": 0
}
diff --git a/kubernetes/README.md b/kubernetes/README.md
index 964c612..2d0ecc6 100644
--- a/kubernetes/README.md
+++ b/kubernetes/README.md
@@ -1,4 +1,4 @@
-## Backgroud
+## Background
Kubernetes is a portable, extensible, open-source platform for managing containerized workloads and services, that facilitates
both declarative configuration and automation. It has a large, rapidly growing ecosystem. Kubernetes services, support,
and tools are widely available.
@@ -11,7 +11,7 @@ cluster, will reduce cost of maintenance and extension.
Please update your configuration file here.
- **template**
This directory provided two deployment templates, one for **quick-start** purpose, another for **production/distributed** deployment.
- 1. Quick-start template is for one node deployment with an **ALL** kylin instance.
+ 1. Quick-start template is for single-node deployment with an **ALL** kylin instance, intended for testing or PoC purposes.
2. Production template is for multi-nodes deployment with a few of **job**/**query** kylin instances; and some other service
like **memcached** and **filebeat** will help to satisfy log collection/query cache/session sharing demand.
- **docker**
@@ -21,16 +21,16 @@ cluster, will reduce cost of maintenance and extension.
This is a complete example by applying production template in a CDH 5.7 hadoop env with step by step guide.
### Note
-1. CuratorScheduler is used as default JobScheduler because it is more flexible.
+1. **CuratorScheduler** is used as default JobScheduler because it is more flexible.
2. Spark building require use `cluster` as deployMode. If you forget it, your spark application will never submitted successfully because Hadoop cluster can not resolve hostname of Pod (Spark Driver).
3. To modify `/etc/hosts` in Pod, please check this : https://kubernetes.io/docs/concepts/services-networking/add-entries-to-pod-etc-hosts-with-host-aliases/ .
-4. To build you own kylin-client docker image, please don't forget to download and put following jars into KYLIN_HOME/tomcat/lib to enable tomcat session sharing.
+4. To build your own kylin-client Docker image, please don't forget to download the following jars and put them into `KYLIN_HOME/tomcat/lib` to enable Tomcat session sharing.
- https://repo1.maven.org/maven2/de/javakaffee/msm/memcached-session-manager-tc7/2.1.1/
- https://repo1.maven.org/maven2/de/javakaffee/msm/memcached-session-manager/2.1.1/
5. If you have difficulty in configure filebeat, please check this https://www.elastic.co/guide/en/beats/filebeat/current/index.html .
6. External query cache is enabled by default, if you are interested in detail, you may check http://kylin.apache.org/blog/2019/07/30/detailed-analysis-of-refine-query-cache/ .
-7. All configuration files is separated from Docker image, please use configMap or secret. Compared to configMap, secrets is more recommended for security reason.
-8. Some verified kylin-client image will be published to DockerHub, here is the link https://hub.docker.com/r/apachekylin/kylin-client . You may consider contributed your Dockerfile to kylin's repo if you are interested.
+7. All configuration files are separated from the Docker image; please use **configMap** or **secret**. Compared to **configMap**, **secret** is recommended for security reasons.
+8. Some verified kylin-client images will be published to DockerHub; here is the link https://hub.docker.com/r/apachekylin/kylin-client . If you are interested, you may consider contributing your `Dockerfile` to Kylin's repo.
### Reference
- JIRA ticket: https://issues.apache.org/jira/browse/KYLIN-4447
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
index 305cdae..8538622 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
@@ -58,8 +58,9 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
private static final String GET_SQL = "\" Get Max Dict Value Sql : \"";
protected void createMrHiveDict(KylinConfig config, DistributedLock lock) throws Exception {
- logger.info("start to run createMrHiveDict {}", getId());
+ logger.info("Start to run createMrHiveDict {}", getId());
try {
+ // Step 1: Apply for lock if required
if (getIsLock()) {
getLock(lock);
}
@@ -72,12 +73,13 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
if (sql != null && sql.length() > 0) {
hiveCmdBuilder.addStatement(sql);
}
- Map<String, String> maxDictValMap = deserilizeForMap(getMaxDictStatementMap());
- Map<String, String> dictSqlMap = deserilizeForMap(getCreateTableStatementMap());
+ Map<String, String> maxDictValMap = deserializeForMap(getMaxDictStatementMap());
+ Map<String, String> dictSqlMap = deserializeForMap(getCreateTableStatementMap());
- if (dictSqlMap != null && dictSqlMap.size() > 0) {
+ // Step 2: Execute HQL
+ if (!dictSqlMap.isEmpty()) {
IHiveClient hiveClient = HiveClientFactory.getHiveClient();
- if (maxDictValMap != null && maxDictValMap.size() > 0) {
+ if (!maxDictValMap.isEmpty()) {
if (maxDictValMap.size() == dictSqlMap.size()) {
maxDictValMap.forEach((columnName, maxDictValSql) -> {
int max = 0;
@@ -111,7 +113,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
final String cmd = hiveCmdBuilder.toString();
- stepLogger.log("MR/Hive dict, cmd: " + cmd);
+ stepLogger.log("Build Hive Global Dictionary by: " + cmd);
CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
CubeInstance cube = manager.getCube(getCubeName());
@@ -123,9 +125,9 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
if (response.getFirst() != 0) {
throw new RuntimeException("Failed to create MR/Hive dict, error code " + response.getFirst());
}
- getManager().addJobInfo(getId(), stepLogger.getInfo());
}
+ // Step 3: Release lock if required
if (getIsUnlock()) {
unLock(lock);
}
@@ -153,20 +155,10 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
}
- String preHdfsShell = getPreHdfsShell();
- if (Objects.nonNull(preHdfsShell) && !"".equalsIgnoreCase(preHdfsShell)) {
- doRetry(preHdfsShell, config);
- }
-
createMrHiveDict(config, lock);
- String postfixHdfsCmd = getPostfixHdfsShell();
- if (Objects.nonNull(postfixHdfsCmd) && !"".equalsIgnoreCase(postfixHdfsCmd)) {
- doRetry(postfixHdfsCmd, config);
- }
-
if (isDiscarded()) {
- if (getIsLock()) {
+ if (getIsLock() && lock != null) {
unLock(lock);
}
return new ExecuteResult(ExecuteResult.State.DISCARDED, stepLogger.getBufferedLog());
@@ -224,44 +216,28 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
}
public void setCreateTableStatementMap(Map<String, String> dictSqlMap) {
- setParam("HiveRedistributeDataMap", serilizeToMap(dictSqlMap));
+ setParam("DictSqlMap", serializeMap(dictSqlMap)); // column -> HQL map stored as a JSON array string; this param key was "HiveRedistributeDataMap" before this change
}
public String getCreateTableStatementMap() {
- return getParam("HiveRedistributeDataMap");
+ return getParam("DictSqlMap") != null ? getParam("DictSqlMap") : getParam("HiveRedistributeDataMap"); // fall back to the pre-rename key so jobs persisted before this change still find their dict HQL instead of silently skipping it
}
public void setMaxDictStatementMap(Map<String, String> maxDictValMap) {
- setParam("DictMaxMap", serilizeToMap(maxDictValMap));
+ setParam("DictMaxMap", serializeMap(maxDictValMap)); // column -> max-dict-value SQL, stored as a JSON array string (same encoding as DictSqlMap)
}
public String getMaxDictStatementMap() {
return getParam("DictMaxMap");
}
- public String getPreHdfsShell() {
- return getParam("preHdfsCmd");
- }
-
- public void setPrefixHdfsShell(String cmd) {
- setParam("preHdfsCmd", cmd);
- }
-
- public String getPostfixHdfsShell() {
- return getParam("postfixHdfsCmd");
- }
-
- public void setPostfixHdfsShell(String cmd) {
- setParam("postfixHdfsCmd", cmd);
- }
-
public void setIsLock(Boolean isLock) {
setParam("isLock", String.valueOf(isLock));
}
public boolean getIsLock() {
String isLock = getParam("isLock");
- return Strings.isNullOrEmpty(isLock) ? false : Boolean.parseBoolean(isLock);
+ return Boolean.parseBoolean(isLock); // parseBoolean already yields false for a missing (null) or empty param
}
public void setJobFlowJobId(String jobId) {
@@ -278,7 +254,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
public boolean getIsUnlock() {
String isUnLock = getParam("isUnLock");
- return Strings.isNullOrEmpty(isUnLock) ? false : Boolean.parseBoolean(isUnLock);
+ return Boolean.parseBoolean(isUnLock); // parseBoolean already yields false for a missing (null) or empty param
}
public void setLockPathName(String pathName) {
@@ -368,7 +344,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
}
}
}
- isLocked = true;//get lock fail,will try again
+ isLocked = true; //get lock fail,will try again
}
}
// wait 1 min and try again
@@ -402,7 +378,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
}
}
- private static String serilizeToMap(Map<String, String> map) {
+ private static String serializeMap(Map<String, String> map) {
JSONArray result = new JSONArray();
if (map != null && map.size() > 0) {
map.forEach((key, value) -> {
@@ -418,7 +394,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
return result.toString();
}
- private static Map<String, String> deserilizeForMap(String mapStr) {
+ private static Map<String, String> deserializeForMap(String mapStr) {
Map<String, String> result = new HashMap<>();
if (mapStr != null) {
try {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 49e3f8d..c60a2ce 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -6,15 +6,15 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
-*/
+ */
package org.apache.kylin.source.hive;
@@ -43,7 +43,6 @@ import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.IInput;
import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.spark.SparkCreatingFlatTable;
import org.apache.kylin.engine.spark.SparkExecutable;
@@ -97,17 +96,12 @@ public class HiveInputBase {
// create flat table first
addStepPhase1_DoCreateFlatTable(jobFlow);
- // create global dict
- KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
+ // create hive global dictionary
+ KylinConfig dictConfig = flatDesc.getSegment().getConfig();
String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
&& !"".equals(mrHiveDictColumns[0])) {
- String globalDictDatabase = dictConfig.getMrHiveDictDB();
- if (null == globalDictDatabase) {
- throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
- }
- String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
- addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable);
+ addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns);
}
// then count and redistribute
@@ -129,80 +123,75 @@ public class HiveInputBase {
@Override
public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) {
- KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
+ KylinConfig dictConfig = flatDesc.getSegment().getConfig();
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
+ String globalDictTable = MRHiveDictUtil.globalDictTableName(flatDesc, cubeName);
+ String globalDictDatabase = dictConfig.getMrHiveDictDB();
+
String[] mrHiveDictColumnsExcludeRefCols = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
Map<String, String> dictRef = dictConfig.getMrHiveDictRefColumns();
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- String globalDictDatabase = dictConfig.getMrHiveDictDB();
- if (null == globalDictDatabase) {
- throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
- }
- String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix();
- if(Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && mrHiveDictColumnsExcludeRefCols.length > 0) {
- //merge to dict table step
+ if (Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && mrHiveDictColumnsExcludeRefCols.length > 0) {
jobFlow.addTask(createHiveGlobalDictMergeGlobalDict(flatDesc, hiveInitStatements, cubeName, mrHiveDictColumnsExcludeRefCols, globalDictDatabase, globalDictTable));
-
for (String item : mrHiveDictColumnsExcludeRefCols) {
dictRef.put(item, "");
}
}
- //replace step
- if(dictRef.size()>0) {
+ // replace step: encode every dictRef column in the flat table (reused ref columns, plus this cube's own columns added above with an empty ref). NOTE(review): this mutates the map returned by getMrHiveDictRefColumns(); assumes it is a fresh copy - confirm
+ if (!dictRef.isEmpty()) {
jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, hiveInitStatements, cubeName,
dictRef, flatTableDatabase, globalDictDatabase, globalDictTable, dictConfig.getMrHiveDictTableSuffix(), jobFlow.getId()));
}
-
}
- protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow,
- String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) {
+ /**
+ * Adds one step which: 1. creates the three related tables (global dictionary, distinct value, segment level dictionary);
+ * 2. inserts the distinct values of each dict column that are not yet in the global dictionary into the distinct value table;
+ * 3. calculates statistics (segment distinct count and previous max dict id) for each dict column.
+ */
+ protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow, String[] mrHiveDictColumns) {
final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
- //Crete tables for global dict and extract distinct value
jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, hiveInitStatements, cubeName,
- mrHiveDictColumns, globalDictDatabase, globalDictTable, jobFlow.getId()));
+ mrHiveDictColumns, jobFlow.getId()));
}
- protected static AbstractExecutable createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc,
- String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
- String globalDictDatabase, String globalDictTable, String jobId) {
- // Firstly, determine if the global dict hive table of cube is exists.
- String createGlobalDictTableHql = "CREATE TABLE IF NOT EXISTS " + globalDictDatabase + "." + globalDictTable
- + "\n" + "( dict_key STRING COMMENT '', \n" + "dict_val INT COMMENT '' \n" + ") \n"
- + "COMMENT '' \n" + "PARTITIONED BY (dict_column string) \n" + " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n" + "STORED AS TEXTFILE; \n";
-
- final String dropDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(flatDesc);
- final String createDictIntermediateTableHql = MRHiveDictUtil.generateCreateTableStatement(flatDesc);
- final String groupByTable = flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
- final String globalDictIntermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
- final String dropGlobalDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(globalDictIntermediateTable);
- final String createGlobalDictIntermediateTableHql = MRHiveDictUtil.generateCreateGlobalDicIntermediateTableStatement(globalDictIntermediateTable);
-
- String maxAndDistinctCountSql = "INSERT OVERWRITE TABLE " + groupByTable + " PARTITION (DICT_COLUMN = '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') "
- + "\n" + "SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) "
- + "\n" + "FROM ("
- + "\n" + " SELECT dict_column,count(1) total_distinct_val FROM "
- + "\n" + groupByTable + " where DICT_COLUMN != '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "' group by dict_column) tc "
- + "\n" + "LEFT JOIN (\n"
- + "\n" + " SELECT dict_column,if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val FROM "
- + "\n" + globalDictDatabase + "." + globalDictTable + " group by dict_column) tm "
- + "\n" + "ON tc.dict_column = tm.dict_column;";
+ protected static AbstractExecutable createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc, String hiveInitStatements,
+ String cubeName, String[] mrHiveDictColumns, String jobId) {
+ KylinConfig cfg = flatDesc.getSegment().getConfig();
+ String globalDictTable = MRHiveDictUtil.globalDictTableName(flatDesc, cubeName);
+ String globalDictDatabase = cfg.getMrHiveDictDB();
+ final String distinctValueTable = MRHiveDictUtil.distinctValueTable(flatDesc);
+ final String segmentLevelDictTableName = MRHiveDictUtil.segmentLevelDictTableName(flatDesc);
+
+ final String createGlobalDictTableHql = MRHiveDictUtil.generateDictionaryDdl(globalDictDatabase, globalDictTable);
+ final String dropDistinctValueTableHql = MRHiveDictUtil.generateDropTableStatement(distinctValueTable);
+ final String createDistinctValueTableHql = MRHiveDictUtil.generateDistinctValueTableStatement(flatDesc);
+ final String dropSegmentLevelDictTableHql = MRHiveDictUtil.generateDropTableStatement(segmentLevelDictTableName);
+ final String createSegmentLevelDictTableHql = MRHiveDictUtil.generateDictTableStatement(segmentLevelDictTableName);
+
+ String maxAndDistinctCountSql = MRHiveDictUtil.generateDictStatisticsSql(distinctValueTable, globalDictTable, globalDictDatabase);
StringBuilder insertDataToDictIntermediateTableSql = new StringBuilder();
for (String dictColumn : mrHiveDictColumns) {
insertDataToDictIntermediateTableSql
.append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn, globalDictDatabase, globalDictTable));
}
- String set = "set hive.exec.compress.output=false;set hive.mapred.mode=unstrict;";
+ String setParametersHql = "set hive.exec.compress.output=false;set hive.mapred.mode=unstrict;";
CreateMrHiveDictStep step = new CreateMrHiveDictStep();
step.setInitStatement(hiveInitStatements);
- step.setCreateTableStatement(set + createGlobalDictTableHql + dropDictIntermediateTableHql
- + createDictIntermediateTableHql + dropGlobalDictIntermediateTableHql + createGlobalDictIntermediateTableHql + insertDataToDictIntermediateTableSql.toString() + maxAndDistinctCountSql);
+ step.setCreateTableStatement(setParametersHql
+ + createGlobalDictTableHql
+ + dropDistinctValueTableHql
+ + createDistinctValueTableHql
+ + dropSegmentLevelDictTableHql
+ + createSegmentLevelDictTableHql
+ + insertDataToDictIntermediateTableSql.toString()
+ + maxAndDistinctCountSql);
CubingExecutableUtil.setCubeName(cubeName, step.getParams());
step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL);
step.setIsLock(true);
@@ -212,20 +201,27 @@ public class HiveInputBase {
return step;
}
+ /**
+ * In the previous step, the data of the Hive Global Dictionary is prepared by MR,
+ * so now it is time to add partitions to the Segment Level Dictionary Table
+ * and merge them into the Hive Global Dictionary Table.
+ */
protected static AbstractExecutable createHiveGlobalDictMergeGlobalDict(IJoinedFlatTableDesc flatDesc,
String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
String globalDictDatabase, String globalDictTable) {
- String globalDictItermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
+ String globalDictIntermediateTable = MRHiveDictUtil.segmentLevelDictTableName(flatDesc);
+ StringBuilder addPartitionHql = new StringBuilder();
- StringBuffer addPartition = new StringBuffer();
- Map<String, String> maxDictValMap = new HashMap<>();
Map<String, String> dictHqlMap = new HashMap<>();
for (String dictColumn : mrHiveDictColumns) {
try {
- addPartition.append("alter table ").append(globalDictItermediateTable)
- .append(" add IF NOT EXISTS partition (dict_column='").append(dictColumn)
- .append("');").append(" \n");
+ addPartitionHql.append("ALTER TABLE ")
+ .append(globalDictIntermediateTable)
+ .append(" ADD IF NOT EXISTS PARTITION (dict_column='")
+ .append(dictColumn)
+ .append("');")
+ .append(" \n");
String dictHql = "INSERT OVERWRITE TABLE " + globalDictDatabase + "." + globalDictTable + " \n"
+ "PARTITION (dict_column = '" + dictColumn + "') \n"
@@ -233,7 +229,7 @@ public class HiveInputBase {
+ globalDictDatabase + "." + globalDictTable + " \n" + "WHERE dict_column = '" + dictColumn
+ "' \n" + flatDesc.getDataModel().getConfig().getHiveUnionStyle() + " \n"
+ "SELECT dict_key, dict_val FROM "
- + globalDictItermediateTable + " \n" + " WHERE dict_column = '" + dictColumn + "' ;\n";
+ + globalDictIntermediateTable + " \n" + " WHERE dict_column = '" + dictColumn + "' ;\n";
dictHqlMap.put(dictColumn, dictHql);
} catch (Exception e) {
logger.error("", e);
@@ -241,9 +237,8 @@ public class HiveInputBase {
}
String hiveInitStatementForUnstrict = "set hive.mapred.mode=unstrict;";
CreateMrHiveDictStep step = new CreateMrHiveDictStep();
- step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict + addPartition);
+ step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict + addPartitionHql);
step.setCreateTableStatementMap(dictHqlMap);
- step.setMaxDictStatementMap(maxDictValMap);
step.setIsLock(false);
step.setIsUnLock(false);
step.setLockPathName(cubeName);
@@ -252,58 +247,75 @@ public class HiveInputBase {
return step;
}
+ /**
+ * Use Hive Global Dictionary to replace/encode flat table
+ *
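+ * @param mrHiveDictColumns a Map whose key is the flat table dict column name and whose value is the referenced {@code TABLE.COLUMN} of another cube's dictionary, or an empty string to use this cube's own global dictionary.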
+ */
protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc, String hiveInitStatements, String cubeName, Map<String, String> mrHiveDictColumns, String flatTableDatabase, String globalDictDatabase, String globalDictTable, String dictSuffix, String jobId) {
Map<String, String> dictHqlMap = new HashMap<>();
- StringBuilder addPartition = new StringBuilder();
for (String dictColumn : mrHiveDictColumns.keySet()) {
- StringBuilder dictHql = new StringBuilder();
+ StringBuilder insertOverwriteHql = new StringBuilder();
TblColRef dictColumnRef = null;
String flatTable = flatTableDatabase + "." + flatDesc.getTableName();
- // replace the flat table's dict column value
- dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n");
+ insertOverwriteHql.append("INSERT OVERWRITE TABLE ").append(flatTable).append(" \n");
try {
- dictHql.append("SELECT \n");
- Integer flatTableColumnSize = flatDesc.getAllColumns().size();
+ insertOverwriteHql.append("SELECT \n");
+ int flatTableColumnSize = flatDesc.getAllColumns().size();
for (int i = 0; i < flatTableColumnSize; i++) {
TblColRef tblColRef = flatDesc.getAllColumns().get(i);
+ String colName = JoinedFlatTable.colName(tblColRef, flatDesc.useAlias());
+
if (i > 0) {
- dictHql.append(",");
+ insertOverwriteHql.append(",");
}
- if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
- dictHql.append("b.dict_val \n");
+
+ if (colName.equalsIgnoreCase(dictColumn)) {
+ // Note: replace original value into encoded integer
+ insertOverwriteHql.append("b.dict_val \n");
dictColumnRef = tblColRef;
} else {
- dictHql.append("a." + JoinedFlatTable.colName(tblColRef) + " \n");
+ // Note: keep its original value
+ insertOverwriteHql.append("a.")
+ .append(JoinedFlatTable.colName(tblColRef))
+ .append(" \n");
}
}
if (!Strings.isNullOrEmpty(mrHiveDictColumns.get(dictColumn))) {
- String[] cubePartion = mrHiveDictColumns.get(dictColumn).split("\\.");
-
- String refGlobalDictTable = cubePartion[0] + dictSuffix;
- String refDictColumn = cubePartion[1];
-
- dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n"
- + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + refGlobalDictTable
- + " WHERE dict_column = '" + refDictColumn + "' \n" + ") b \n" + " ON a."
- + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;");
- dictHqlMap.put(dictColumn, dictHql.toString());
- }else {
- dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n"
- + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable
- + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n" + " ON a."
- + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;");
+ // Note: reuse previous hive global dictionary
+ String[] tableColumn = mrHiveDictColumns.get(dictColumn).split("\\.");
+
+ String refGlobalDictTable = tableColumn[0] + dictSuffix;
+ String refDictColumn = tableColumn[1];
+
+ insertOverwriteHql
+ .append("FROM ").append(flatTable).append(" a \nLEFT OUTER JOIN \n (")
+ .append("SELECT dict_key, dict_val FROM ")
+ .append(globalDictDatabase).append(".").append(refGlobalDictTable)
+ .append(" WHERE dict_column = '").append(refDictColumn).append("') b \n")
+ .append("ON a.").append(JoinedFlatTable.colName(dictColumnRef)).append(" = b.dict_key;");
+ dictHqlMap.put(dictColumn, insertOverwriteHql.toString());
+ } else {
+ // Note: use hive global dictionary built by current cube
+ insertOverwriteHql
+ .append("FROM ").append(flatTable).append(" a \nLEFT OUTER JOIN \n (")
+ .append("SELECT dict_key, dict_val FROM ")
+ .append(globalDictDatabase).append(".").append(globalDictTable)
+ .append(" WHERE dict_column = '").append(dictColumn).append("') b \n")
+ .append("ON a.").append(JoinedFlatTable.colName(dictColumnRef)).append(" = b.dict_key;");
}
- dictHqlMap.put(dictColumn, dictHql.toString());
+ dictHqlMap.put(dictColumn, insertOverwriteHql.toString());
} catch (Exception e) {
logger.error("", e);
}
}
- String set = "set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;";
+ String setParameterHal = "set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;";
CreateMrHiveDictStep step = new CreateMrHiveDictStep();
- step.setInitStatement(hiveInitStatements + set + addPartition);
+ step.setInitStatement(hiveInitStatements + setParameterHal);
step.setCreateTableStatementMap(dictHqlMap);
+
step.setIsUnLock(true);
step.setLockPathName(cubeName);
step.setJobFlowJobId(jobId);
@@ -389,7 +401,7 @@ public class HiveInputBase {
}
protected static AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir,
- String cubeName, IJoinedFlatTableDesc flatDesc) {
+ String cubeName, IJoinedFlatTableDesc flatDesc) {
//from hive to hive
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
@@ -404,7 +416,7 @@ public class HiveInputBase {
}
protected static AbstractExecutable createFlatHiveTableByLivyStep(String hiveInitStatements, String jobWorkingDir,
- String cubeName, IJoinedFlatTableDesc flatDesc) {
+ String cubeName, IJoinedFlatTableDesc flatDesc) {
//from hive to hive
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
@@ -419,7 +431,7 @@ public class HiveInputBase {
}
protected static AbstractExecutable createFlatHiveTableBySparkSql(String hiveInitStatements,
- String jobWorkingDir, String cubeName, IJoinedFlatTableDesc flatDesc) {
+ String jobWorkingDir, String cubeName, IJoinedFlatTableDesc flatDesc) {
final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc);
final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc,
jobWorkingDir);
@@ -472,7 +484,7 @@ public class HiveInputBase {
}
protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
- IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
+ IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
RedistributeFlatHiveTableStep step = new RedistributeFlatHiveTableStep();
step.setInitStatement(hiveInitStatements);
step.setIntermediateTable(flatDesc.getTableName());
@@ -483,7 +495,7 @@ public class HiveInputBase {
}
protected static AbstractExecutable createRedistributeFlatHiveTableByLivyStep(String hiveInitStatements,
- String cubeName, IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
+ String cubeName, IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
RedistributeFlatHiveTableByLivyStep step = new RedistributeFlatHiveTableByLivyStep();
step.setInitStatement(hiveInitStatements);
step.setIntermediateTable(flatDesc.getTableName());
@@ -493,8 +505,8 @@ public class HiveInputBase {
return step;
}
- protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements,
- String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables, String uuid) {
+ protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, String jobWorkingDir, IJoinedFlatTableDesc flatDesc,
+ List<String> intermediateTables, String uuid) {
ShellExecutable step = new ShellExecutable();
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index 85cd855..573ecd3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -27,6 +27,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.livy.LivyRestBuilder;
import org.apache.kylin.common.livy.LivyRestExecutor;
import org.apache.kylin.common.livy.LivyTypeEnum;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.job.JoinedFlatTable;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -41,6 +42,30 @@ import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+/**
+ * <pre>
+ * Hold some constant/enum/statement for Hive Global Dictionary.
+ *
+ * There are two different temporary tables which help to build Hive Global Dictionary.
+ * They should be deleted at the final step of building job.
+ * 1. Distinct Value Table (Temporary table)
+ * TableName: ${FlatTable}_${DistinctValueSuffix}
+ * Schema: One normal column, for original column value; with another partition column.
+ * @see #distinctValueTable
+ *
+ * 2. Segment Level Dictionary Table (Temporary table)
+ * TableName: ${FlatTable}_${DictTableSuffix}
+ * Schema: Two normal columns, the first for the original column value, the second for its encoded integer;
+ * also with another partition column
+ * @see #segmentLevelDictTableName
+ *
+ * After that, Hive Global Dictionary itself is stored in a third hive table.
+ * 3. Hive Global Dictionary Table
+ * TableName: ${CubeName}_${DictTableSuffix}
+ * Schema: Two columns, first for original column value, second is its encoded integer; also with another partition column
+ * @see #globalDictTableName
+ * </pre>
+ */
public class MRHiveDictUtil {
private static final Logger logger = LoggerFactory.getLogger(MRHiveDictUtil.class);
protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';");
@@ -59,12 +84,27 @@ public class MRHiveDictUtil {
}
}
- public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
- StringBuilder ddl = new StringBuilder();
- String table = flatDesc.getTableName()
- + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
- ddl.append("DROP TABLE IF EXISTS " + table + ";").append(" \n");
- return ddl.toString();
+ public static String distinctValueTable(IJoinedFlatTableDesc flatDesc) {
+ return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDistinctValueTableSuffix(); // temporary per-segment table: flat table name + distinct value suffix (unqualified, no database prefix)
+ }
+
+ public static String segmentLevelDictTableName(IJoinedFlatTableDesc flatDesc) {
+ return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix(); // temporary per-segment table: flat table name + dict table suffix (unqualified)
+ }
+
+ public static String globalDictTableName(IJoinedFlatTableDesc flatDesc, String cubeName) {
+ return cubeName + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix(); // cube name + the same dict table suffix; depends only on the cube, so every segment shares it
+ }
+
+ public static String generateDictionaryDdl(String db, String tbl) { // DDL of the Hive Global Dictionary table db.tbl, one partition per dict column; IF NOT EXISTS keeps existing entries
+ return "CREATE TABLE IF NOT EXISTS " + db + "." + tbl + "\n"
+ + " ( dict_key STRING COMMENT '', \n"
+ + " dict_val INT COMMENT '' \n" // NOTE(review): the segment level dict table declares dict_val as STRING; the merge presumably relies on Hive's implicit cast - confirm
+ + ") \n"
+ + "COMMENT 'Hive Global Dictionary' \n"
+ + "PARTITIONED BY (dict_column string) \n"
+ + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n"
+ + "STORED AS TEXTFILE; \n";
}
public static String generateDropTableStatement(String tableName) {
@@ -73,14 +113,14 @@ public class MRHiveDictUtil {
return ddl.toString();
}
- public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc) {
+ public static String generateDistinctValueTableStatement(IJoinedFlatTableDesc flatDesc) {
StringBuilder ddl = new StringBuilder();
String table = flatDesc.getTableName()
- + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
+ + flatDesc.getSegment().getConfig().getMrHiveDistinctValueTableSuffix();
ddl.append("CREATE TABLE IF NOT EXISTS " + table + " \n");
ddl.append("( \n ");
- ddl.append("dict_key" + " " + "STRING" + " COMMENT '' \n");
+ ddl.append(" dict_key" + " " + "STRING" + " COMMENT '' \n");
ddl.append(") \n");
ddl.append("COMMENT '' \n");
ddl.append("PARTITIONED BY (dict_column string) \n");
@@ -89,13 +129,13 @@ public class MRHiveDictUtil {
return ddl.toString();
}
- public static String generateCreateGlobalDicIntermediateTableStatement(String globalTableName) {
+ public static String generateDictTableStatement(String globalTableName) {
StringBuilder ddl = new StringBuilder();
ddl.append("CREATE TABLE IF NOT EXISTS " + globalTableName + " \n");
ddl.append("( \n ");
- ddl.append("dict_key" + " " + "STRING" + " COMMENT '' , \n");
- ddl.append("dict_val" + " " + "STRING" + " COMMENT '' \n");
+ ddl.append(" dict_key" + " " + "STRING" + " COMMENT '' , \n");
+ ddl.append(" dict_val" + " " + "STRING" + " COMMENT '' \n");
ddl.append(") \n");
ddl.append("COMMENT '' \n");
ddl.append("PARTITIONED BY (dict_column string) \n");
@@ -105,12 +145,12 @@ public class MRHiveDictUtil {
return ddl.toString();
}
+ /**
+ * Fetch distinct value from flat table and insert into distinctValueTable.
+ *
+ * @see #distinctValueTable
+ */
public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, String dictColumn, String globalDictDatabase, String globalDictTable) {
- String table = getMRHiveFlatTableGroupBytableName(flatDesc);
-
- StringBuilder sql = new StringBuilder();
- sql.append("SELECT a.DICT_KEY FROM (" + "\n");
-
int index = 0;
for (TblColRef tblColRef : flatDesc.getAllColumns()) {
if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
@@ -118,42 +158,56 @@ public class MRHiveDictUtil {
}
index++;
}
-
if (index == flatDesc.getAllColumns().size()) {
String msg = "Can not find correct column for " + dictColumn
+ ", please check 'kylin.dictionary.mr-hive.columns'";
logger.error(msg);
throw new IllegalArgumentException(msg);
}
- sql.append(" SELECT " + "\n");
- TblColRef col = flatDesc.getAllColumns().get(index);
- sql.append(JoinedFlatTable.colName(col) + " as DICT_KEY \n");
-
- MRHiveDictUtil.appendJoinStatement(flatDesc, sql);
- //group by
- sql.append("GROUP BY ");
- sql.append(JoinedFlatTable.colName(col) + ") a \n");
+ String table = distinctValueTable(flatDesc);
+ StringBuilder sql = new StringBuilder();
+ TblColRef col = flatDesc.getAllColumns().get(index);
- //join
- sql.append(" LEFT JOIN \n");
- sql.append("(SELECT DICT_KEY FROM ");
- sql.append(globalDictDatabase).append(".").append(globalDictTable);
- sql.append(" WHERE DICT_COLUMN = '" + dictColumn + "'");
- sql.append(") b \n");
+ sql.append("SELECT a.DICT_KEY FROM (\n");
+ sql.append(" SELECT " + "\n");
+ sql.append(JoinedFlatTable.colName(col)).append(" as DICT_KEY \n");
+ sql.append(" FROM ").append(flatDesc.getTableName()).append("\n");
+ sql.append(" GROUP BY ");
+ sql.append(JoinedFlatTable.colName(col)).append(") a \n");
+ sql.append(" LEFT JOIN \n");
+ sql.append(" (SELECT DICT_KEY FROM ").append(globalDictDatabase).append(".").append(globalDictTable);
+ sql.append(" WHERE DICT_COLUMN = '").append(dictColumn);
+ sql.append("' ) b \n");
sql.append("ON a.DICT_KEY = b.DICT_KEY \n");
- sql.append("WHERE b.DICT_KEY IS NULL \n");
+ sql.append("WHERE b.DICT_KEY IS NULL \n");
- return "INSERT OVERWRITE TABLE " + table + " \n" + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
- + sql + ";\n";
+ return "INSERT OVERWRITE TABLE " + table + " \n"
+ + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
+ + sql.toString()
+ + ";\n";
}
- public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) {
- sql.append("FROM " + flatDesc.getTableName() + "\n");
+ /**
+ * Calculate "columnName,segmentDistinctCount,previousMaxDictId" per dict column and store it into the CFG_GLOBAL_DICT_STATS_PARTITION_VALUE partition of distinctValueTable; distinctValueTable is unqualified (as in its DDL and INSERT), only globalDictTable lives in globalDictDatabase.
+ */
+ public static String generateDictStatisticsSql(String distinctValueTable, String globalDictTable, String globalDictDatabase) {
+ return "INSERT OVERWRITE TABLE " + distinctValueTable + " PARTITION (DICT_COLUMN = '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') "
+ + "\n" + "SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) "
+ + "\n" + "FROM ("
+ + "\n" + " SELECT dict_column, count(1) total_distinct_val"
+ + "\n" + " FROM " + distinctValueTable // unqualified: same table as the INSERT target above, not in globalDictDatabase
+ + "\n" + " WHERE DICT_COLUMN != '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "'"
+ + "\n" + " GROUP BY dict_column) tc "
+ + "\n" + "LEFT JOIN (\n"
+ + "\n" + " SELECT dict_column, if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val "
+ + "\n" + " FROM " + globalDictDatabase + "." + globalDictTable
+ + "\n" + " GROUP BY dict_column) tm "
+ + "\n" + "ON tc.dict_column = tm.dict_column;";
}
public static void runLivySqlJob(PatternedLogger stepLogger, KylinConfig config, ImmutableList<String> sqls,
- ExecutableManager executableManager, String jobId) throws IOException {
+ ExecutableManager executableManager, String jobId) throws IOException {
final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
StringBuilder stringBuilder = new StringBuilder();
@@ -171,7 +225,7 @@ public class MRHiveDictUtil {
executor.execute(livyRestBuilder, stepLogger);
Map<String, String> info = stepLogger.getInfo();
- //get the flat Hive table size
+ // get the flat Hive table size
Matcher matcher = HDFS_LOCATION.matcher(args);
if (matcher.find()) {
String hiveFlatTableHdfsUrl = matcher.group(1);
@@ -194,14 +248,6 @@ public class MRHiveDictUtil {
return DictHiveType.MrEphemeralDictLockPath.getName() + cubeName;
}
- public static String getMRHiveFlatTableGroupBytableName(IJoinedFlatTableDesc flatDesc) {
- return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
- }
-
- public static String getMRHiveFlatTableGlobalDictTableName(IJoinedFlatTableDesc flatDesc) {
- return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix();
- }
-
private static long getFileSize(String hdfsUrl) throws IOException {
Configuration configuration = new Configuration();
Path path = new Path(hdfsUrl);