You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/03/28 01:04:57 UTC

[20/50] incubator-kylin git commit: Merge branch 'streaming-localdict' of https://github.com/KylinOLAP/Kylin into streaming-localdict

Merge branch 'streaming-localdict' of https://github.com/KylinOLAP/Kylin into streaming-localdict

Conflicts:
	invertedindex/src/main/java/org/apache/kylin/invertedindex/index/BatchSliceBuilder.java
	invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
	job/src/main/java/org/apache/kylin/job/hadoop/cubev2/InMemCuboidMapper.java
	job/src/test/java/org/apache/kylin/job/BuildCubeWithStreamTest.java
	storage/src/main/java/org/apache/kylin/storage/gridtable/GTDictionaryCodeSystem.java
	storage/src/main/java/org/apache/kylin/storage/gridtable/GTRecord.java
	streaming/src/main/java/org/apache/kylin/streaming/cube/CubeStreamBuilder.java
	streaming/src/main/java/org/apache/kylin/streaming/invertedindex/IIStreamBuilder.java
	streaming/src/test/java/org/apache/kylin/streaming/KafkaConsumerTest.java


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/7f73abe5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/7f73abe5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/7f73abe5

Branch: refs/heads/streaming-localdict
Commit: 7f73abe5c53fc165ff01b920850fe4caf8ab9e0d
Parents: 959d031 7088724
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Mar 27 11:39:24 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Mar 27 11:39:24 2015 +0800

----------------------------------------------------------------------
 .../common/hll/HyperLogLogPlusCounter.java      |  29 ++-
 .../kylin/common/persistence/ResourceStore.java |   1 +
 .../org/apache/kylin/common/util/ByteArray.java |   2 +-
 .../org/apache/kylin/common/util/BytesUtil.java |  16 +-
 .../apache/kylin/common/util/BytesUtilTest.java |  20 ++
 .../java/org/apache/kylin/dict/Dictionary.java  |  31 ++--
 .../org/apache/kylin/dict/TrieDictionary.java   |  48 ++---
 .../apache/kylin/dict/NumberDictionaryTest.java |   2 +-
 .../metadata/model_desc/kylin_sales_model.json  |  17 ++
 .../localmeta/streaming/kafka_test.json         |  15 ++
 .../apache/kylin/invertedindex/IIInstance.java  |  12 ++
 .../apache/kylin/invertedindex/IIManager.java   |  17 +-
 .../invertedindex/index/BatchSliceBuilder.java  |   8 +-
 .../model/IIJoinedFlatTableDesc.java            |  12 +-
 .../invertedindex/model/IIKeyValueCodec.java    |  91 +++++----
 .../model/IIKeyValueCodecWithState.java         |  68 +++++++
 .../apache/kylin/invertedindex/model/IIRow.java |  13 ++
 .../org/apache/kylin/job/JoinedFlatTable.java   |   1 -
 .../kylin/job/constant/BatchConstants.java      |   5 +
 .../kylin/job/constant/ExecutableConstants.java |   1 +
 .../apache/kylin/job/cube/CubingJobBuilder.java |  85 ++++++---
 .../kylin/job/hadoop/AbstractHadoopJob.java     |   5 +-
 .../kylin/job/hadoop/cube/BaseCuboidMapper.java |   2 +-
 .../kylin/job/hadoop/cube/CubeHFileMapper.java  |   2 +-
 .../kylin/job/hadoop/cube/CuboidReducer.java    |   2 +-
 .../cube/FactDistinctColumnsCombiner.java       |  26 ++-
 .../job/hadoop/cube/FactDistinctColumnsJob.java |  14 +-
 .../hadoop/cube/FactDistinctColumnsMapper.java  | 139 --------------
 .../cube/FactDistinctColumnsMapperBase.java     |  81 ++++++++
 .../hadoop/cube/FactDistinctColumnsReducer.java | 143 ++++++++++++---
 .../cube/FactDistinctHiveColumnsMapper.java     | 148 +++++++++++++++
 .../cube/FactDistinctIIColumnsMapper.java       | 129 +++++++++++++
 .../job/hadoop/cube/MergeCuboidMapper.java      |   2 +-
 .../kylin/job/hadoop/cube/NDCuboidMapper.java   |   2 +-
 .../job/hadoop/cube/NewBaseCuboidMapper.java    |   2 +-
 .../hadoop/cubev2/BuildDictionaryMapper.java    |   2 +-
 .../kylin/job/hadoop/cubev2/InMemCuboidJob.java | 183 +++++++++++++++++++
 .../job/hadoop/cubev2/InMemCuboidMapper.java    | 163 ++++++++++-------
 .../job/hadoop/cubev2/InMemCuboidReducer.java   |  82 +++++++++
 .../invertedindex/InvertedIndexMapper.java      |   2 +-
 .../invertedindex/InvertedIndexPartitioner.java |   2 +-
 .../invertedindex/InvertedIndexReducer.java     |   2 +-
 .../kylin/job/streaming/StreamingBootstrap.java | 117 ++++++++++++
 .../kylin/job/streaming/StreamingCLI.java       |  71 +++++++
 .../kylin/job/BuildCubeWithStreamTest.java      |   4 +-
 .../apache/kylin/job/IIStreamBuilderTest.java   |  80 ++++++++
 .../kylin/metadata/model/DimensionDesc.java     |  12 +-
 .../metadata/model/IJoinedFlatTableDesc.java    |   2 -
 .../metadata/model/IntermediateColumnDesc.java  |   4 +
 pom.xml                                         |   1 +
 .../gridtable/GTDictionaryCodeSystem.java       |  72 +++++---
 .../endpoint/HbaseServerKVIterator.java         |   9 +-
 streaming/pom.xml                               |   8 +
 .../apache/kylin/streaming/BrokerConfig.java    |  78 ++++++++
 .../kylin/streaming/JsonStreamParser.java       |  73 ++++++++
 .../org/apache/kylin/streaming/KafkaConfig.java |  99 +++++-----
 .../apache/kylin/streaming/KafkaConsumer.java   |  22 +--
 .../apache/kylin/streaming/KafkaRequester.java  | 128 +++++++------
 .../apache/kylin/streaming/StreamBuilder.java   |   9 +
 .../apache/kylin/streaming/StreamManager.java   | 114 ++++++++++++
 .../apache/kylin/streaming/StreamParser.java    |  47 +++++
 .../kylin/streaming/StringStreamParser.java     |  55 ++++++
 .../kylin/streaming/cube/CubeStreamBuilder.java |  37 ++--
 .../invertedindex/IIStreamBuilder.java          |   6 +-
 .../kylin/streaming/EternalStreamProducer.java  |   5 +-
 .../apache/kylin/streaming/KafkaBaseTest.java   |  23 ---
 .../apache/kylin/streaming/KafkaConfigTest.java |  65 -------
 .../kylin/streaming/KafkaConsumerTest.java      |   8 +-
 .../kylin/streaming/KafkaRequesterTest.java     |  11 +-
 .../kylin/streaming/Nous/NousMessageTest.java   |   4 +-
 .../kylin/streaming/OneOffStreamProducer.java   |   3 +-
 .../kylin/streaming/StreamManagerTest.java      |  69 +++++++
 .../invertedindex/IIStreamBuilderTest.java      |  41 -----
 .../invertedindex/PrintOutStreamBuilder.java    |  67 +++++++
 .../kafka_streaming_test/kafka.properties       |  10 -
 75 files changed, 2250 insertions(+), 731 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/common/src/main/java/org/apache/kylin/common/util/ByteArray.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
----------------------------------------------------------------------
diff --cc dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
index a931359,a931359..815b06d
--- a/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
@@@ -73,15 -73,15 +73,16 @@@ abstract public class Dictionary<T> imp
      }
  
      /**
--     * Returns the ID integer of given value. In case of not found - if
--     * roundingFlag=0, throw IllegalArgumentException; - if roundingFlag<0, the
--     * closest smaller ID integer if exist; - if roundingFlag>0, the closest
--     * bigger ID integer if exist. The implementation often has cache, thus
--     * faster than the byte[] version getIdFromValueBytes()
++     * Returns the ID integer of given value. In case of not found
++     * - if roundingFlag=0, throw IllegalArgumentException;
++     * - if roundingFlag<0, the closest smaller ID integer if exist;
++     * - if roundingFlag>0, the closest bigger ID integer if exist.
++     * 
++     * The implementation often has cache, thus faster than the byte[] version getIdFromValueBytes()
       * 
       * @throws IllegalArgumentException
--     *             if value is not found in dictionary and rounding is off or
--     *             failed
++     *             if value is not found in dictionary and rounding is off;
++     *             or if rounding cannot find a smaller or bigger ID
       */
      final public int getIdFromValue(T value, int roundingFlag) {
          if (isNullObjectForm(value))
@@@ -119,16 -119,16 +120,16 @@@
      }
  
      /**
--     * A lower level API, return ID integer from raw value bytes. In case of not
--     * found - if roundingFlag=0, throw IllegalArgumentException; - if
--     * roundingFlag<0, the closest smaller ID integer if exist; - if
--     * roundingFlag>0, the closest bigger ID integer if exist. Bypassing the
--     * cache layer, this could be significantly slower than getIdFromValue(T
--     * value).
++     * A lower level API, return ID integer from raw value bytes. In case of not found 
++     * - if roundingFlag=0, throw IllegalArgumentException; 
++     * - if roundingFlag<0, the closest smaller ID integer if exist; 
++     * - if roundingFlag>0, the closest bigger ID integer if exist.
++     * 
++     * Bypassing the cache layer, this could be significantly slower than getIdFromValue(T value).
       * 
       * @throws IllegalArgumentException
--     *             if value is not found in dictionary and rounding is off or
--     *             failed
++     *             if value is not found in dictionary and rounding is off;
++     *             or if rounding cannot find a smaller or bigger ID
       */
      final public int getIdFromValueBytes(byte[] value, int offset, int len, int roundingFlag) {
          if (isNullByteForm(value, offset, len))

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
----------------------------------------------------------------------
diff --cc dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index ef845ce,ef845ce..bf40eac
--- a/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@@ -185,9 -185,9 +185,9 @@@ public class TrieDictionary<T> extends 
       * @param inpEnd
       *            -- end of input
       * @param roundingFlag
--     *            -- =0: return -1 if not found -- <0: return closest smaller if
--     *            not found, might be -1 -- >0: return closest bigger if not
--     *            found, might be nValues
++     *            -- =0: return -1 if not found
++     *            -- <0: return closest smaller if not found, return -1
++     *            -- >0: return closest bigger if not found, return nValues
       */
      private int lookupSeqNoFromValue(int n, byte[] inp, int o, int inpEnd, int roundingFlag) {
          if (inp.length == 0) // special 'empty' value
@@@ -199,11 -199,11 +199,8 @@@
              // match the current node, note [0] of node's value has been matched
              // when this node is selected by its parent
              int p = n + firstByteOffset; // start of node's value
--            int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of
--                                                                       // node's
--                                                                       // value
--            for (p++; p < end && o < inpEnd; p++, o++) { // note matching start
--                                                         // from [1]
++            int end = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1); // end of node's value
++            for (p++; p < end && o < inpEnd; p++, o++) { // note matching start from [1]
                  if (trieBytes[p] != inp[o]) {
                      int comp = BytesUtil.compareByteUnsigned(trieBytes[p], inp[o]);
                      if (comp < 0) {
@@@ -216,9 -216,9 +213,7 @@@
              // node completely matched, is input all consumed?
              boolean isEndOfValue = checkFlag(n, BIT_IS_END_OF_VALUE);
              if (o == inpEnd) {
--                return p == end && isEndOfValue ? seq : roundSeqNo(roundingFlag, seq - 1, -1, seq); // input
--                                                                                                    // all
--                                                                                                    // matched
++                return p == end && isEndOfValue ? seq : roundSeqNo(roundingFlag, seq - 1, -1, seq); // input all matched
              }
              if (isEndOfValue)
                  seq++;
@@@ -226,9 -226,9 +221,7 @@@
              // find a child to continue
              int c = headSize + (BytesUtil.readUnsigned(trieBytes, n, sizeChildOffset) & childOffsetMask);
              if (c == headSize) // has no children
--                return roundSeqNo(roundingFlag, seq - 1, -1, seq); // input only
--                                                                   // partially
--                                                                   // matched
++                return roundSeqNo(roundingFlag, seq - 1, -1, seq); // input only partially matched
              byte inpByte = inp[o];
              int comp;
              while (true) {
@@@ -242,26 -242,26 +235,10 @@@
                  } else if (comp < 0) { // try next child
                      seq += BytesUtil.readUnsigned(trieBytes, c + sizeChildOffset, sizeNoValuesBeneath);
                      if (checkFlag(c, BIT_IS_LAST_CHILD))
--                        return roundSeqNo(roundingFlag, seq - 1, -1, seq); // no
--                                                                           // child
--                                                                           // can
--                                                                           // match
--                                                                           // the
--                                                                           // next
--                                                                           // byte
--                                                                           // of
--                                                                           // input
++                        return roundSeqNo(roundingFlag, seq - 1, -1, seq); // no child can match the next byte of input
                      c = p + BytesUtil.readUnsigned(trieBytes, p - 1, 1);
                  } else { // children are ordered by their first value byte
--                    return roundSeqNo(roundingFlag, seq - 1, -1, seq); // no
--                                                                       // child
--                                                                       // can
--                                                                       // match
--                                                                       // the
--                                                                       // next
--                                                                       // byte
--                                                                       // of
--                                                                       // input
++                    return roundSeqNo(roundingFlag, seq - 1, -1, seq); // no child can match the next byte of input
                  }
              }
          }
@@@ -279,9 -279,9 +256,7 @@@
      @Override
      final protected T getValueFromIdImpl(int id) {
          if (enableCache) {
--            Object[] cache = idToValueCache.get(); // SoftReference to skip
--                                                   // cache gracefully when
--                                                   // short of memory
++            Object[] cache = idToValueCache.get(); // SoftReference to skip cache gracefully when short of memory
              if (cache != null) {
                  int seq = calcSeqNoFromId(id);
                  if (seq < 0 || seq >= nValues)
@@@ -347,8 -347,8 +322,7 @@@
              int nValuesBeneath;
              while (true) {
                  nValuesBeneath = BytesUtil.readUnsigned(trieBytes, c + sizeChildOffset, sizeNoValuesBeneath);
--                if (seq - nValuesBeneath < 0) { // value is under this child,
--                                                // reset n and loop again
++                if (seq - nValuesBeneath < 0) { // value is under this child, reset n and loop again
                      n = c;
                      break;
                  } else { // go to next child

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
----------------------------------------------------------------------
diff --cc dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
index e6d2ee9,e6d2ee9..f9af244
--- a/dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
+++ b/dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
@@@ -122,7 -122,7 +122,7 @@@ public class NumberDictionaryTest 
          }
  
          // test rounding
--        for (int i = 0; i < n; i++) {
++        for (int i = 0; i < n * 50; i++) {
              String randStr = randNumber();
              BigDecimal rand = new BigDecimal(randStr);
              int binarySearch = Collections.binarySearch(sorted, rand);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/BuildDictionaryMapper.java
----------------------------------------------------------------------
diff --cc job/src/main/java/org/apache/kylin/job/hadoop/cubev2/BuildDictionaryMapper.java
index 4d66186,0000000..a2c2c3b
mode 100644,000000..100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/BuildDictionaryMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cubev2/BuildDictionaryMapper.java
@@@ -1,184 -1,0 +1,184 @@@
 +package org.apache.kylin.job.hadoop.cubev2;
 +
 +import com.google.common.base.Function;
 +import com.google.common.collect.*;
 +import org.apache.commons.lang3.StringUtils;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.hbase.util.Bytes;
 +import org.apache.hadoop.io.Text;
 +import org.apache.hadoop.mapreduce.Mapper;
 +import org.apache.hive.hcatalog.data.HCatRecord;
 +import org.apache.hive.hcatalog.data.schema.HCatSchema;
 +import org.apache.kylin.common.KylinConfig;
 +import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 +import org.apache.kylin.common.mr.KylinMapper;
 +import org.apache.kylin.cube.CubeInstance;
 +import org.apache.kylin.cube.CubeManager;
 +import org.apache.kylin.cube.CubeSegment;
 +import org.apache.kylin.cube.cuboid.Cuboid;
 +import org.apache.kylin.cube.cuboid.CuboidScheduler;
 +import org.apache.kylin.cube.model.CubeDesc;
 +import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 +import org.apache.kylin.dict.Dictionary;
 +import org.apache.kylin.dict.DictionaryGenerator;
 +import org.apache.kylin.dict.DictionaryInfo;
 +import org.apache.kylin.dict.DictionaryInfoSerializer;
 +import org.apache.kylin.dict.lookup.HiveTableReader;
 +import org.apache.kylin.job.constant.BatchConstants;
 +import org.apache.kylin.job.hadoop.AbstractHadoopJob;
 +import org.apache.kylin.metadata.model.SegmentStatusEnum;
 +import org.apache.kylin.metadata.model.TblColRef;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import javax.annotation.Nullable;
 +import java.io.ByteArrayOutputStream;
 +import java.io.DataOutputStream;
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.Collection;
 +import java.util.List;
 +import java.util.Map;
 +
 +/**
 + * Created by shaoshi on 3/24/15.
 + */
 +public class BuildDictionaryMapper<KEYIN> extends KylinMapper<KEYIN, HCatRecord, Text, Text> {
 +
 +    private static final Logger logger = LoggerFactory.getLogger(BuildDictionaryMapper.class);
 +    private String cubeName;
 +    private CubeInstance cube;
 +    private CubeSegment cubeSegment;
 +    private CubeDesc cubeDesc;
 +
 +    private HCatSchema schema = null;
 +    private HyperLogLogPlusCounter hll;
 +
 +
 +    private Text outputKey = new Text();
 +    private Text outputValue = new Text();
 +    private List<TblColRef> dimColumns;
 +    private SetMultimap<Integer, String> columnDistinctValueMap;
 +    private CuboidScheduler cuboidScheduler = null;
 +    private CubeJoinedFlatTableDesc intermediateTableDesc;
 +    private long baseCuboidId;
 +    private List<String> rowKeyValues = null;
 +    private int nRowKey;
 +
 +    @Override
 +    protected void setup(Context context) throws IOException {
 +        super.publishConfiguration(context.getConfiguration());
 +
 +        Configuration conf = context.getConfiguration();
 +
-         KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata(conf);
++        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
 +        cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
 +        cube = CubeManager.getInstance(config).getCube(cubeName);
 +        String segmentName = context.getConfiguration().get(BatchConstants.CFG_CUBE_SEGMENT_NAME).toUpperCase();
 +        cubeDesc = cube.getDescriptor();
 +        cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
 +        dimColumns = cubeDesc.listDimensionColumnsExcludingDerived();
 +        hll = new HyperLogLogPlusCounter(16);
 +        columnDistinctValueMap = HashMultimap.create(); // key is col, value is a set of string values
 +        cuboidScheduler = new CuboidScheduler(cubeDesc);
 +        intermediateTableDesc = new CubeJoinedFlatTableDesc(cube.getDescriptor(), cubeSegment);
 +        baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
 +        nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length;
 +
 +        rowKeyValues = Lists.newArrayList();
 +    }
 +
 +    @Override
 +    public void map(KEYIN key, HCatRecord record, Context context) throws IOException, InterruptedException {
 +        String[] row = HiveTableReader.getRowAsStringArray(record);
 +        buildDictAndCount(row);
 +    }
 +
 +    protected void buildDictAndCount(String[] row) {
 +        for (int i = 0; i < intermediateTableDesc.getRowKeyColumnIndexes().length; i++) {
 +            columnDistinctValueMap.put(i, row[intermediateTableDesc.getRowKeyColumnIndexes()[i]]);
 +        }
 +
 +        putRowKeyToHLL(row, baseCuboidId); // recursively put all possible row keys to hll
 +    }
 +
 +    protected void cleanup(Mapper.Context context) throws IOException, InterruptedException {
 +        Map<Integer, DictionaryInfo> dictionaries = buildDictionary();
 +
 +        DictionaryInfoSerializer dictionaryInfoSerializer = new DictionaryInfoSerializer();
 +        Cuboid baseCuboid = Cuboid.findById(cubeDesc, this.baseCuboidId);
 +        byte[] keyBuf;
 +        // output dictionary to reducer, key is the index of the col on row key;
 +        for (Integer rowKeyIndex : dictionaries.keySet()) {
 +            keyBuf = Bytes.toBytes(rowKeyIndex);
 +            outputKey.set(keyBuf);
 +
 +            //serialize the dictionary to bytes;
 +            ByteArrayOutputStream buf = new ByteArrayOutputStream();
 +            DataOutputStream dout = new DataOutputStream(buf);
 +            dictionaryInfoSerializer.serialize(dictionaries.get(rowKeyIndex), dout);
 +            dout.close();
 +            buf.close();
 +            byte[] dictionaryBytes = buf.toByteArray();
 +            outputValue.set(dictionaryBytes);
 +
 +            context.write(outputKey, outputValue);
 +        }
 +
 +        // output hll to reducer, key is -1
 +        keyBuf = Bytes.toBytes(-1);
 +        outputKey.set(keyBuf);
 +        ByteBuffer hllBuf = ByteBuffer.allocate(1024 * 1024);
 +        hll.writeRegisters(hllBuf);
 +        outputValue.set(hllBuf.array());
 +        outputKey.set(keyBuf, 0, keyBuf.length);
 +        context.write(outputKey, outputValue);
 +    }
 +
 +    private void putRowKeyToHLL(String[] row, long cuboidId) {
 +        rowKeyValues.clear();
 +        long mask = Long.highestOneBit(baseCuboidId);
 +        // int actualLength = Long.SIZE - Long.numberOfLeadingZeros(baseCuboidId);
 +        for (int i = 0; i < nRowKey; i++) {
 +            if ((mask & cuboidId) == 1) {
 +                rowKeyValues.add(row[intermediateTableDesc.getRowKeyColumnIndexes()[i]]);
 +            }
 +            mask = mask >> 1;
 +        }
 +
 +        String key = StringUtils.join(rowKeyValues, ",");
 +        hll.add(key);
 +
 +        Collection<Long> children = cuboidScheduler.getSpanningCuboid(cuboidId);
 +        for (Long childId : children) {
 +            putRowKeyToHLL(row, childId);
 +        }
 +
 +    }
 +
 +    private Map<Integer, DictionaryInfo> buildDictionary() {
 +        Map<Integer, DictionaryInfo> dictionaryMap = Maps.newHashMap();
 +        for (int i = 0; i < intermediateTableDesc.getRowKeyColumnIndexes().length; i++) {
 +            // dictionary
 +            if (cubeDesc.getRowkey().isUseDictionary(i)) {
 +                TblColRef col = cubeDesc.getRowkey().getRowKeyColumns()[i].getColRef();
 +                Dictionary dict = DictionaryGenerator.buildDictionaryFromValueList(col.getType(), Collections2.transform(columnDistinctValueMap.get(i), new Function<String, byte[]>() {
 +                    @Nullable
 +                    @Override
 +                    public byte[] apply(String input) {
 +                        return input.getBytes();
 +                    }
 +                }));
 +
 +                logger.info("Building dictionary for " + col);
 +                DictionaryInfo dictInfo = new DictionaryInfo(col.getTable(), col.getName(), 0, col.getDatatype(), null, "");
 +                dictInfo.setDictionaryObject(dict);
 +                dictInfo.setDictionaryClass(dict.getClass().getName());
 +                dictionaryMap.put(i, dictInfo);
 +            }
 +        }
 +
 +        return dictionaryMap;
 +    }
 +
 +}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7f73abe5/storage/src/main/java/org/apache/kylin/storage/gridtable/GTDictionaryCodeSystem.java
----------------------------------------------------------------------
diff --cc storage/src/main/java/org/apache/kylin/storage/gridtable/GTDictionaryCodeSystem.java
index 03c7541,45b5d5f..6f2d9ce
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTDictionaryCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTDictionaryCodeSystem.java
@@@ -15,12 -13,11 +13,12 @@@ import java.util.Map
  /**
   * Created by shaoshi on 3/23/15.
   */
++@SuppressWarnings({ "rawtypes", "unchecked" })
  public class GTDictionaryCodeSystem implements IGTCodeSystem {
      private GTInfo info;
-     private BitSet encodedColumns = null;
      private Map<Integer, Dictionary> dictionaryMaps = null; // key: column index; value: dictionary for this column;
-     private Map<Integer, DataTypeSerializer> serializerMap = null; // column index; value: serializer for this column;
      private IFilterCodeSystem<ByteArray> filterCS;
+     private DataTypeSerializer[] serializers;
  
      public GTDictionaryCodeSystem(Map<Integer, Dictionary> dictionaryMaps) {
          this.dictionaryMaps = dictionaryMaps;
@@@ -95,12 -87,7 +88,7 @@@
  
      @Override
      public Object decodeColumnValue(int col, ByteBuffer buf) {
-         if (useDictionary(col)) {
-             int id = BytesUtil.readUnsigned(buf, dictionaryMaps.get(col).getSizeOfId());
-             return dictionaryMaps.get(col).getValueFromId(id);
-         } else {
-             return serializerMap.get(col).deserialize(buf);
-         }
 -       return serializers[col].deserialize(buf);
++        return serializers[col].deserialize(buf);
      }
  
      @Override