You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:57 UTC
[20/50] incubator-kylin git commit: Merge branch
'streaming-localdict' of https://github.com/KylinOLAP/Kylin into
streaming-localdict
Merge branch 'streaming-localdict' of https://github.com/KylinOLAP/Kylin into streaming-localdict
Conflicts:
invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
storage/src/main/java/org/apache/kylin/storage/gridtable/GTDictionaryCodeSystem.java
storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java
Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7f73abe5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7f73abe5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7f73abe5
Branch: refs/heads/streaming-localdict
Commit: 7f73abe5c53fc165ff01b920850fe4caf8ab9e0d
Parents: 959d031 7088724
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Mar 27 11:39:24 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Mar 27 11:39:24 2015 +0800
----------------------------------------------------------------------
.../common/hll/HyperLogLogPlusCounter.java | 29 ++-
.../kylin/common/persistence/ResourceStore.java | 1 +
.../org/apache/kylin/common/util/ByteArray.java | 2 +-
.../org/apache/kylin/common/util/BytesUtil.java | 16 +-
.../apache/kylin/common/util/BytesUtilTest.java | 20 ++
.../java/org/apache/kylin/dict/Dictionary.java | 31 ++--
.../org/apache/kylin/dict/TrieDictionary.java | 48 ++---
.../apache/kylin/dict/NumberDictionaryTest.java | 2 +-
.../metadata/model_desc/kylin_sales_model.json | 17 ++
.../localmeta/streaming/kafka_test.json | 15 ++
.../apache/kylin/invertedindex/IIInstance.java | 12 ++
.../apache/kylin/invertedindex/IIManager.java | 17 +-
.../invertedindex/index/BatchSliceBuilder.java | 8 +-
.../model/IIJoinedFlatTableDesc.java | 12 +-
.../invertedindex/model/IIKeyValueCodec.java | 91 +++++----
.../model/IIKeyValueCodecWithState.java | 68 +++++++
.../apache/kylin/invertedindex/model/IIRow.java | 13 ++
.../org/apache/kylin/job/JoinedFlatTable.java | 1 -
.../kylin/job/constant/BatchConstants.java | 5 +
.../kylin/job/constant/ExecutableConstants.java | 1 +
.../apache/kylin/job/cube/CubingJobBuilder.java | 85 ++++++---
.../kylin/job/hadoop/AbstractHadoopJob.java | 5 +-
.../kylin/job/hadoop/cube/BaseCuboidMapper.java | 2 +-
.../kylin/job/hadoop/cube/CubeHFileMapper.java | 2 +-
.../kylin/job/hadoop/cube/CuboidReducer.java | 2 +-
.../cube/FactDistinctColumnsCombiner.java | 26 ++-
.../job/hadoop/cube/FactDistinctColumnsJob.java | 14 +-
.../hadoop/cube/FactDistinctColumnsMapper.java | 139 --------------
.../cube/FactDistinctColumnsMapperBase.java | 81 ++++++++
.../hadoop/cube/FactDistinctColumnsReducer.java | 143 ++++++++++++---
.../cube/FactDistinctHiveColumnsMapper.java | 148 +++++++++++++++
.../cube/FactDistinctIIColumnsMapper.java | 129 +++++++++++++
.../job/hadoop/cube/MergeCuboidMapper.java | 2 +-
.../kylin/job/hadoop/cube/NDCuboidMapper.java | 2 +-
.../job/hadoop/cube/NewBaseCuboidMapper.java | 2 +-
.../hadoop/cubev2/BuildDictionaryMapper.java | 2 +-
.../kylin/job/hadoop/cubev2/InMemCuboidJob.java | 183 +++++++++++++++++++
.../job/hadoop/cubev2/InMemCuboidMapper.java | 163 ++++++++++-------
.../job/hadoop/cubev2/InMemCuboidReducer.java | 82 +++++++++
.../invertedindex/InvertedIndexMapper.java | 2 +-
.../invertedindex/InvertedIndexPartitioner.java | 2 +-
.../invertedindex/InvertedIndexReducer.java | 2 +-
.../kylin/job/streaming/StreamingBootstrap.java | 117 ++++++++++++
.../kylin/job/streaming/StreamingCLI.java | 71 +++++++
.../kylin/job/BuildCubeWithStreamTest.java | 4 +-
.../apache/kylin/job/IIStreamBuilderTest.java | 80 ++++++++
.../kylin/metadata/model/DimensionDesc.java | 12 +-
.../metadata/model/IJoinedFlatTableDesc.java | 2 -
.../metadata/model/IntermediateColumnDesc.java | 4 +
pom.xml | 1 +
.../gridtable/GTDictionaryCodeSystem.java | 72 +++++---
.../endpoint/HbaseServerKVIterator.java | 9 +-
streaming/pom.xml | 8 +
.../apache/kylin/streaming/BrokerConfig.java | 78 ++++++++
.../kylin/streaming/JsonStreamParser.java | 73 ++++++++
.../org/apache/kylin/streaming/KafkaConfig.java | 99 +++++-----
.../apache/kylin/streaming/KafkaConsumer.java | 22 +--
.../apache/kylin/streaming/KafkaRequester.java | 128 +++++++------
.../apache/kylin/streaming/StreamBuilder.java | 9 +
.../apache/kylin/streaming/StreamManager.java | 114 ++++++++++++
.../apache/kylin/streaming/StreamParser.java | 47 +++++
.../kylin/streaming/StringStreamParser.java | 55 ++++++
.../kylin/streaming/cube/CubeStreamBuilder.java | 37 ++--
.../invertedindex/IIStreamBuilder.java | 6 +-
.../kylin/streaming/EternalStreamProducer.java | 5 +-
.../apache/kylin/streaming/KafkaBaseTest.java | 23 ---
.../apache/kylin/streaming/KafkaConfigTest.java | 65 -------
.../kylin/streaming/KafkaConsumerTest.java | 8 +-
.../kylin/streaming/KafkaRequesterTest.java | 11 +-
.../kylin/streaming/Nous/NousMessageTest.java | 4 +-
.../kylin/streaming/OneOffStreamProducer.java | 3 +-
.../kylin/streaming/StreamManagerTest.java | 69 +++++++
.../invertedindex/IIStreamBuilderTest.java | 41 -----
.../invertedindex/PrintOutStreamBuilder.java | 67 +++++++
.../kafka_streaming_test/kafka.properties | 10 -
75 files changed, 2250 insertions(+), 731 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
----------------------------------------------------------------------
diff --cc dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
index a931359,a931359..815b06d
--- a/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
@@@ -73,15 -73,15 +73,16 @@@ abstract public class Dictionary<T> imp
}
/**
-- * Returns the ID integer of given value. In case of not found - if
-- * roundingFlag=0, throw IllegalArgumentException; - if roundingFlag<0, the
-- * closest smaller ID integer if exist; - if roundingFlag>0, the closest
-- * bigger ID integer if exist. The implementation often has cache, thus
-- * faster than the byte[] version getIdFromValueBytes()
++ * Returns the ID integer of given value. In case of not found
++ * - if roundingFlag=0, throw IllegalArgumentException;
++ * - if roundingFlag<0, the closest smaller ID integer if exist;
++ * - if roundingFlag>0, the closest bigger ID integer if exist.
++ *
++ * The implementation often has cache, thus faster than the byte[] version getIdFromValueBytes()
*
* @throws IllegalArgumentException
-- * if value is not found in dictionary and rounding is off or
-- * failed
++ * if value is not found in dictionary and rounding is off;
++ * or if rounding cannot find a smaller or bigger ID
*/
final public int getIdFromValue(T value, int roundingFlag) {
if (isNullObjectForm(value))
@@@ -119,16 -119,16 +120,16 @@@
}
/**
-- * A lower level API, return ID integer from raw value bytes. In case of not
-- * found - if roundingFlag=0, throw IllegalArgumentException; - if
-- * roundingFlag<0, the closest smaller ID integer if exist; - if
-- * roundingFlag>0, the closest bigger ID integer if exist. Bypassing the
-- * cache layer, this could be significantly slower than getIdFromValue(T
-- * value).
++ * A lower level API, return ID integer from raw value bytes. In case of not found
++ * - if roundingFlag=0, throw IllegalArgumentException;
++ * - if roundingFlag<0, the closest smaller ID integer if exist;
++ * - if roundingFlag>0, the closest bigger ID integer if exist.
++ *
++ * Bypassing the cache layer, this could be significantly slower than getIdFromValue(T value).
*
* @throws IllegalArgumentException
-- * if value is not found in dictionary and rounding is off or
-- * failed
++ * if value is not found in dictionary and rounding is off;
++ * or if rounding cannot find a smaller or bigger ID
*/
final public int getIdFromValueBytes(byte[] value, int offset, int len, int roundingFlag) {
if (isNullByteForm(value, offset, len))
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
----------------------------------------------------------------------
diff --cc dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index ef845ce,ef845ce..bf40eac
--- a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@@ -185,9 -185,9 +185,9 @@@ public class TrieDictionary<T> extends
* @param inpEnd
* -- end of input
* @param roundingFlag
-- * -- =0: return -1 if not found -- <0: return closest smaller if
-- * not found, might be -1 -- >0: return closest bigger if not
-- * found, might be nValues
++ * -- =0: return -1 if not found
++ * -- <0: return closest smaller if not found, return -1
++ * -- >0: return closest bigger if not found, return nValues
*/
private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
if (inp.length == 0) // special 'empty' value
@@@ -199,11 -199,11 +199,8 @@@
// match the current node, note [0] of node's value has been matched
// when this node is selected by its parent
int p = n + firstByteOffset; // start of node's value
-- int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of
-- // node's
-- // value
-- for (p++; p < end && o < inpEnd; p++, o++) { // note matching start
-- // from [1]
++ int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
++ for (p++; p < end && o < inpEnd; p++, o++) { // note matching start from [1]
if (trieBytes[p] != inp[o]) {
int comp = BytesUtil.compareByteUnsigned(trieBytes[p], inp[o]);
if (comp < 0) {
@@@ -216,9 -216,9 +213,7 @@@
// node completely matched, is input all consumed?
boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
if (o == inpEnd) {
-- return p == end && isEndOfValue ? seq : roundSeqNo(roundingFlag, seq - 1, -1, seq); // input
-- // all
-- // matched
++ return p == end && isEndOfValue ? seq : roundSeqNo(roundingFlag, seq - 1, -1, seq); // input all matched
}
if (isEndOfValue)
seq++;
@@@ -226,9 -226,9 +221,7 @@@
// find a child to continue
int c = headSize + (BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask);
if (c == headSize) // has no children
-- return roundSeqNo(roundingFlag, seq - 1, -1, seq); // input only
-- // partially
-- // matched
++ return roundSeqNo(roundingFlag, seq - 1, -1, seq); // input only partially matched
byte inpByte = inp[o];
int comp;
while (true) {
@@@ -242,26 -242,26 +235,10 @@@
} else if (comp < 0) { // try next child
seq += BytesUtil.readUnsigned(trieBytes, c + sizeChildOffset, sizeNoValuesBeneath);
if (checkFlag(c, BIT_IS_LAST_CHILD))
-- return roundSeqNo(roundingFlag, seq - 1, -1, seq); // no
-- // child
-- // can
-- // match
-- // the
-- // next
-- // byte
-- // of
-- // input
++ return roundSeqNo(roundingFlag, seq - 1, -1, seq); // no child can match the next byte of input
c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1);
} else { // children are ordered by their first value byte
-- return roundSeqNo(roundingFlag, seq - 1, -1, seq); // no
-- // child
-- // can
-- // match
-- // the
-- // next
-- // byte
-- // of
-- // input
++ return roundSeqNo(roundingFlag, seq - 1, -1, seq); // no child can match the next byte of input
}
}
}
@@@ -279,9 -279,9 +256,7 @@@
@Override
final protected T getValueFromIdImpl(int id) {
if (enableCache) {
-- Object[] cache = idToValueCache.get(); // SoftReference to skip
-- // cache gracefully when
-- // short of memory
++ Object[] cache = idToValueCache.get(); // SoftReference to skip cache gracefully when short of memory
if (cache != null) {
int seq = calcSeqNoFromId(id);
if (seq < 0 || seq >= nValues)
@@@ -347,8 -347,8 +322,7 @@@
int nValuesBeneath;
while (true) {
nValuesBeneath = BytesUtil.readUnsigned(trieBytes, c + sizeChildOffset, sizeNoValuesBeneath);
-- if (seq - nValuesBeneath < 0) { // value is under this child,
-- // reset n and loop again
++ if (seq - nValuesBeneath < 0) { // value is under this child, reset n and loop again
n = c;
break;
} else { // go to next child
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
----------------------------------------------------------------------
diff --cc dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
index e6d2ee9,e6d2ee9..f9af244
--- a/dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
+++ b/dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
@@@ -122,7 -122,7 +122,7 @@@ public class NumberDictionaryTest
}
// test rounding
-- for (int i = 0; i < n; i++) {
++ for (int i = 0; i < n * 50; i++) {
String randStr = randNumber();
BigDecimal rand = new BigDecimal(randStr);
int binarySearch = Collections.binarySearch(sorted, rand);
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/BuildDictionaryMapper.java
----------------------------------------------------------------------
diff --cc job/src/main/java/org/apache/kylin/job/hadoop/cubev2/BuildDictionaryMapper.java
index 4d66186,0000000..a2c2c3b
mode 100644,000000..100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/BuildDictionaryMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/BuildDictionaryMapper.java
@@@ -1,184 -1,0 +1,184 @@@
+package org.apache.kylin.job.hadoop.cubev2;
+
+import com.google.common.base.Function;
+import com.google.common.collect.*;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hive.hcatalog.data.HCatRecord;
+import org.apache.hive.hcatalog.data.schema.HCatSchema;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.mr.KylinMapper;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.cuboid.CuboidScheduler;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.dict.DictionaryGenerator;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryInfoSerializer;
+import org.apache.kylin.dict.lookup.HiveTableReader;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.job.hadoop.AbstractHadoopJob;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Created by shaoshi on 3/24/15.
+ */
+public class BuildDictionaryMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Text, Text> {
+
+ private static final Logger logger = LoggerFactory.getLogger(BuildDictionaryMapper.class);
+ private String cubeName;
+ private CubeInstance cube;
+ private CubeSegment cubeSegment;
+ private CubeDesc cubeDesc;
+
+ private HCatSchema schema = null;
+ private HyperLogLogPlusCounter hll;
+
+
+ private Text outputKey = new Text();
+ private Text outputValue = new Text();
+ private List<TblColRef> dimColumns;
+ private SetMultimap<Integer, String> columnDistinctValueMap;
+ private CuboidScheduler cuboidScheduler = null;
+ private CubeJoinedFlatTableDesc intermediateTableDesc;
+ private long baseCuboidId;
+ private List<String> rowKeyValues = null;
+ private int nRowKey;
+
+ @Override
+ protected void setup(Context context) throws IOException { // per-task initialisation: resolves the cube/segment and allocates the accumulators used by map() and cleanup()
+ super.publishConfiguration(context.getConfiguration()); // hand the Hadoop configuration to the KylinMapper base class
+
+ Configuration conf = context.getConfiguration();
+
- KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
++ KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(); // the merged line calls the no-argument overload; 'conf' is still read below
+ cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+ cube = CubeManager.getInstance(config).getCube(cubeName);
+ String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase(); // segment name is upper-cased before lookup; a missing property would NPE here
+ cubeDesc = cube.getDescriptor();
+ cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW); // only a segment in NEW status is looked up
+ dimColumns = cubeDesc.listDimensionColumnsExcludingDerived();
+ hll = new HyperLogLogPlusCounter(16); // NOTE(review): 16 is presumably the HLL precision -- confirm against HyperLogLogPlusCounter
+ columnDistinctValueMap = HashMultimap.create(); // key is col, value is a set of string values
+ cuboidScheduler = new CuboidScheduler(cubeDesc); // used by putRowKeyToHLL to walk from a cuboid to the cuboids it spans
+ intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
+ baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
+ nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; // number of row key columns, i.e. the number of mask bits putRowKeyToHLL examines
+
+ rowKeyValues = Lists.newArrayList(); // scratch list, cleared and refilled on every putRowKeyToHLL call
+ }
+
+    /**
+     * Converts one incoming HCatalog record to its string-array form and feeds it
+     * to {@link #buildDictAndCount(String[])}. Nothing is written to the context
+     * here; all output is produced in cleanup().
+     *
+     * @param key     input key, not used
+     * @param record  the record to process
+     * @param context task context, not used by this method
+     */
+    @Override
+    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
+        buildDictAndCount(HiveTableReader.getRowAsStringArray(record));
+    }
+
+ protected void buildDictAndCount(String[] row) {
+ for (int i = 0; i < intermediateTableDesc.getRowKeyColumnIndexes().length; i++) {
+ columnDistinctValueMap.put(i, row[intermediateTableDesc.getRowKeyColumnIndexes()[i]]);
+ }
+
+ putRowKeyToHLL(row, baseCuboidId); // recursively put all possible row keys to hll
+ }
+
+    /**
+     * Emits everything this mapper accumulated, after all input rows were consumed.
+     * <p>
+     * One record is written per dictionary built by {@link #buildDictionary()}: the
+     * key is the row key index encoded with {@code Bytes.toBytes(int)}, the value is
+     * the serialized {@code DictionaryInfo}. A final record keyed by {@code -1}
+     * carries the HyperLogLog registers.
+     *
+     * @param context task context the records are written to
+     * @throws IOException          if serializing a dictionary or writing a record fails
+     * @throws InterruptedException if writing a record is interrupted
+     */
+    protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
+        Map<Integer, DictionaryInfo> dictionaries = buildDictionary();
+
+        DictionaryInfoSerializer dictionaryInfoSerializer = new DictionaryInfoSerializer();
+        // NOTE(review): the returned cuboid was never used; the call itself is kept in case
+        // findById validates the base cuboid id -- confirm and remove if it has no side effect.
+        Cuboid.findById(cubeDesc, this.baseCuboidId);
+
+        // output dictionary to reducer, key is the index of the col on row key;
+        for (Map.Entry<Integer, DictionaryInfo> entry : dictionaries.entrySet()) {
+            outputKey.set(Bytes.toBytes(entry.getKey().intValue()));
+
+            // serialize the dictionary to bytes;
+            ByteArrayOutputStream buf = new ByteArrayOutputStream();
+            DataOutputStream dout = new DataOutputStream(buf);
+            dictionaryInfoSerializer.serialize(entry.getValue(), dout);
+            dout.close();
+            buf.close();
+            outputValue.set(buf.toByteArray());
+
+            context.write(outputKey, outputValue);
+        }
+
+        // output hll to reducer, key is -1
+        outputKey.set(Bytes.toBytes(-1));
+        ByteBuffer hllBuf = ByteBuffer.allocate(1024 * 1024);
+        hll.writeRegisters(hllBuf);
+        // Ship only the bytes that were written, not the whole 1 MiB backing array.
+        // Assumes writeRegisters uses relative puts and so advances the buffer position -- confirm.
+        outputValue.set(hllBuf.array(), 0, hllBuf.position());
+        context.write(outputKey, outputValue);
+    }
+
+    /**
+     * Adds the row key of the given cuboid to the HLL counter, then recurses into
+     * every cuboid returned by {@code cuboidScheduler.getSpanningCuboid(cuboidId)}.
+     * <p>
+     * The key is the comma-joined values of the row key columns whose bit is set in
+     * {@code cuboidId}. Bits are examined from the highest set bit of the base
+     * cuboid id downwards, one bit per row key column.
+     *
+     * @param row      one row, indexed by flat-table column position
+     * @param cuboidId id of the cuboid whose row key is added
+     */
+    private void putRowKeyToHLL(String[] row, long cuboidId) {
+        rowKeyValues.clear();
+        long mask = Long.highestOneBit(baseCuboidId);
+        for (int i = 0; i < nRowKey; i++) {
+            // The masked value is either 0 or the mask itself (a power of two), so it
+            // must be tested against 0; comparing with 1 only ever matched the lowest bit.
+            if ((mask & cuboidId) != 0) {
+                rowKeyValues.add(row[intermediateTableDesc.getRowKeyColumnIndexes()[i]]);
+            }
+            // unsigned shift so a mask in the sign bit cannot smear ones into lower bits
+            mask >>>= 1;
+        }
+
+        String key = StringUtils.join(rowKeyValues, ",");
+        hll.add(key);
+
+        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
+        for (Long childId : children) {
+            putRowKeyToHLL(row, childId);
+        }
+    }
+
+    /**
+     * Builds a dictionary for every row key column that the cube descriptor marks
+     * as dictionary-encoded, from the distinct values collected for that column.
+     *
+     * @return map from row key index to the {@code DictionaryInfo} wrapping the
+     *         built dictionary; columns that do not use a dictionary have no entry
+     */
+    private Map<Integer, DictionaryInfo> buildDictionary() {
+        Map<Integer, DictionaryInfo> dictionaryMap = Maps.newHashMap();
+        for (int i = 0; i < intermediateTableDesc.getRowKeyColumnIndexes().length; i++) {
+            if (!cubeDesc.getRowkey().isUseDictionary(i)) {
+                continue; // this column is not dictionary-encoded
+            }
+
+            TblColRef col = cubeDesc.getRowkey().getRowKeyColumns()[i].getColRef();
+            Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), Collections2.transform(columnDistinctValueMap.get(i), new Function<String, byte[]>() {
+                @Nullable
+                @Override
+                public byte[] apply(String input) {
+                    // Bytes.toBytes(String) encodes as UTF-8; String.getBytes() would use the
+                    // platform default charset and could differ between task nodes.
+                    return Bytes.toBytes(input);
+                }
+            }));
+
+            logger.info("Building dictionary for " + col);
+            DictionaryInfo dictInfo = new DictionaryInfo(col.getTable(), col.getName(), 0, col.getDatatype(), null, "");
+            dictInfo.setDictionaryObject(dict);
+            dictInfo.setDictionaryClass(dict.getClass().getName());
+            dictionaryMap.put(i, dictInfo);
+        }
+
+        return dictionaryMap;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/storage/src/main/java/org/apache/kylin/storage/gridtable/GTDictionaryCodeSystem.java
----------------------------------------------------------------------
diff --cc storage/src/main/java/org/apache/kylin/storage/gridtable/GTDictionaryCodeSystem.java
index 03c7541,45b5d5f..6f2d9ce
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTDictionaryCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTDictionaryCodeSystem.java
@@@ -15,12 -13,11 +13,12 @@@ import java.util.Map
/**
* Created by shaoshi on 3/23/15.
*/
++@SuppressWarnings({ "rawtypes", "unchecked" })
public class GTDictionaryCodeSystem implements IGTCodeSystem {
private GTInfo info;
- private BitSet encodedColumns = null;
private Map<Integer, Dictionary> dictionaryMaps = null; // key: column index; value: dictionary for this column;
- private Map<Integer, DataTypeSerializer> serializerMap = null; // column index; value: serializer for this column;
private IFilterCodeSystem<ByteArray> filterCS;
+ private DataTypeSerializer[] serializers;
public GTDictionaryCodeSystem(Map<Integer, Dictionary> dictionaryMaps) {
this.dictionaryMaps = dictionaryMaps;
@@@ -95,12 -87,7 +88,7 @@@
@Override
public Object decodeColumnValue(int col, ByteBuffer buf) {
- if (useDictionary(col)) {
- int id = BytesUtil.readUnsigned(buf, dictionaryMaps.get(col).getSizeOfId());
- return dictionaryMaps.get(col).getValueFromId(id);
- } else {
- return serializerMap.get(col).deserialize(buf);
- }
- return serializers[col].deserialize(buf);
++ return serializers[col].deserialize(buf);
}
@Override