You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this copy.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/03/27 03:32:45 UTC
[42/50] [abbrv] kylin git commit: KYLIN-1476 Support measure dictionary in 1.x
KYLIN-1476 Support measure dictionary in 1.x
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d517530c
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d517530c
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d517530c
Branch: refs/heads/1.3.x
Commit: d517530c93dc3176a52295af297afdfefdfc4b62
Parents: 438a6ad
Author: wangxiaoyu <ro...@gmail.com>
Authored: Tue Mar 8 23:39:51 2016 +0800
Committer: Xiaoyu Wang <wa...@apache.org>
Committed: Mon Mar 21 20:00:57 2016 +0800
----------------------------------------------------------------------
.../java/org/apache/kylin/cube/CubeManager.java | 26 ++++++++++++-
.../kylin/cube/cli/DictionaryGeneratorCLI.java | 14 +++----
.../kylin/job/cube/MergeDictionaryStep.java | 17 ++++-----
.../hadoop/cube/FactDistinctColumnsMapper.java | 40 +++++---------------
.../hadoop/cube/FactDistinctColumnsReducer.java | 12 ++----
.../hadoop/hive/CubeJoinedFlatTableDesc.java | 18 +++++++--
6 files changed, 65 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 88ce4f1..4615762 100644
--- a/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -194,11 +194,11 @@ public class CubeManager implements IRealizationProvider {
public DictionaryInfo buildDictionary(CubeSegment cubeSeg, TblColRef col, String factColumnsPath) throws IOException {
CubeDesc cubeDesc = cubeSeg.getCubeDesc();
- if (!cubeDesc.getRowkey().isUseDictionary(col))
+ if (!cubeDesc.getAllColumnsNeedDictionary().contains(col))
return null;
DictionaryManager dictMgr = getDictionaryManager();
- DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, factColumnsPath);
+ DictionaryInfo dictInfo = dictMgr.buildDictionary(cubeDesc.getModel(), "true", col, factColumnsPath);
if (dictInfo != null) {
cubeSeg.putDictResPath(col, dictInfo.getResourcePath());
@@ -791,4 +791,26 @@ public class CubeManager implements IRealizationProvider {
return getCube(name);
}
+
+ /**
+ * Returns the columns that need a dictionary and whose dictionary source table, as decided by
+ * {@link DictionaryManager#decideSourceData}, is the fact table of the cube's model.
+ * @param cubeDesc the cube whose dictionary columns are inspected
+ * @return the matching columns, in the order of {@link CubeDesc#getAllColumnsNeedDictionary()}
+ * @throws IOException declared for failures while resolving the dictionary source data
+ */
+ public List<TblColRef> getAllDictColumnsOnFact(CubeDesc cubeDesc) throws IOException {
+ List<TblColRef> dictionaryColumns = cubeDesc.getAllColumnsNeedDictionary();
+ List<TblColRef> factDictCols = new ArrayList<TblColRef>(dictionaryColumns.size());
+ // obtain the manager the same way buildDictionary() does, to stay consistent within this class
+ DictionaryManager dictMgr = getDictionaryManager();
+ for (TblColRef col : dictionaryColumns) {
+ // "true" is the dictionary type buildDictionary() passes, so the scan table decided here
+ // agrees with the one used when the dictionary is actually built
+ String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), "true", col, null)[0];
+ if (cubeDesc.getModel().isFactTable(scanTable)) {
+ factDictCols.add(col);
+ }
+ }
+ return factDictCols;
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
----------------------------------------------------------------------
diff --git a/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 3cdaa93..bcbd496 100644
--- a/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -44,15 +44,13 @@ public class DictionaryGeneratorCLI {
private static void processSegment(KylinConfig config, CubeSegment cubeSeg, String factColumnsPath) throws IOException {
CubeManager cubeMgr = CubeManager.getInstance(config);
- for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
- // dictionary
- for (TblColRef col : dim.getColumnRefs()) {
- if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
- logger.info("Building dictionary for " + col);
- cubeMgr.buildDictionary(cubeSeg, col, factColumnsPath);
- }
- }
+ // dictionary
+ for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionary()) {
+ logger.info("Building dictionary for " + col);
+ cubeMgr.buildDictionary(cubeSeg, col, factColumnsPath);
+ }
+ for (DimensionDesc dim : cubeSeg.getCubeDesc().getDimensions()) {
// build snapshot
if (dim.getTable() != null && !dim.getTable().equalsIgnoreCase(cubeSeg.getCubeDesc().getFactTable())) {
// CubeSegment seg = cube.getTheOnlySegment();
http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
index 8f77558..47151a7 100644
--- a/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
+++ b/job/src/main/java/org/apache/kylin/job/cube/MergeDictionaryStep.java
@@ -107,16 +107,13 @@ public class MergeDictionaryStep extends AbstractExecutable {
DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
CubeDesc cubeDesc = cube.getDescriptor();
- for (DimensionDesc dim : cubeDesc.getDimensions()) {
- for (TblColRef col : dim.getColumnRefs()) {
- if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
- String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
- if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
- colsNeedMeringDict.add(col);
- } else {
- colsNeedCopyDict.add(col);
- }
- }
+
+ for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) {
+ String dictTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), "true", col, null)[0];
+ if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
+ colsNeedMeringDict.add(col);
+ } else {
+ colsNeedCopyDict.add(col);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
index 72802aa..b70113d 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsMapper.java
@@ -19,7 +19,6 @@
package org.apache.kylin.job.hadoop.cube;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -34,10 +33,7 @@ import org.apache.kylin.common.mr.KylinMapper;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.RowKeyDesc;
-import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
@@ -51,7 +47,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRec
private String cubeName;
private CubeInstance cube;
private CubeDesc cubeDesc;
- private int[] factDictCols;
+ List<TblColRef> factDictCols;
private CubeJoinedFlatTableDesc intermediateTableDesc;
@@ -60,6 +56,7 @@ public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRec
private int errorRecordCounter;
private HCatSchema schema = null;
+ protected int[] dictionaryColumnIndex;
@Override
protected void setup(Context context) throws IOException {
@@ -73,40 +70,23 @@ public class FactDistinctColumnsMapper<KEYIN> extends KylinMapper<KEYIN, HCatRec
cubeDesc = cube.getDescriptor();
intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- List<TblColRef> columns = baseCuboid.getColumns();
-
- ArrayList<Integer> factDictCols = new ArrayList<Integer>();
- RowKeyDesc rowkey = cubeDesc.getRowkey();
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- for (int i = 0; i < columns.size(); i++) {
- TblColRef col = columns.get(i);
- if (rowkey.isUseDictionary(col) == false)
- continue;
-
- String scanTable = (String) dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col, null)[0];
- if (cubeDesc.getModel().isFactTable(scanTable)) {
- factDictCols.add(i);
- }
+ factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);
+ dictionaryColumnIndex = new int[factDictCols.size()];
+ for (int i = 0; i < factDictCols.size(); i++) {
+ TblColRef colRef = factDictCols.get(i);
+ int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+ dictionaryColumnIndex[i] = columnIndexOnFlatTbl;
}
- this.factDictCols = new int[factDictCols.size()];
- for (int i = 0; i < factDictCols.size(); i++)
- this.factDictCols[i] = factDictCols.get(i);
-
schema = HCatInputFormat.getTableSchema(context.getConfiguration());
}
@Override
public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
-
try {
-
- int[] flatTableIndexes = intermediateTableDesc.getRowKeyColumnIndexes();
HCatFieldSchema fieldSchema = null;
- for (int i : factDictCols) {
+ for (int i = 0; i < factDictCols.size(); i++) {
outputKey.set((short) i);
- fieldSchema = schema.get(flatTableIndexes[i]);
+ fieldSchema = schema.get(dictionaryColumnIndex[i]);
Object fieldValue = record.get(fieldSchema.getName(), schema);
if (fieldValue == null)
continue;
http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
index 89f90ba..809ced2 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cube/FactDistinctColumnsReducer.java
@@ -31,14 +31,12 @@ import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Bytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.cuboid.Cuboid;
import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.job.constant.BatchConstants;
import org.apache.kylin.job.hadoop.AbstractHadoopJob;
import org.apache.kylin.metadata.model.TblColRef;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -47,8 +45,7 @@ import java.util.List;
*/
public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text, NullWritable, Text> {
- private List<TblColRef> columnList = new ArrayList<TblColRef>();
-
+ private List<TblColRef> factDictCols;
@Override
protected void setup(Context context) throws IOException {
super.publishConfiguration(context.getConfiguration());
@@ -58,15 +55,12 @@ public class FactDistinctColumnsReducer extends KylinReducer<ShortWritable, Text
String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
CubeDesc cubeDesc = cube.getDescriptor();
-
- long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
- Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
- columnList = baseCuboid.getColumns();
+ factDictCols = CubeManager.getInstance(config).getAllDictColumnsOnFact(cubeDesc);
}
@Override
public void reduce(ShortWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
- TblColRef col = columnList.get(key.get());
+ TblColRef col = factDictCols.get(key.get());
HashSet<ByteArray> set = new HashSet<ByteArray>();
for (Text textValue : values) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/d517530c/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java b/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java
index b2bed9f..72dd400 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/hive/CubeJoinedFlatTableDesc.java
@@ -48,9 +48,12 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
private List<IntermediateColumnDesc> columnList = Lists.newArrayList();
+ private Map<String, Integer> columnIndexMap;
+
public CubeJoinedFlatTableDesc(CubeDesc cubeDesc, CubeSegment cubeSegment) {
this.cubeDesc = cubeDesc;
this.cubeSegment = cubeSegment;
+ this.columnIndexMap = Maps.newHashMap();
parseCubeDesc();
}
@@ -73,10 +76,9 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
this.tableName = "kylin_intermediate_" + cubeDesc.getName() + "_" + cubeSegment.getName();
}
- Map<String, Integer> dimensionIndexMap = Maps.newHashMap();
int columnIndex = 0;
for (TblColRef col : cubeDesc.listDimensionColumnsExcludingDerived()) {
- dimensionIndexMap.put(colName(col.getCanonicalName()), columnIndex);
+ columnIndexMap.put(colName(col.getCanonicalName()), columnIndex);
columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), col));
columnIndex++;
}
@@ -86,7 +88,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
rowKeyColumnIndexes = new int[rowkeyColCount];
for (int i = 0; i < rowkeyColCount; i++) {
String colName = colName(cuboidColumns.get(i).getCanonicalName());
- Integer dimIdx = dimensionIndexMap.get(colName);
+ Integer dimIdx = columnIndexMap.get(colName);
if (dimIdx == null) {
throw new RuntimeException("Can't find column " + colName);
}
@@ -108,6 +110,7 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
measureColumnIndexes[i][j] = contains(columnList, c);
if (measureColumnIndexes[i][j] < 0) {
measureColumnIndexes[i][j] = columnIndex;
+ columnIndexMap.put(colName(c.getCanonicalName()), columnIndex);
columnList.add(new IntermediateColumnDesc(String.valueOf(columnIndex), c));
columnIndex++;
}
@@ -172,4 +175,13 @@ public class CubeJoinedFlatTableDesc implements IJoinedFlatTableDesc {
private static String colName(String canonicalColName) {
return canonicalColName.replace(".", "_");
}
+
+ /** Returns the position of {@code colRef} among the flat-table columns; IllegalArgumentException if absent. */
+ public int getColumnIndex(TblColRef colRef) {
+ Integer index = columnIndexMap.get(colName(colRef.getCanonicalName()));
+ if (index == null) {
+ throw new IllegalArgumentException("Column " + colRef + " wasn't found on flat table.");
+ }
+ return index;
+ }
}