You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2015/11/11 12:58:46 UTC
incubator-kylin git commit: KYLIN-1137 TopN measure need support dictionary merge
Repository: incubator-kylin
Updated Branches:
refs/heads/KYLIN-1126 8796fa37f -> 6476549c9
KYLIN-1137 TopN measure need support dictionary merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/6476549c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/6476549c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/6476549c
Branch: refs/heads/KYLIN-1126
Commit: 6476549c9690da8eeefa4fe6c0ee476c3d9bd561
Parents: 8796fa3
Author: shaofengshi <sh...@apache.org>
Authored: Wed Nov 11 19:58:02 2015 +0800
Committer: shaofengshi <sh...@apache.org>
Committed: Wed Nov 11 19:58:02 2015 +0800
----------------------------------------------------------------------
.../org/apache/kylin/common/topn/Counter.java | 4 +
.../org/apache/kylin/cube/model/CubeDesc.java | 10 +++
.../engine/mr/steps/InMemCuboidMapper.java | 26 ++-----
.../engine/mr/steps/MergeCuboidMapper.java | 79 +++++++++++++++++++-
.../engine/mr/steps/MergeDictionaryStep.java | 17 ++---
5 files changed, 106 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
index 2bca4df..461e083 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/Counter.java
@@ -50,6 +50,10 @@ public class Counter<T> implements Externalizable {
public T getItem() {
return item;
}
+
+ public void setItem(T item) {
+ this.item = item;
+ }
public double getCount() {
return count;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 160090a..2709247 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -847,4 +847,14 @@ public class CubeDesc extends RootPersistentEntity {
public LinkedHashSet<TblColRef> getMeasureDisplayColumns() {
return measureDisplayColumns;
}
+
+
+ public boolean hasMeasureUsingDictionary() {
+ for (MeasureDesc measureDesc : this.getMeasures()) {
+ if (measureDesc.getFunction().isTopN())
+ return true;
+ }
+
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index 2bf627b..d724c76 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -65,26 +65,14 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
- for (DimensionDesc dim : cubeDesc.getDimensions()) {
- // dictionary
- for (TblColRef col : dim.getColumnRefs()) {
- if (cubeDesc.getRowkey().isUseDictionary(col)) {
- Dictionary<?> dict = cubeSegment.getDictionary(col);
- if (dict == null) {
- logger.warn("Dictionary for " + col + " was not found.");
- }
-
- dictionaryMap.put(col, cubeSegment.getDictionary(col));
- }
- }
- }
-
- for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
- if (measureDesc.getFunction().isTopN()) {
- List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
- TblColRef col = colRefs.get(colRefs.size() - 1);
- dictionaryMap.put(col, cubeSegment.getDictionary(col));
+ // dictionary
+ for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) {
+ Dictionary<?> dict = cubeSegment.getDictionary(col);
+ if (dict == null) {
+ logger.warn("Dictionary for " + col + " was not found.");
}
+
+ dictionaryMap.put(col, cubeSegment.getDictionary(col));
}
DoggedCubeBuilder cubeBuilder = new DoggedCubeBuilder(cube.getDescriptor(), dictionaryMap);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 0b68e59..b3b38dc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -19,13 +19,18 @@
package org.apache.kylin.engine.mr.steps;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.HashMap;
+import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import com.google.common.collect.Lists;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.SplittedBytes;
@@ -43,6 +48,9 @@ import org.apache.kylin.dict.DictionaryManager;
import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.measure.MeasureAggregators;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -68,6 +76,12 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private RowKeyEncoderProvider rowKeyEncoderProvider;
private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+ private List<MeasureDesc> measuresDescs;
+ private MeasureCodec codec;
+ private Object[] measureObjs;
+ private Integer[] measureIdxUsingDict;
+ private ByteBuffer valueBuf;
+ private Text outputValue;
private Boolean checkNeedMerging(TblColRef col) throws IOException {
Boolean ret = dictsNeedMerging.get(col);
@@ -108,6 +122,22 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
+
+ if (cubeDesc.hasMeasureUsingDictionary()) {
+ measuresDescs = cubeDesc.getMeasures();
+ codec = new MeasureCodec(measuresDescs);
+ measureObjs = new Object[measuresDescs.size()];
+ List<Integer> measuresUsingDict = Lists.newArrayList();
+ for (int i = 0; i < measuresDescs.size(); i++) {
+ if (measuresDescs.get(i).getFunction().isTopN()) {
+ // so far only TopN uses dic
+ measuresUsingDict.add(i);
+ }
+ }
+ measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]);
+ valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+ outputValue = new Text();
+ }
}
private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
@@ -200,6 +230,53 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
outputKey.set(newKeyBuf.array(), 0, fullKeySize);
- context.write(outputKey, value);
+
+ // encode measure if it uses dictionary
+ if (cubeDesc.hasMeasureUsingDictionary()) {
+ codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
+ bufOffset = 0;
+ for (int idx : measureIdxUsingDict) {
+ // only TopN measure uses dic
+ TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
+
+ MeasureDesc measureDesc = measuresDescs.get(idx);
+ String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn();
+ TblColRef col = cubeDesc.findColumnRef(cubeDesc.getFactTable(), displayCol);
+ DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+ Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(col));
+ Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(col));
+
+ int topNSize = topNCounters.size();
+ while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+ mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+ mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+ byte[] oldBuf = newKeyBodyBuf;
+ newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+ System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+ }
+
+ for (Counter<ByteArray> c : topNCounters) {
+ int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+ int idInMergedDict;
+ int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+ if (size < 0) {
+ idInMergedDict = mergedDict.nullId();
+ } else {
+ idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
+ }
+
+ BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+ c.setItem(new ByteArray(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId()));
+ bufOffset += mergedDict.getSizeOfId();
+ }
+ }
+
+ valueBuf.clear();
+ codec.encode(measureObjs, valueBuf);
+ outputValue.set(valueBuf.array(), 0, valueBuf.position());
+ context.write(outputKey, outputValue);
+ } else {
+ context.write(outputKey, value);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/6476549c/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index 2db4ce7..b73fda4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -110,16 +110,13 @@ public class MergeDictionaryStep extends AbstractExecutable {
DictionaryManager dictMgr = DictionaryManager.getInstance(conf);
CubeDesc cubeDesc = cube.getDescriptor();
- for (DimensionDesc dim : cubeDesc.getDimensions()) {
- for (TblColRef col : dim.getColumnRefs()) {
- if (newSeg.getCubeDesc().getRowkey().isUseDictionary(col)) {
- String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
- if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
- colsNeedMeringDict.add(col);
- } else {
- colsNeedCopyDict.add(col);
- }
- }
+
+ for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) {
+ String dictTable = dictMgr.decideSourceData(cubeDesc.getModel(), "true", col).getTable();
+ if (cubeDesc.getFactTable().equalsIgnoreCase(dictTable)) {
+ colsNeedMeringDict.add(col);
+ } else {
+ colsNeedCopyDict.add(col);
}
}