You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2017/10/21 14:18:14 UTC
kylin git commit: KYLIN-2764 Build UHC Dict Use MR
Repository: kylin
Updated Branches:
refs/heads/2764 [created] 3941b4da3
KYLIN-2764 Build UHC Dict Use MR
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3941b4da
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3941b4da
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3941b4da
Branch: refs/heads/2764
Commit: 3941b4da36b255c8d44ce3dcea75ecef11577155
Parents: d0ff774
Author: kangkaisen <ka...@meituan.com>
Authored: Fri Jul 21 15:22:30 2017 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Sat Oct 21 22:17:42 2017 +0800
----------------------------------------------------------------------
.../apache/kylin/common/KylinConfigBase.java | 11 +-
.../java/org/apache/kylin/cube/CubeManager.java | 102 ++++++++----
.../apache/kylin/dict/DictionaryGenerator.java | 14 +-
.../org/apache/kylin/dict/DictionaryInfo.java | 4 +
.../apache/kylin/dict/DictionaryManager.java | 40 +++++
.../kylin/dict/GlobalDictionaryBuilder.java | 9 +-
.../apache/kylin/dict/IDictionaryBuilder.java | 2 +-
.../global/SegmentAppendTrieDictBuilder.java | 4 +-
.../kylin/dict/DictionaryProviderTest.java | 2 +-
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 26 +++-
.../kylin/engine/mr/JobBuilderSupport.java | 24 ++-
.../engine/mr/common/AbstractHadoopJob.java | 2 +
.../kylin/engine/mr/common/BatchConstants.java | 2 +
.../engine/mr/steps/CreateDictionaryJob.java | 16 +-
.../mr/steps/FactDistinctColumnsReducer.java | 2 +-
.../kylin/engine/mr/steps/UHCDictionaryJob.java | 154 +++++++++++++++++++
.../engine/mr/steps/UHCDictionaryMapper.java | 101 ++++++++++++
.../mr/steps/UHCDictionaryPartitioner.java | 30 ++++
.../engine/mr/steps/UHCDictionaryReducer.java | 113 ++++++++++++++
.../test_case_data/sandbox/kylin.properties | 5 +
.../dict/ITGlobalDictionaryBuilderTest.java | 4 +-
22 files changed, 611 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f3cf6c0..d204f71 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -967,6 +967,10 @@ abstract public class KylinConfigBase implements Serializable {
return getPropertiesByPrefix("kylin.engine.mr.config-override.");
}
+ public Map<String, String> getUHCMRConfigOverride() {
+ return getPropertiesByPrefix("kylin.engine.mr.uhc-config-override.");
+ }
+
public Map<String, String> getSparkConfigOverride() {
return getPropertiesByPrefix("kylin.engine.spark-conf.");
}
@@ -993,9 +997,14 @@ abstract public class KylinConfigBase implements Serializable {
//UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns
public int getUHCReducerCount() {
- return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "1"));
+ return Integer.parseInt(getOptional("kylin.engine.mr.uhc-reducer-count", "5"));
}
+ public boolean isBuildUHCDictWithMREnabled() {
+ return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-uhc-dict", "true"));
+ }
+
+
public boolean isBuildDictInReducerEnabled() {
return Boolean.parseBoolean(getOptional("kylin.engine.mr.build-dict-in-reducer", "true"));
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 3bb9f21..180c3f4 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -30,7 +30,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -832,6 +831,74 @@ public class CubeManager implements IRealizationProvider {
return getCube(name);
}
+ // ============================================================================
+
+
+ /**
+ * Get the columns whose dictionaries must be built from the fact table (the column is on the fact table and is not an FK that resolves to a lookup-table PK).
+ * @param cubeDesc
+ * @return
+ * @throws IOException
+ */
+ public List<TblColRef> getAllDictColumnsOnFact(CubeDesc cubeDesc) throws IOException {
+ List<TblColRef> factDictCols = new ArrayList<TblColRef>();
+ DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+ for (TblColRef col : cubeDesc.getAllColumnsNeedDictionaryBuilt()) {
+
+ String scanTable = dictMgr.decideSourceData(cubeDesc.getModel(), col).getTable();
+ if (cubeDesc.getModel().isFactTable(scanTable)) {
+ factDictCols.add(col);
+ }
+ }
+ return factDictCols;
+ }
+
+ public List<TblColRef> getAllGlobalDictColumns(CubeDesc cubeDesc) {
+ List<TblColRef> globalDictCols = new ArrayList<TblColRef>();
+ List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
+
+ if (dictionaryDescList == null) {
+ return globalDictCols;
+ }
+
+ for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
+ if (dictionaryDesc.getBuilderClass() != null) {
+ globalDictCols.add(dictionaryDesc.getColumnRef());
+ }
+ }
+ return globalDictCols;
+ }
+
+ //UHC (ultra high cardinality column): contains the ShardByColumns and the GlobalDictionaryColumns that live on the fact table
+ public List<TblColRef> getAllUHCColumns(CubeDesc cubeDesc) {
+ List<TblColRef> uhcColumns = new ArrayList<TblColRef>();
+ uhcColumns.addAll(getAllGlobalDictColumns(cubeDesc));
+ uhcColumns.addAll(cubeDesc.getShardByColumns());
+
+ //handle PK-FK: keep only columns whose dict source is the fact table, see getAllDictColumnsOnFact
+ try {
+ uhcColumns.retainAll(getAllDictColumnsOnFact(cubeDesc));
+ } catch (IOException e) {
+ throw new RuntimeException("Get all dict columns on fact failed", e); // keep the IOException as cause
+ }
+
+ return uhcColumns;
+ }
+
+ public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
+ List<TblColRef> factDictCols = getAllDictColumnsOnFact(cubeDesc);
+ List<TblColRef> uhcColumns = getAllUHCColumns(cubeDesc);
+ int[] uhcIndex = new int[factDictCols.size()];
+
+ for (int i = 0; i < factDictCols.size(); i++) {
+ if (uhcColumns.contains(factDictCols.get(i))) {
+ uhcIndex[i] = 1;
+ }
+ }
+
+ return uhcIndex;
+ }
+
/**
* Calculate the holes (gaps) in segments.
* @param cubeName
@@ -872,37 +939,4 @@ public class CubeManager implements IRealizationProvider {
return holes;
}
- private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder";
-
- //UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
- public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException {
- List<TblColRef> dictCols = Lists.newArrayList(cubeDesc.getAllColumnsNeedDictionaryBuilt());
- int[] uhcIndex = new int[dictCols.size()];
-
- //add GlobalDictionaryColumns
- List<DictionaryDesc> dictionaryDescList = cubeDesc.getDictionaries();
- if (dictionaryDescList != null) {
- for (DictionaryDesc dictionaryDesc : dictionaryDescList) {
- if (dictionaryDesc.getBuilderClass() != null
- && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) {
- for (int i = 0; i < dictCols.size(); i++) {
- if (dictCols.get(i).equals(dictionaryDesc.getColumnRef())) {
- uhcIndex[i] = 1;
- break;
- }
- }
- }
- }
- }
-
- //add ShardByColumns
- Set<TblColRef> shardByColumns = cubeDesc.getShardByColumns();
- for (int i = 0; i < dictCols.size(); i++) {
- if (shardByColumns.contains(dictCols.get(i))) {
- uhcIndex[i] = 1;
- }
- }
-
- return uhcIndex;
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 61a0664..5fdecdb 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -70,7 +70,7 @@ public class DictionaryGenerator {
ArrayList<String> samples = new ArrayList<String>(nSamples);
// init the builder
- builder.init(dictInfo, baseId);
+ builder.init(dictInfo, baseId, null);
// add values
while (valueEnumerator.moveNext()) {
@@ -111,7 +111,7 @@ public class DictionaryGenerator {
private String datePattern;
@Override
- public void init(DictionaryInfo info, int baseId) throws IOException {
+ public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
this.baseId = baseId;
}
@@ -152,7 +152,7 @@ public class DictionaryGenerator {
private static class TimeDictBuilder implements IDictionaryBuilder {
@Override
- public void init(DictionaryInfo info, int baseId) throws IOException {
+ public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
}
@Override
@@ -176,7 +176,7 @@ public class DictionaryGenerator {
TrieDictionaryBuilder builder;
@Override
- public void init(DictionaryInfo info, int baseId) throws IOException {
+ public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
this.baseId = baseId;
this.builder = new TrieDictionaryBuilder(new StringBytesConverter());
}
@@ -200,7 +200,7 @@ public class DictionaryGenerator {
TrieDictionaryForestBuilder builder;
@Override
- public void init(DictionaryInfo info, int baseId) throws IOException {
+ public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
builder = new TrieDictionaryForestBuilder(new StringBytesConverter(), baseId);
}
@@ -225,7 +225,7 @@ public class DictionaryGenerator {
NumberDictionaryBuilder builder;
@Override
- public void init(DictionaryInfo info, int baseId) throws IOException {
+ public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
this.baseId = baseId;
this.builder = new NumberDictionaryBuilder();
}
@@ -249,7 +249,7 @@ public class DictionaryGenerator {
NumberDictionaryForestBuilder builder;
@Override
- public void init(DictionaryInfo info, int baseId) throws IOException {
+ public void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException {
builder = new NumberDictionaryForestBuilder(baseId);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index ae5c0f1..bfb1995 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -52,6 +52,10 @@ public class DictionaryInfo extends RootPersistentEntity {
public DictionaryInfo() {
}
+ public DictionaryInfo(ColumnDesc col, String dataType) {
+ this(col.getTable().getIdentity(), col.getName(), col.getZeroBasedIndex(), dataType, null);
+ }
+
public DictionaryInfo(ColumnDesc col, String dataType, TableSignature input) {
this(col.getTable().getIdentity(), col.getName(), col.getZeroBasedIndex(), dataType, input);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index e97899c..7ce608b 100755
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -29,13 +29,17 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.ArrayUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.DataModelDesc;
import org.apache.kylin.metadata.model.DataModelManager;
+import org.apache.kylin.metadata.model.JoinDesc;
+import org.apache.kylin.metadata.model.TableRef;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.source.IReadableTable;
import org.apache.kylin.source.IReadableTable.TableSignature;
@@ -342,6 +346,42 @@ public class DictionaryManager {
return dictInfo;
}
+ /**
+ * Decide a dictionary's source data, leverage PK-FK relationship.
+ */
+ public TblColRef decideSourceData(DataModelDesc model, TblColRef col) {
+ // Note: an FK on the fact table is supported by scanning the related PK on the lookup table.
+ // If the FK is on the fact table and every join up to the root is inner, use the PK from the lookup table instead.
+ if (model.isFactTable(col.getTable()) == false)
+ return col;
+
+ // find a lookup table that the col joins as FK
+ for (TableRef lookup : model.getLookupTables()) {
+ JoinDesc lookupJoin = model.getJoinByPKSide(lookup);
+ int find = ArrayUtils.indexOf(lookupJoin.getForeignKeyColumns(), col);
+ if (find < 0)
+ continue;
+
+ // make sure the joins are all inner up to the root
+ if (isAllInnerJoinsToRoot(model, lookupJoin))
+ return lookupJoin.getPrimaryKeyColumns()[find];
+ }
+
+ return col;
+ }
+
+ private boolean isAllInnerJoinsToRoot(DataModelDesc model, JoinDesc join) {
+ while (join != null) {
+ if (join.isInnerJoin() == false)
+ return false;
+
+ TableRef table = join.getFKSide();
+ join = model.getJoinByPKSide(table);
+ }
+ return true;
+ }
+
+
private String checkDupByInfo(DictionaryInfo dictInfo) throws IOException {
final ResourceStore store = DataModelManager.getInstance(config).getStore();
final List<DictionaryInfo> allResources = store.getAllResources(dictInfo.getResourceDir(), DictionaryInfo.class, DictionaryInfoSerializer.INFO_SERIALIZER);
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
index 404d53c..8250fed 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/GlobalDictionaryBuilder.java
@@ -42,14 +42,17 @@ public class GlobalDictionaryBuilder implements IDictionaryBuilder {
private static Logger logger = LoggerFactory.getLogger(GlobalDictionaryBuilder.class);
- @Override
- public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
+ public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) throws IOException {
sourceColumn = dictInfo.getSourceTable() + "_" + dictInfo.getSourceColumn();
lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
lock.lock(getLockPath(sourceColumn), Long.MAX_VALUE);
int maxEntriesPerSlice = KylinConfig.getInstanceFromEnv().getAppendDictEntrySize();
- String baseDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory() + "resources/GlobalDict" + dictInfo.getResourceDir() + "/";
+ if (hdfsDir == null) {
+ //build in Kylin job server
+ hdfsDir = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory();
+ }
+ String baseDir = hdfsDir + "resources/GlobalDict" + dictInfo.getResourceDir() + "/";
this.builder = new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, true);
this.baseId = baseId;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
index 0934a7d..18bbb07 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryBuilder.java
@@ -28,7 +28,7 @@ import org.apache.kylin.common.util.Dictionary;
public interface IDictionaryBuilder {
/** Sets the dictionary info for the dictionary being built. Mainly for GlobalDictionaryBuilder. */
- void init(DictionaryInfo info, int baseId) throws IOException;
+ void init(DictionaryInfo info, int baseId, String hdfsDir) throws IOException;
/** Add a new value into dictionary, returns it is accepted (not null) or not. */
boolean addValue(String value);
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
index c8bc13d..e5f2d57 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/global/SegmentAppendTrieDictBuilder.java
@@ -38,14 +38,14 @@ public class SegmentAppendTrieDictBuilder implements IDictionaryBuilder {
private String sourceColumn;
@Override
- public void init(DictionaryInfo dictInfo, int baseId) throws IOException {
+ public void init(DictionaryInfo dictInfo, int baseId, String hdfsDir) throws IOException {
sourceColumn = dictInfo.getSourceTable() + "." + dictInfo.getSourceColumn();
KylinConfig config = KylinConfig.getInstanceFromEnv();
int maxEntriesPerSlice = config.getAppendDictEntrySize();
//use UUID to make each segment dict in different HDFS dir and support concurrent build
//use timestamp to make the segment dict easily to delete
- String baseDir = config.getHdfsWorkingDirectory() + "resources/SegmentDict" + dictInfo.getResourceDir() + "/" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis()+ "/";
+ String baseDir = (hdfsDir != null ? hdfsDir : config.getHdfsWorkingDirectory()) + "resources/SegmentDict" + dictInfo.getResourceDir() + "/" + UUID.randomUUID().toString() + "_" + System.currentTimeMillis()+ "/"; // hdfsDir is null when called from DictionaryGenerator (build in Kylin job server)
this.builder = new AppendTrieDictionaryBuilder(baseDir, maxEntriesPerSlice, false);
this.baseId = baseId;
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
index 4b386a7..7e2e218 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/DictionaryProviderTest.java
@@ -84,7 +84,7 @@ public class DictionaryProviderTest extends LocalFileMetadataTestCase{
private Dictionary<String> getDict(DataType type, Iterator<String> values) throws Exception {
IDictionaryBuilder builder = DictionaryGenerator.newDictionaryBuilder(type);
- builder.init(null, 0);
+ builder.init(null, 0, null);
while (values.hasNext()) {
builder.addValue(values.next());
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 2de3efa..36496fe 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -33,6 +33,7 @@ public final class ExecutableConstants {
public static final String SOURCE_RECORDS_SIZE = "source_records_size";
public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
+ public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
public static final String STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP = "Materialize Hive View in Lookup Tables";
public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 51c9056..4b808d1 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -18,6 +18,7 @@
package org.apache.kylin.engine.mr;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
@@ -31,9 +32,12 @@ import org.apache.kylin.engine.mr.steps.NDCuboidJob;
import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.TblColRef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+
public class BatchCubingJobBuilder2 extends JobBuilderSupport {
private static final Logger logger = LoggerFactory.getLogger(BatchCubingJobBuilder2.class);
@@ -58,6 +62,11 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
// Phase 2: Build Dictionary
result.addTask(createFactDistinctColumnsStepWithStats(jobId));
+
+ if (isEnableUHCDictStep()) {
+ result.addTask(createBuildUHCDictStep(jobId));
+ }
+
result.addTask(createBuildDictionaryStep(jobId));
result.addTask(createSaveStatisticsStep(jobId));
outputSide.addStepPhase2_BuildDictionary(result);
@@ -75,13 +84,26 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
+ private boolean isEnableUHCDictStep() {
+ if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
+ return false;
+ }
+
+ List<TblColRef> uhcColumns = CubeManager.getInstance(config.getConfig()).getAllUHCColumns(seg.getCubeDesc());
+ if (uhcColumns.size() == 0) {
+ return false;
+ }
+
+ return true;
+ }
+
protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
final int maxLevel = seg.getCuboidScheduler().getBuildLevel();
// base cuboid step
result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId));
// n dim cuboid steps
for (int i = 1; i <= maxLevel; i++) {
- result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i-1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
+ result.addTask(createNDimensionCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, i - 1), getCuboidOutputPathsByLevel(cuboidRootPath, i), i, jobId));
}
}
@@ -138,7 +160,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
baseCuboidStep.setMapReduceParams(cmd.toString());
baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
-// baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+ // baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
return baseCuboidStep;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index c1ed345..2a51c89 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -26,6 +26,7 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
+import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
@@ -81,6 +82,22 @@ public class JobBuilderSupport {
return result;
}
+ public MapReduceExecutable createBuildUHCDictStep(String jobId) {
+ MapReduceExecutable result = new MapReduceExecutable();
+ result.setName(ExecutableConstants.STEP_NAME_BUILD_UHC_DICTIONARY);
+ result.setMapReduceJobClass(UHCDictionaryJob.class);
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd);
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getDictRootPath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Build_UHC_Dict" + seg.getRealization().getName());
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+ result.setMapReduceParams(cmd.toString());
+ result.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+ return result;
+ }
+
public HadoopShellExecutable createBuildDictionaryStep(String jobId) {
// base cuboid job
HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
@@ -89,6 +106,7 @@ public class JobBuilderSupport {
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getFactDistinctColumnsPath(jobId));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_DICT_PATH, getDictRootPath(jobId));
buildDictionaryStep.setJobParams(cmd.toString());
buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
@@ -104,7 +122,6 @@ public class JobBuilderSupport {
CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
CubingExecutableUtil.setCubingJobId(jobId, result.getParams());
-
return result;
}
@@ -176,6 +193,10 @@ public class JobBuilderSupport {
return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS;
}
+ public String getDictRootPath(String jobId) {
+ return getRealizationRootPath(jobId) + "/dict";
+ }
+
// ============================================================================
// static methods also shared by other job flow participant
// ----------------------------------------------------------------------------
@@ -203,5 +224,4 @@ public class JobBuilderSupport {
}
}
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index babf69b..1756251 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -97,6 +97,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
.hasArg().isRequired(false).withDescription("Input format").create(BatchConstants.ARG_INPUT_FORMAT);
protected static final Option OPTION_OUTPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_OUTPUT).hasArg()
.isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT);
+ protected static final Option OPTION_DICT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg()
+ .isRequired(false).withDescription("Dict path").create(BatchConstants.ARG_DICT_PATH);
protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg()
.isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL);
protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 52b6af5..25a67f9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -66,6 +66,7 @@ public interface BatchConstants {
String CFG_OUTPUT_PARTITION = "partition";
String CFG_MR_SPARK_JOB = "mr.spark.job";
String CFG_SPARK_META_URL = "spark.meta.url";
+ String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir";
/**
* command line ARGuments
@@ -87,6 +88,7 @@ public interface BatchConstants {
String ARG_INPUT_FORMAT = "inputformat";
String ARG_LEVEL = "level";
String ARG_CONF = "conf";
+ String ARG_DICT_PATH = "dictPath";
/**
* logger and counter
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
index 98ebbb4..d64d300 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java
@@ -21,6 +21,7 @@ package org.apache.kylin.engine.mr.steps;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.List;
import org.apache.commons.cli.Options;
import org.apache.hadoop.fs.FileSystem;
@@ -35,6 +36,8 @@ import org.apache.kylin.common.util.ByteBufferBackedInputStream;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.cli.DictionaryGeneratorCLI;
import org.apache.kylin.dict.DictionaryProvider;
import org.apache.kylin.dict.DistinctColumnValuesProvider;
@@ -55,11 +58,13 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_INPUT_PATH);
+ options.addOption(OPTION_DICT_PATH);
parseOptions(options, args);
final String cubeName = getOptionValue(OPTION_CUBE_NAME);
final String segmentID = getOptionValue(OPTION_SEGMENT_ID);
final String factColumnsInputPath = getOptionValue(OPTION_INPUT_PATH);
+ final String dictPath = getOptionValue(OPTION_DICT_PATH);
final KylinConfig config = KylinConfig.getInstanceFromEnv();
@@ -72,7 +77,16 @@ public class CreateDictionaryJob extends AbstractHadoopJob {
@Override
public Dictionary<String> getDictionary(TblColRef col) throws IOException {
- Path colDir = new Path(factColumnsInputPath, col.getIdentity());
+ CubeManager cubeManager = CubeManager.getInstance(config);
+ CubeInstance cube = cubeManager.getCube(cubeName);
+ List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor());
+
+ Path colDir;
+ if (uhcColumns.contains(col)) {
+ colDir = new Path(dictPath, col.getIdentity());
+ } else {
+ colDir = new Path(factColumnsInputPath, col.getIdentity());
+ }
FileSystem fs = HadoopUtil.getWorkingFileSystem();
Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX);
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
index 73c8a20..74c8c2c 100755
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java
@@ -141,7 +141,7 @@ public class FactDistinctColumnsReducer extends KylinReducer<SelfDefineSortableK
}
if (buildDictInReducer) {
builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
- builder.init(null, 0);
+ builder.init(null, 0, null);
}
logger.info("Reducer " + taskId + " handling column " + col + ", buildDictInReducer=" + buildDictInReducer);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
new file mode 100644
index 0000000..485975a
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryJob.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * MR step that builds dictionaries for ultra-high-cardinality (UHC) columns.
+ * <p>
+ * The input path is expected to contain one sub-directory per UHC column, named by the column
+ * identity (presumably the fact distinct columns output; verify against the job builder). Each UHC
+ * column is routed to its own reducer by {@link UHCDictionaryPartitioner}, and
+ * {@link UHCDictionaryReducer} writes that column's dictionary under the output path. The step is
+ * reported as skipped when none of the UHC columns has an input directory.
+ */
+public class UHCDictionaryJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(UHCDictionaryJob.class);
+
+    /**
+     * {@link UHCDictionaryMapper} stores the column index in a single key byte, so at most 256 UHC
+     * columns can be told apart by {@link UHCDictionaryPartitioner}.
+     */
+    private static final int MAX_UHC_COLUMNS = 256;
+
+    private boolean isSkipped = false;
+
+    /**
+     * Configures and runs the UHC dictionary MR job.
+     *
+     * @param args command line arguments: job name, cube name, cubing job id, input and output paths
+     * @return the job exit code, or 0 when the step is skipped because no UHC column has values
+     * @throws IllegalStateException if the cube has more UHC columns than can be partitioned
+     */
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_INPUT_PATH);
+            parseOptions(options, args);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String jobId = getOptionValue(OPTION_CUBING_JOB_ID);
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            Path input = new Path(getOptionValue(OPTION_INPUT_PATH));
+            KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
+
+            // add metadata to distributed cache
+            CubeManager cubeMgr = CubeManager.getInstance(kylinConfig);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            attachCubeMetadata(cube, job.getConfiguration());
+
+            List<TblColRef> uhcColumns = cubeMgr.getAllUHCColumns(cube.getDescriptor());
+            // One reducer per UHC column: the reducer uses its task id as the column index.
+            int reducerCount = uhcColumns.size();
+
+            // Only UHC columns that produced values have an input directory; when none of them
+            // does, there is nothing to build and the step is skipped.
+            boolean hasUHCValue = false;
+            for (TblColRef tblColRef : uhcColumns) {
+                Path path = new Path(input, tblColRef.getIdentity());
+                if (HadoopUtil.getFileSystem(path).exists(path)) {
+                    FileInputFormat.addInputPath(job, path);
+                    hasUHCValue = true;
+                }
+            }
+
+            if (!hasUHCValue) {
+                isSkipped = true;
+                return 0;
+            }
+
+            // The column index travels as one key byte. Beyond 256 columns, keys would be routed to
+            // the wrong reducer and silently mixed into another column's dictionary.
+            if (reducerCount > MAX_UHC_COLUMNS) {
+                throw new IllegalStateException("Cube " + cubeName + " has " + reducerCount
+                        + " UHC columns, but at most " + MAX_UHC_COLUMNS + " are supported");
+            }
+
+            setJobClasspath(job, cube.getConfig());
+            setupMapper();
+            setupReducer(output, reducerCount);
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, jobId);
+            job.getConfiguration().set(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR, kylinConfig.getHdfsWorkingDirectory());
+            job.getConfiguration().set(BatchConstants.CFG_MAPRED_OUTPUT_COMPRESS, "false");
+
+            // 8G memory is enough for every global dict, because the input is sequential and the
+            // global dict is handled slice by slice.
+            job.getConfiguration().set("mapreduce.reduce.memory.mb", "8500");
+            job.getConfiguration().set("mapred.reduce.child.java.opts", "-Xmx8g");
+            // Copying the global dict to the working dir in GlobalDictHDFSStore may take a long
+            // time (and could be improved), and so may waiting for the global dict lock, so the
+            // task timeout is raised to 8 hours.
+            job.getConfiguration().set("mapreduce.task.timeout", "28800000");
+
+            // allow users to override config specifically for the UHC step
+            for (Map.Entry<String, String> entry : cube.getConfig().getUHCMRConfigOverride().entrySet()) {
+                job.getConfiguration().set(entry.getKey(), entry.getValue());
+            }
+
+            return waitForCompletion(job);
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    /** Reads sequence files with {@link UHCDictionaryMapper}, emitting column-tagged sortable keys. */
+    private void setupMapper() throws IOException {
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+        job.setMapperClass(UHCDictionaryMapper.class);
+        job.setMapOutputKeyClass(SelfDefineSortableKey.class);
+        job.setMapOutputValueClass(NullWritable.class);
+    }
+
+    /**
+     * Configures one reducer per UHC column and the named dictionary output, and removes any
+     * previous content of the output path.
+     */
+    private void setupReducer(Path output, int numberOfReducers) throws IOException {
+        job.setReducerClass(UHCDictionaryReducer.class);
+        job.setPartitionerClass(UHCDictionaryPartitioner.class);
+        job.setNumReduceTasks(numberOfReducers);
+
+        MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class);
+        FileOutputFormat.setOutputPath(job, output);
+        job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString());
+
+        // prevent creating a zero-sized default output
+        LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+
+        deletePath(job.getConfiguration(), output);
+    }
+
+    /** @return true when the last {@link #run} found no UHC column with values */
+    @Override
+    public boolean isSkipped() {
+        return isSkipped;
+    }
+
+    public static void main(String[] args) throws Exception {
+        UHCDictionaryJob job = new UHCDictionaryJob();
+        int exitCode = ToolRunner.run(job, args);
+        System.exit(exitCode);
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java
new file mode 100644
index 0000000..d9d7b60
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryMapper.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Mapper for {@link UHCDictionaryJob}: tags every value with the index of its UHC column.
+ * <p>
+ * The column is identified by the name of the input file's parent directory, which must equal
+ * (case-insensitively) the identity of one of the cube's UHC columns. Each output key is one byte
+ * holding the column index followed by the raw value bytes. {@link UHCDictionaryPartitioner}
+ * routes on that first byte.
+ */
+public class UHCDictionaryMapper extends KylinMapper<NullWritable, Text, SelfDefineSortableKey, NullWritable> {
+    private static final Logger logger = LoggerFactory.getLogger(UHCDictionaryMapper.class);
+
+    /** Largest column index that fits into the single prefix byte of the output key. */
+    private static final int MAX_COLUMN_INDEX = 255;
+
+    protected int index;
+    protected DataType type;
+
+    protected Text outputKey = new Text();
+    private ByteBuffer tmpBuf;
+    private SelfDefineSortableKey sortableKey = new SelfDefineSortableKey();
+    /** Low byte of {@link #index}, computed once per task instead of once per record. */
+    private byte indexByte;
+
+    /**
+     * Resolves the UHC column handled by this split from its parent directory name.
+     *
+     * @throws IllegalStateException if the directory matches no UHC column, or if the column index
+     *         cannot be encoded in one byte
+     */
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        tmpBuf = ByteBuffer.allocate(4096);
+
+        Configuration conf = context.getConfiguration();
+        bindCurrentConfiguration(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        CubeInstance cube = CubeManager.getInstance(config).getCube(conf.get(BatchConstants.CFG_CUBE_NAME));
+        List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor());
+
+        FileSplit fileSplit = (FileSplit) context.getInputSplit();
+        String colName = fileSplit.getPath().getParent().getName();
+
+        index = -1;
+        for (int i = 0; i < uhcColumns.size(); i++) {
+            if (uhcColumns.get(i).getIdentity().equalsIgnoreCase(colName)) {
+                index = i;
+                break;
+            }
+        }
+        // Falling back to index 0 would tag these values as column 0 and corrupt its dictionary.
+        if (index < 0) {
+            throw new IllegalStateException("Input directory " + colName + " does not match any UHC column " + uhcColumns);
+        }
+        // The index is written as a single byte; a larger value would wrap onto another column.
+        if (index > MAX_COLUMN_INDEX) {
+            throw new IllegalStateException("UHC column index " + index + " of " + colName
+                    + " exceeds " + MAX_COLUMN_INDEX + " and cannot be encoded in the key prefix byte");
+        }
+        type = uhcColumns.get(index).getType();
+        indexByte = Bytes.toBytes(index)[3];
+
+        //for debug
+        logger.info("column name: {}", colName);
+        logger.info("index: {}", index);
+        logger.info("type: {}", type);
+    }
+
+    /** Emits {@code [indexByte][value bytes]} as a sortable key of this column's type. */
+    @Override
+    public void doMap(NullWritable key, Text value, Context context) throws IOException, InterruptedException {
+        tmpBuf.clear();
+        int size = value.getLength() + 1;
+        if (size >= tmpBuf.capacity()) {
+            tmpBuf = ByteBuffer.allocate(countNewSize(tmpBuf.capacity(), size));
+        }
+        tmpBuf.put(indexByte);
+        tmpBuf.put(value.getBytes(), 0, value.getLength());
+        outputKey.set(tmpBuf.array(), 0, tmpBuf.position());
+
+        sortableKey.init(outputKey, type);
+        context.write(sortableKey, NullWritable.get());
+    }
+
+    /** Doubles {@code oldSize} until it is at least {@code dataSize}; assumes {@code oldSize > 0}. */
+    private int countNewSize(int oldSize, int dataSize) {
+        int newSize = oldSize * 2;
+        while (newSize < dataSize) {
+            newSize = newSize * 2;
+        }
+        return newSize;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java
new file mode 100644
index 0000000..5e8ffa6
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryPartitioner.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Sends each map output key of {@link UHCDictionaryMapper} to the reducer that owns its UHC column.
+ * <p>
+ * The mapper writes the column index as the first key byte. That byte, read as an unsigned value,
+ * is returned directly as the partition number, so the job runs one reducer per UHC column.
+ */
+public class UHCDictionaryPartitioner extends Partitioner<SelfDefineSortableKey, NullWritable> {
+    @Override
+    public int getPartition(SelfDefineSortableKey skey, NullWritable value, int numReduceTasks) {
+        final byte[] keyBytes = skey.getText().getBytes();
+        final int columnIndex = BytesUtil.readUnsigned(keyBytes, 0, 1);
+        return columnIndex;
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
new file mode 100644
index 0000000..ce9b792
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UHCDictionaryReducer.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.IDictionaryBuilder;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.List;
+
+import static org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducer.DICT_FILE_POSTFIX;
+
+/**
+ * Reducer for {@link UHCDictionaryJob}: builds the dictionary of exactly one UHC column.
+ * <p>
+ * The reducer's task id selects the column from the cube's UHC column list, matching the partition
+ * chosen by {@link UHCDictionaryPartitioner}.
+ * <ul>
+ * <li>Shard-by columns use the default builder for the column type.</li>
+ * <li>Other UHC columns use the builder class configured on the cube descriptor, initialised with
+ * the global dictionary base directory from the job configuration.</li>
+ * </ul>
+ */
+public class UHCDictionaryReducer extends KylinReducer<SelfDefineSortableKey, NullWritable, NullWritable, BytesWritable> {
+    private static final Logger logger = LoggerFactory.getLogger(UHCDictionaryReducer.class);
+
+    private IDictionaryBuilder builder;
+    private TblColRef col;
+
+    private MultipleOutputs<NullWritable, BytesWritable> mos;
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        super.bindCurrentConfiguration(context.getConfiguration());
+        Configuration conf = context.getConfiguration();
+        mos = new MultipleOutputs<>(context);
+
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
+        CubeDesc cubeDesc = cube.getDescriptor();
+        List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cubeDesc);
+
+        // the partitioner uses the column index as partition number, i.e. as this reducer's task id
+        int taskId = context.getTaskAttemptID().getTaskID().getId();
+        col = uhcColumns.get(taskId);
+        logger.info("column name: {}", col.getIdentity());
+
+        if (cubeDesc.getShardByColumns().contains(col)) {
+            //for ShardByColumns
+            builder = DictionaryGenerator.newDictionaryBuilder(col.getType());
+            builder.init(null, 0, null);
+        } else {
+            //for GlobalDictionaryColumns
+            String hdfsDir = conf.get(BatchConstants.CFG_GLOBAL_DICT_BASE_DIR);
+            DictionaryInfo dictionaryInfo = new DictionaryInfo(col.getColumnDesc(), col.getDatatype());
+            String builderClass = cubeDesc.getDictionaryBuilderClass(col);
+            builder = (IDictionaryBuilder) ClassUtil.newInstance(builderClass);
+            builder.init(dictionaryInfo, 0, hdfsDir);
+        }
+    }
+
+    /** Adds the value part of the key (everything after the one-byte column index) to the builder. */
+    @Override
+    public void doReduce(SelfDefineSortableKey skey, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
+        Text key = skey.getText();
+        String value = Bytes.toString(key.getBytes(), 1, key.getLength() - 1);
+        builder.addValue(value);
+    }
+
+    /** Builds the dictionary and writes it out; the named outputs are always closed. */
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        try {
+            Dictionary<String> dict = builder.build();
+            outputDict(col, dict);
+        } finally {
+            // close in finally so the named-output writers are released even if build/write fails
+            if (mos != null) {
+                mos.close();
+            }
+        }
+    }
+
+    /** Serializes the dictionary (class name followed by its content) into the dict named output. */
+    private void outputDict(TblColRef col, Dictionary<String> dict) throws IOException, InterruptedException {
+        // output written to baseDir/colName/colName.rldict-r-00000 (etc)
+        String dictFileName = col.getIdentity() + "/" + col.getName() + DICT_FILE_POSTFIX;
+
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(baos)) {
+            outputStream.writeUTF(dict.getClass().getName());
+            dict.write(outputStream);
+
+            mos.write(BatchConstants.CFG_OUTPUT_DICT, NullWritable.get(), new BytesWritable(baos.toByteArray()), dictFileName);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties
index 619bf99..55eb719 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -48,6 +48,8 @@ kylin.storage.url=hbase
# Working folder in HDFS, make sure user has the right access to the hdfs directory
kylin.env.hdfs-working-dir=/kylin
+kylin.env.zookeeper-connect-string=sandbox.hortonworks.com
+
# HBase Cluster FileSystem, which serving hbase, format as hdfs://hbase-cluster:8020
# Leave empty if hbase running on same cluster with hive and mapreduce
#kylin.storage.hbase.cluster-fs=
@@ -55,6 +57,9 @@ kylin.env.hdfs-working-dir=/kylin
kylin.engine.mr.reduce-input-mb=500
+kylin.engine.mr.uhc-config-override.mapreduce.reduce.memory.mb=500
+kylin.engine.mr.uhc-config-override.mapred.reduce.child.java.opts=-Xmx400M
+
### JOB ###
http://git-wip-us.apache.org/repos/asf/kylin/blob/3941b4da/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
index df2ebf7..c578a57 100644
--- a/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/dict/ITGlobalDictionaryBuilderTest.java
@@ -72,7 +72,7 @@ public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase {
finishLatch.await();
GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
- builder.init(dictionaryInfo, 0);
+ builder.init(dictionaryInfo, 0, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
builder.addValue("success");
Dictionary<String> dict = builder.build();
@@ -108,7 +108,7 @@ public class ITGlobalDictionaryBuilderTest extends HBaseMetadataTestCase {
GlobalDictionaryBuilder builder = new GlobalDictionaryBuilder();
startLatch.countDown();
- builder.init(dictionaryInfo, 0);
+ builder.init(dictionaryInfo, 0, KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory());
for (int i = 0; i < count; i++) {
builder.addValue(prefix + i);
}