You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:08 UTC
[13/13] incubator-kylin git commit: KYLIN-976 extract MeasureIngester.reEncodeDictionary()
KYLIN-976 extract MeasureIngester.reEncodeDictionary()
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/dac24880
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/dac24880
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/dac24880
Branch: refs/heads/KYLIN-976
Commit: dac248804b331f491175e06fd9f598235b2c9e6f
Parents: ce61309
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Nov 27 16:59:51 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Nov 27 16:59:51 2015 +0800
----------------------------------------------------------------------
.../apache/kylin/measure/MeasureIngester.java | 2 +
.../org/apache/kylin/measure/MeasureType.java | 3 -
.../kylin/measure/basic/BasicMeasureType.java | 7 --
.../kylin/measure/basic/BigDecimalIngester.java | 6 ++
.../kylin/measure/basic/DoubleIngester.java | 6 ++
.../kylin/measure/basic/LongIngester.java | 6 ++
.../kylin/measure/hllc/HLLCMeasureType.java | 11 ++-
.../kylin/measure/topn/TopNMeasureType.java | 43 +++++++++---
.../mr/steps/MergeCuboidFromStorageMapper.java | 68 +++++++------------
.../engine/mr/steps/MergeCuboidMapper.java | 71 +++++++-------------
10 files changed, 107 insertions(+), 116 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
index 8d6e601..9c7b406 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -41,4 +41,6 @@ abstract public class MeasureIngester<V> {
}
abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap);
+
+ abstract public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts);
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 604a7b6..4fe59c0 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -21,7 +21,6 @@ package org.apache.kylin.measure;
import java.util.List;
import java.util.Map;
-import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.measure.basic.BasicMeasureFactory;
import org.apache.kylin.measure.hllc.HLLCAggregationFactory;
import org.apache.kylin.measure.topn.TopNMeasureFactory;
@@ -86,8 +85,6 @@ abstract public class MeasureType {
abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
- abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
-
/* ============================================================================
* Cube Selection
* ---------------------------------------------------------------------------- */
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index f6bf090..fe53bab 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -20,7 +20,6 @@ package org.apache.kylin.measure.basic;
import java.util.List;
-import org.apache.kylin.common.util.Dictionary;
import org.apache.kylin.measure.MeasureAggregator;
import org.apache.kylin.measure.MeasureIngester;
import org.apache.kylin.measure.MeasureType;
@@ -126,10 +125,4 @@ public class BasicMeasureType extends MeasureType {
return null;
}
- @Override
- public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
- // TODO Auto-generated method stub
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
index bb743d6..ea1495c 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
@@ -28,6 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
+ @Override
public BigDecimal valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
if (values.length > 1)
throw new IllegalArgumentException();
@@ -37,4 +38,9 @@ public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
else
return new BigDecimal(values[0]);
}
+
+ @Override
+ public BigDecimal reEncodeDictionary(BigDecimal value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
index 506ed19..aaa754a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -31,6 +31,7 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> {
// avoid repeated object creation
private DoubleMutable current = new DoubleMutable();
+ @Override
public DoubleMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
if (values.length > 1)
throw new IllegalArgumentException();
@@ -42,4 +43,9 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> {
l.set(Double.parseDouble(values[0]));
return l;
}
+
+ @Override
+ public DoubleMutable reEncodeDictionary(DoubleMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
index 5bf1257..bdc1704 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -31,6 +31,7 @@ public class LongIngester extends MeasureIngester<LongMutable> {
// avoid repeated object creation
private LongMutable current = new LongMutable();
+ @Override
public LongMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
if (values.length > 1)
throw new IllegalArgumentException();
@@ -42,4 +43,9 @@ public class LongIngester extends MeasureIngester<LongMutable> {
l.set(Long.parseLong(values[0]));
return l;
}
+
+ @Override
+ public LongMutable reEncodeDictionary(LongMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index ee90818..2ad7630 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -74,6 +74,11 @@ public class HLLCMeasureType extends MeasureType {
hllc.add(v == null ? "__nUlL__" : v);
return hllc;
}
+
+ @Override
+ public HyperLogLogPlusCounter reEncodeDictionary(HyperLogLogPlusCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
+ throw new UnsupportedOperationException();
+ }
};
}
@@ -91,10 +96,4 @@ public class HLLCMeasureType extends MeasureType {
return null;
}
- @Override
- public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
- // TODO Auto-generated method stub
- return null;
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 1d2c87b..561f9f1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -18,9 +18,11 @@
package org.apache.kylin.measure.topn;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
+import org.apache.kylin.common.topn.Counter;
import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
@@ -88,6 +90,37 @@ public class TopNMeasureType extends MeasureType {
topNCounter.offer(key, counter);
return topNCounter;
}
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TopNCounter reEncodeDictionary(TopNCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
+ TopNCounter<ByteArray> topNCounter = (TopNCounter<ByteArray>) value;
+
+ TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
+ Dictionary<String> sourceDict = oldDicts.get(colRef);
+ Dictionary<String> mergedDict = newDicts.get(colRef);
+
+ int topNSize = topNCounter.size();
+ byte[] newIdBuf = new byte[topNSize * mergedDict.getSizeOfId()];
+ byte[] literal = new byte[sourceDict.getSizeOfValue()];
+
+ int bufOffset = 0;
+ for (Counter<ByteArray> c : topNCounter) {
+ int oldId = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+ int newId;
+ int size = sourceDict.getValueBytesFromId(oldId, literal, 0);
+ if (size < 0) {
+ newId = mergedDict.nullId();
+ } else {
+ newId = mergedDict.getIdFromValueBytes(literal, 0, size);
+ }
+
+ BytesUtil.writeUnsigned(newId, newIdBuf, bufOffset, mergedDict.getSizeOfId());
+ c.getItem().set(newIdBuf, bufOffset, mergedDict.getSizeOfId());
+ bufOffset += mergedDict.getSizeOfId();
+ }
+ return value;
+ }
};
}
@@ -98,14 +131,8 @@ public class TopNMeasureType extends MeasureType {
@Override
public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
- // TODO Auto-generated method stub
- return null;
+ TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(1);
+ return Collections.singletonList(literalCol);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index bc1c883..b4682dd 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -22,10 +22,9 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Dictionary;
@@ -48,6 +47,8 @@ import org.apache.kylin.engine.mr.MRUtil;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -60,6 +61,7 @@ import com.google.common.collect.Lists;
/**
* @author shaoshi
*/
+@SuppressWarnings({ "rawtypes", "unchecked" })
public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, ByteArrayWritable, ByteArrayWritable> {
private static final Logger logger = LoggerFactory.getLogger(MergeCuboidFromStorageMapper.class);
@@ -87,7 +89,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
private MeasureCodec codec;
private ByteArrayWritable outputValue = new ByteArrayWritable();
- private List<Integer> topNMeasureIdx;
+ private List<Pair<Integer, MeasureIngester>> dictMeasures;
+ private Map<TblColRef, Dictionary<String>> oldDicts;
+ private Map<TblColRef, Dictionary<String>> newDicts;
@Override
protected void setup(Context context) throws IOException, InterruptedException {
@@ -115,12 +119,18 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
measureDescs = cubeDesc.getMeasures();
codec = new MeasureCodec(measureDescs);
- topNMeasureIdx = Lists.newArrayList();
+ dictMeasures = Lists.newArrayList();
for (int i = 0; i < measureDescs.size(); i++) {
- if (measureDescs.get(i).getFunction().isTopN()) {
- topNMeasureIdx.add(i);
+ MeasureDesc measureDesc = measureDescs.get(i);
+ MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+ if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
+ dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
}
}
+ if (dictMeasures.size() > 0) {
+ oldDicts = sourceCubeSegment.buildDictionaryMap();
+ newDicts = mergedCubeSegment.buildDictionaryMap();
+ }
}
@Override
@@ -193,7 +203,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
outputKey.set(newKeyBuf.array(), 0, fullKeySize);
// re-encode measures if dictionary is used
- reEncodeMeasure(value);
+ if (dictMeasures.size() > 0) {
+ reEncodeMeasure(value);
+ }
valueBuf.clear();
codec.encode(value, valueBuf);
@@ -217,45 +229,11 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
}
}
- @SuppressWarnings("unchecked")
private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
- // currently only topN uses dictionary in measure obj
- if (topNMeasureIdx.isEmpty())
- return;
-
- int bufOffset = 0;
- for (int idx : topNMeasureIdx) {
- TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
-
- MeasureDesc measureDesc = measureDescs.get(idx);
- TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
- Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
-
- int topNSize = topNCounters.size();
- while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBodyBuf;
- newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
- }
-
- for (Counter<ByteArray> c : topNCounters) {
- int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
- int idInMergedDict;
- int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
- if (size < 0) {
- idInMergedDict = mergedDict.nullId();
- } else {
- idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
- }
-
- BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
- c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
- bufOffset += mergedDict.getSizeOfId();
- }
+ for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+ int i = pair.getFirst();
+ MeasureIngester ingester = pair.getSecond();
+ measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/dac24880/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index c0277b5..4fc7236 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -22,17 +22,17 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.SplittedBytes;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -48,6 +48,8 @@ import org.apache.kylin.engine.mr.KylinMapper;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
import org.apache.kylin.metadata.model.MeasureDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
@@ -57,6 +59,7 @@ import com.google.common.collect.Lists;
/**
* @author ysong1, honma
*/
+@SuppressWarnings({"rawtypes", "unchecked"})
public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private KylinConfig config;
@@ -78,7 +81,9 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
// for re-encode measures that use dictionary
- private List<Integer> topNMeasureIdx;
+ private List<Pair<Integer, MeasureIngester>> dictMeasures;
+ private Map<TblColRef, Dictionary<String>> oldDicts;
+ private Map<TblColRef, Dictionary<String>> newDicts;
private List<MeasureDesc> measureDescs;
private MeasureCodec codec;
private Object[] measureObjs;
@@ -115,12 +120,19 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
measureObjs = new Object[measureDescs.size()];
valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
outputValue = new Text();
- topNMeasureIdx = Lists.newArrayList();
+
+ dictMeasures = Lists.newArrayList();
for (int i = 0; i < measureDescs.size(); i++) {
- if (measureDescs.get(i).getFunction().isTopN()) {
- topNMeasureIdx.add(i);
+ MeasureDesc measureDesc = measureDescs.get(i);
+ MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+ if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
+ dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
}
}
+ if (dictMeasures.size() > 0) {
+ oldDicts = sourceCubeSegment.buildDictionaryMap();
+ newDicts = mergedCubeSegment.buildDictionaryMap();
+ }
}
private static final Pattern JOB_NAME_PATTERN = Pattern.compile("kylin-([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
@@ -214,9 +226,13 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
outputKey.set(newKeyBuf.array(), 0, fullKeySize);
// re-encode measures if dictionary is used
- if (topNMeasureIdx.size() > 0) {
+ if (dictMeasures.size() > 0) {
codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
- reEncodeMeasure(measureObjs);
+ for (Pair<Integer, MeasureIngester> pair : dictMeasures) {
+ int i = pair.getFirst();
+ MeasureIngester ingester = pair.getSecond();
+ measureObjs[i] = ingester.reEncodeDictionary(measureObjs[i], measureDescs.get(i), oldDicts, newDicts);
+ }
valueBuf.clear();
codec.encode(measureObjs, valueBuf);
outputValue.set(valueBuf.array(), 0, valueBuf.position());
@@ -240,43 +256,4 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
return ret;
}
}
-
- @SuppressWarnings("unchecked")
- private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
- int bufOffset = 0;
- for (int idx : topNMeasureIdx) {
- // only TopN measure uses dic
- TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
-
- MeasureDesc measureDesc = measureDescs.get(idx);
- TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
- DictionaryManager dictMgr = DictionaryManager.getInstance(config);
- Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
- Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
-
- int topNSize = topNCounters.size();
- while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
- mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
- byte[] oldBuf = newKeyBodyBuf;
- newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
- System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
- }
-
- for (Counter<ByteArray> c : topNCounters) {
- int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
- int idInMergedDict;
- int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
- if (size < 0) {
- idInMergedDict = mergedDict.nullId();
- } else {
- idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
- }
-
- BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
- c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
- bufOffset += mergedDict.getSizeOfId();
- }
- }
- }
}