You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:07 UTC

[12/13] incubator-kylin git commit: KYLIN-976 Add ingester; Build part done, in-mem cube test pass

KYLIN-976 Add ingester; Build part done, in-mem cube test pass


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/ce61309a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/ce61309a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/ce61309a

Branch: refs/heads/KYLIN-976
Commit: ce61309ac860c1ee82acb08f6525a419d422eaa5
Parents: 8f2a56c
Author: Li, Yang <ya...@ebay.com>
Authored: Thu Nov 26 18:20:48 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Nov 27 14:49:10 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/dataGen/FactTableGenerator.java   |   2 +-
 .../streaming/StreamingTableDataGenerator.java  |   2 +-
 .../common/datatype/BigDecimalSerializer.java   | 109 -------
 .../apache/kylin/common/datatype/DataType.java  | 289 -------------------
 .../common/datatype/DataTypeSerializer.java     | 108 -------
 .../common/datatype/DateTimeSerializer.java     |  62 ----
 .../kylin/common/datatype/DoubleMutable.java    |  68 -----
 .../kylin/common/datatype/DoubleSerializer.java |  81 ------
 .../kylin/common/datatype/LongMutable.java      |  70 -----
 .../kylin/common/datatype/LongSerializer.java   |  88 ------
 .../kylin/common/datatype/StringSerializer.java |  54 ----
 .../apache/kylin/common/util/Dictionary.java    | 232 +++++++++++++++
 .../datatype/BigDecimalSerializerTest.java      |  51 ----
 .../kylin/aggregation/AggregationType.java      |  97 -------
 .../kylin/aggregation/IAggregationFactory.java  |   6 -
 .../kylin/aggregation/MeasureAggregator.java    |  76 -----
 .../kylin/aggregation/MeasureAggregators.java   |  81 ------
 .../apache/kylin/aggregation/MeasureCodec.java  |  79 -----
 .../aggregation/basic/BasicAggregation.java     | 128 --------
 .../basic/BasicAggregationFactory.java          |  31 --
 .../basic/BigDecimalMaxAggregator.java          |  54 ----
 .../basic/BigDecimalMinAggregator.java          |  55 ----
 .../basic/BigDecimalSumAggregator.java          |  51 ----
 .../aggregation/basic/DoubleMaxAggregator.java  |  54 ----
 .../aggregation/basic/DoubleMinAggregator.java  |  54 ----
 .../aggregation/basic/DoubleSumAggregator.java  |  51 ----
 .../aggregation/basic/LongMaxAggregator.java    |  54 ----
 .../aggregation/basic/LongMinAggregator.java    |  54 ----
 .../aggregation/basic/LongSumAggregator.java    |  51 ----
 .../kylin/aggregation/hllc/HLLCAggregation.java |  78 -----
 .../hllc/HLLCAggregationFactory.java            |  35 ---
 .../kylin/aggregation/hllc/HLLCAggregator.java  |  64 ----
 .../kylin/aggregation/hllc/HLLCSerializer.java  |  98 -------
 .../kylin/aggregation/hllc/LDCAggregator.java   |  63 ----
 .../kylin/aggregation/topn/TopNAggregation.java |  76 -----
 .../topn/TopNAggregationFactory.java            |  35 ---
 .../kylin/aggregation/topn/TopNAggregator.java  |  66 -----
 .../aggregation/topn/TopNCounterSerializer.java | 117 --------
 .../java/org/apache/kylin/cube/CubeManager.java |   7 +-
 .../java/org/apache/kylin/cube/CubeSegment.java |  19 +-
 .../kylin/cube/gridtable/CubeCodeSystem.java    |  30 +-
 .../kylin/cube/gridtable/CubeGridTable.java     |  13 +-
 .../gridtable/CuboidToGridTableMapping.java     |   2 +-
 .../gridtable/NotEnoughGTInfoException.java     |   1 +
 .../cube/gridtable/TrimmedCubeCodeSystem.java   |   4 +-
 .../inmemcubing/AbstractInMemCubeBuilder.java   |   6 +-
 .../cube/inmemcubing/DoggedCubeBuilder.java     |   6 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      |  18 +-
 .../InMemCubeBuilderInputConverter.java         | 105 +++----
 .../cube/inmemcubing/InMemCubeBuilderUtils.java |   3 +-
 .../kylin/cube/kv/AbstractRowKeyEncoder.java    |   2 +-
 .../apache/kylin/cube/kv/RowKeyColumnIO.java    |   2 +-
 .../apache/kylin/cube/kv/RowKeyColumnOrder.java |   2 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |   4 +-
 .../model/validation/rule/FunctionRule.java     |   2 +-
 .../org/apache/kylin/cube/util/CubingUtils.java |  19 +-
 .../kylin/gridtable/DefaultGTComparator.java    |   2 +-
 .../kylin/gridtable/GTAggregateScanner.java     |   2 +-
 .../java/org/apache/kylin/gridtable/GTInfo.java |   2 +-
 .../kylin/gridtable/GTSampleCodeSystem.java     |   4 +-
 .../apache/kylin/gridtable/IGTCodeSystem.java   |   2 +-
 .../apache/kylin/gridtable/UnitTestSupport.java |   4 +-
 .../topn/TopNCounterSerializerTest.java         |  60 ----
 .../kylin/cube/DictionaryManagerTest.java       |   2 +-
 .../DoggedCubeBuilderStressTest.java            |   4 +-
 .../cube/inmemcubing/DoggedCubeBuilderTest.java |   4 +-
 .../cube/inmemcubing/InMemCubeBuilderTest.java  |  12 +-
 .../gridtable/AggregationCacheMemSizeTest.java  |  14 +-
 .../kylin/gridtable/DictGridTableTest.java      |   6 +-
 .../kylin/gridtable/SimpleGridTableTest.java    |   2 +-
 .../gridtable/SimpleInvertedIndexTest.java      |   6 +-
 .../metadata/measure/MeasureCodecTest.java      |   6 +-
 .../apache/kylin/dict/DateStrDictionary.java    |   1 +
 .../org/apache/kylin/dict/DictCodeSystem.java   |   1 +
 .../java/org/apache/kylin/dict/Dictionary.java  | 233 ---------------
 .../apache/kylin/dict/DictionaryGenerator.java  |   9 +-
 .../org/apache/kylin/dict/DictionaryInfo.java   |   1 +
 .../kylin/dict/DictionaryInfoSerializer.java    |   1 +
 .../apache/kylin/dict/DictionaryManager.java    |   3 +-
 .../apache/kylin/dict/DictionarySerializer.java |   1 +
 .../org/apache/kylin/dict/IDictionaryAware.java |   1 +
 .../dict/MultipleDictionaryValueEnumerator.java |   2 +
 .../apache/kylin/dict/TimeStrDictionary.java    |   1 +
 .../org/apache/kylin/dict/TrieDictionary.java   |   2 +
 .../apache/kylin/dict/lookup/SnapshotTable.java |   3 +-
 .../apache/kylin/dict/NumberDictionaryTest.java |   3 +-
 .../apache/kylin/measure/IMeasureFactory.java   |  24 ++
 .../apache/kylin/measure/MeasureAggregator.java |  76 +++++
 .../kylin/measure/MeasureAggregators.java       |  81 ++++++
 .../org/apache/kylin/measure/MeasureCodec.java  |  79 +++++
 .../apache/kylin/measure/MeasureIngester.java   |  44 +++
 .../org/apache/kylin/measure/MeasureType.java   | 103 +++++++
 .../measure/basic/BasicMeasureFactory.java      |  32 ++
 .../kylin/measure/basic/BasicMeasureType.java   | 135 +++++++++
 .../kylin/measure/basic/BigDecimalIngester.java |  40 +++
 .../measure/basic/BigDecimalMaxAggregator.java  |  54 ++++
 .../measure/basic/BigDecimalMinAggregator.java  |  55 ++++
 .../measure/basic/BigDecimalSumAggregator.java  |  51 ++++
 .../kylin/measure/basic/DoubleIngester.java     |  45 +++
 .../measure/basic/DoubleMaxAggregator.java      |  54 ++++
 .../measure/basic/DoubleMinAggregator.java      |  54 ++++
 .../measure/basic/DoubleSumAggregator.java      |  51 ++++
 .../kylin/measure/basic/LongIngester.java       |  45 +++
 .../kylin/measure/basic/LongMaxAggregator.java  |  54 ++++
 .../kylin/measure/basic/LongMinAggregator.java  |  54 ++++
 .../kylin/measure/basic/LongSumAggregator.java  |  51 ++++
 .../measure/hllc/HLLCAggregationFactory.java    |  36 +++
 .../kylin/measure/hllc/HLLCAggregator.java      |  64 ++++
 .../kylin/measure/hllc/HLLCMeasureType.java     | 100 +++++++
 .../kylin/measure/hllc/HLLCSerializer.java      |  87 ++++++
 .../kylin/measure/hllc/LDCAggregator.java       |  63 ++++
 .../kylin/measure/topn/TopNAggregator.java      |  66 +++++
 .../measure/topn/TopNCounterSerializer.java     | 101 +++++++
 .../kylin/measure/topn/TopNMeasureFactory.java  |  36 +++
 .../kylin/measure/topn/TopNMeasureType.java     | 111 +++++++
 .../metadata/datatype/BigDecimalSerializer.java | 100 +++++++
 .../kylin/metadata/datatype/DataType.java       | 278 ++++++++++++++++++
 .../metadata/datatype/DataTypeSerializer.java   |  99 +++++++
 .../metadata/datatype/DateTimeSerializer.java   |  49 ++++
 .../kylin/metadata/datatype/DoubleMutable.java  |  68 +++++
 .../metadata/datatype/DoubleSerializer.java     |  69 +++++
 .../kylin/metadata/datatype/LongMutable.java    |  70 +++++
 .../kylin/metadata/datatype/LongSerializer.java |  77 +++++
 .../metadata/datatype/StringSerializer.java     |  48 +++
 .../filter/TimeConditionLiteralsReplacer.java   |   2 +-
 .../apache/kylin/metadata/model/ColumnDesc.java |   2 +-
 .../kylin/metadata/model/FunctionDesc.java      |  10 +-
 .../kylin/metadata/model/MeasureDesc.java       |  12 -
 .../apache/kylin/metadata/model/TblColRef.java  |   2 +-
 .../metadata/realization/SQLDigestUtil.java     |   2 +-
 .../topn/TopNCounterSerializerTest.java         |  46 +++
 .../datatype/BigDecimalSerializerTest.java      |  53 ++++
 .../storage/translate/ColumnValueRange.java     |   2 +-
 .../org/apache/kylin/storage/tuple/Tuple.java   |   4 +-
 .../storage/translate/ColumnValueRangeTest.java |   2 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java   | 123 +++-----
 .../kylin/engine/mr/steps/CuboidReducer.java    |   4 +-
 .../engine/mr/steps/InMemCuboidMapper.java      |   6 +-
 .../engine/mr/steps/InMemCuboidReducer.java     |   4 +-
 .../mr/steps/MergeCuboidFromStorageMapper.java  |   4 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |   4 +-
 .../engine/mr/steps/MergeDictionaryStep.java    |   1 -
 .../kylin/engine/mr/steps/CubeReducerTest.java  |   4 +-
 .../engine/mr/steps/MergeCuboidMapperTest.java  |   4 +-
 .../apache/kylin/engine/spark/SparkCubing.java  |  12 +-
 .../streaming/OneOffStreamingBuilder.java       |   4 +-
 .../engine/streaming/StreamingBatchBuilder.java |   6 +-
 .../streaming/cube/StreamingCubeBuilder.java    |  10 +-
 .../apache/kylin/invertedindex/IISegment.java   |   3 +-
 .../index/CompressedValueContainer.java         |   3 +-
 .../invertedindex/index/RawTableRecord.java     |   4 +-
 .../apache/kylin/invertedindex/index/Slice.java |   3 +-
 .../kylin/invertedindex/index/SliceBuilder.java |   4 +-
 .../kylin/invertedindex/index/TableRecord.java  |   4 +-
 .../invertedindex/index/TableRecordInfo.java    |   4 +-
 .../index/TableRecordInfoDigest.java            |   4 +-
 .../invertedindex/measure/FixedHLLCodec.java    |   2 +-
 .../measure/FixedLenMeasureCodec.java           |   2 +-
 .../measure/FixedPointLongCodec.java            |   4 +-
 .../invertedindex/model/IIKeyValueCodec.java    |   4 +-
 .../invertedindex/util/IIDictionaryBuilder.java |   2 +-
 .../kylin/invertedindex/IIInstanceTest.java     |   2 +-
 .../invertedindex/InvertedIndexLocalTest.java   |   2 +-
 .../measure/FixedPointLongCodecTest.java        |   2 +-
 .../common/coprocessor/AggregationCache.java    |   2 +-
 .../common/coprocessor/FilterDecorator.java     |   2 +-
 .../storage/hbase/cube/v1/CubeStorageQuery.java |   2 +-
 .../hbase/cube/v1/CubeTupleConverter.java       |   2 +-
 .../observer/AggregationScanner.java            |   2 +-
 .../observer/ObserverAggregationCache.java      |   2 +-
 .../observer/ObserverAggregators.java           |   4 +-
 .../v1/coprocessor/observer/ObserverTuple.java  |   2 +-
 .../hbase/cube/v2/CubeTupleConverter.java       |   2 +-
 .../endpoint/BitMapFilterEvaluator.java         |   2 +-
 .../endpoint/ClearTextDictionary.java           |   2 +-
 .../endpoint/EndpointAggregationCache.java      |   2 +-
 .../endpoint/EndpointAggregators.java           |   6 +-
 .../ii/coprocessor/endpoint/IIEndpoint.java     |   4 +-
 .../coprocessor/endpoint/LocalDictionary.java   |   2 +-
 .../storage/hbase/steps/CreateHTableJob.java    |   2 +-
 .../storage/hbase/steps/CubeHFileMapper.java    |   2 +-
 .../hbase/steps/HBaseMROutput2Transition.java   |   2 +-
 .../storage/hbase/steps/KeyValueCreator.java    |   2 +-
 .../storage/hbase/steps/RowValueDecoder.java    |   6 +-
 .../observer/AggregateRegionObserverTest.java   |   2 +-
 .../endpoint/BitMapFilterEvaluatorTest.java     |   3 +-
 .../endpoint/EndpointAggregationTest.java       |   4 +-
 .../hbase/steps/CubeHFileMapper2Test.java       |   2 +-
 .../hbase/steps/RowValueDecoderTest.java        |   4 +-
 189 files changed, 3346 insertions(+), 3260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index 705c175..a85d471 100644
--- a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -37,7 +37,6 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.cube.CubeInstance;
@@ -45,6 +44,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
index 04a5b5b..4ce223e 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
@@ -5,8 +5,8 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java
deleted file mode 100644
index f19ce14..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.datatype;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- * 
- */
-public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
-
-    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
-
-    final DataType type;
-    final int maxLength;
-
-    int avoidVerbose = 0;
-
-    public BigDecimalSerializer(DataType type) {
-        this.type = type;
-        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
-        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
-    }
-
-    @Override
-    public void serialize(BigDecimal value, ByteBuffer out) {
-        if (value.scale() > type.getScale()) {
-            if (avoidVerbose % 10000 == 0) {
-                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++));
-            }
-            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
-        }
-        byte[] bytes = value.unscaledValue().toByteArray();
-        if (bytes.length + 2 > maxLength) {
-            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
-        }
-
-        BytesUtil.writeVInt(value.scale(), out);
-        BytesUtil.writeVInt(bytes.length, out);
-        out.put(bytes);
-    }
-
-    @Override
-    public BigDecimal deserialize(ByteBuffer in) {
-        int scale = BytesUtil.readVInt(in);
-        int n = BytesUtil.readVInt(in);
-
-        byte[] bytes = new byte[n];
-        in.get(bytes);
-
-        return new BigDecimal(new BigInteger(bytes), scale);
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        int mark = in.position();
-
-        @SuppressWarnings("unused")
-        int scale = BytesUtil.readVInt(in);
-        int n = BytesUtil.readVInt(in);
-        int len = in.position() - mark + n;
-
-        in.position(mark);
-        return len;
-    }
-
-    @Override
-    public int maxLength() {
-        return maxLength;
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return 8;
-    }
-
-    @Override
-    public BigDecimal valueOf(byte[] value) {
-        if (value == null)
-            return new BigDecimal(0);
-        else
-            return new BigDecimal(Bytes.toString(value));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java
deleted file mode 100644
index 36fad00..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.datatype;
-
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- */
-@SuppressWarnings("serial")
-public class DataType implements Serializable {
-
-    // standard sql types, ref: http://www.w3schools.com/sql/sql_datatypes_general.asp
-    public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
-            + "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
-            + "|date|time|datetime|timestamp|byte|int|short|long|string";
-
-    private static final String TYPE_PATTEN_TAIL = "\\s*" //
-            + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?";
-
-    private static final Pattern TYPE_PATTERN = Pattern.compile( //
-            "(" + VALID_TYPES_STRING + ")" + TYPE_PATTEN_TAIL, Pattern.CASE_INSENSITIVE);
-
-    private static final Pattern CUSTOM_TYPE_PATTERN = Pattern.compile( //
-            "(" + ".*?" + ")" + TYPE_PATTEN_TAIL, Pattern.CASE_INSENSITIVE);
-    
-    public static final Set<String> INTEGER_FAMILY = new HashSet<String>();
-    public static final Set<String> NUMBER_FAMILY = new HashSet<String>();
-    public static final Set<String> DATETIME_FAMILY = new HashSet<String>();
-    public static final Set<String> STRING_FAMILY = new HashSet<String>();
-    private static final Map<String, String> LEGACY_TYPE_MAP = new HashMap<String, String>();
-    static {
-        INTEGER_FAMILY.add("tinyint");
-        INTEGER_FAMILY.add("smallint");
-        INTEGER_FAMILY.add("integer");
-        INTEGER_FAMILY.add("bigint");
-
-        NUMBER_FAMILY.addAll(INTEGER_FAMILY);
-        NUMBER_FAMILY.add("float");
-        NUMBER_FAMILY.add("double");
-        NUMBER_FAMILY.add("decimal");
-        NUMBER_FAMILY.add("real");
-        NUMBER_FAMILY.add("numeric");
-
-        DATETIME_FAMILY.add("date");
-        DATETIME_FAMILY.add("time");
-        DATETIME_FAMILY.add("datetime");
-        DATETIME_FAMILY.add("timestamp");
-
-        STRING_FAMILY.add("varchar");
-        STRING_FAMILY.add("char");
-
-        LEGACY_TYPE_MAP.put("byte", "tinyint");
-        LEGACY_TYPE_MAP.put("int", "integer");
-        LEGACY_TYPE_MAP.put("short", "smallint");
-        LEGACY_TYPE_MAP.put("long", "bigint");
-        LEGACY_TYPE_MAP.put("string", "varchar");
-        LEGACY_TYPE_MAP.put("hllc10", "hllc(10)");
-        LEGACY_TYPE_MAP.put("hllc12", "hllc(12)");
-        LEGACY_TYPE_MAP.put("hllc14", "hllc(14)");
-        LEGACY_TYPE_MAP.put("hllc15", "hllc(15)");
-        LEGACY_TYPE_MAP.put("hllc16", "hllc(16)");
-    }
-
-    private static final ConcurrentMap<DataType, DataType> CACHE = new ConcurrentHashMap<DataType, DataType>();
-
-    public static final DataType ANY = DataType.getType("any");
-
-    public static DataType getType(String type) {
-        return getTypeInner(type, false);
-    }
-    
-    public static DataType getCustomType(String type) {
-        return getTypeInner(type, true);
-    }
-    
-    private static DataType getTypeInner(String type, boolean isCustom) {
-        if (type == null)
-            return null;
-
-        DataType dataType = new DataType(type, isCustom);
-        DataType cached = CACHE.get(dataType);
-        if (cached == null) {
-            CACHE.put(dataType, dataType);
-            cached = dataType;
-        }
-        return cached;
-    }
-
-    // ============================================================================
-
-    private String name;
-    private int precision;
-    private int scale;
-
-    DataType(String datatype, boolean isCustom) {
-        datatype = datatype.trim().toLowerCase();
-        datatype = replaceLegacy(datatype);
-
-        Pattern pattern = isCustom ? CUSTOM_TYPE_PATTERN : TYPE_PATTERN;
-        Matcher m = pattern.matcher(datatype);
-        if (m.matches() == false)
-            throw new IllegalArgumentException("bad data type -- " + datatype + ", does not match " + pattern);
-
-        name = replaceLegacy(m.group(1));
-        precision = -1;
-        scale = -1;
-
-        String leftover = m.group(2);
-        if (leftover != null) {
-            String[] parts = leftover.split("\\s*,\\s*");
-            for (int i = 0; i < parts.length; i++) {
-                int n;
-                try {
-                    n = Integer.parseInt(parts[i]);
-                } catch (NumberFormatException e) {
-                    throw new IllegalArgumentException("bad data type -- " + datatype + ", precision/scale not numeric");
-                }
-                if (i == 0)
-                    precision = n;
-                else if (i == 1)
-                    scale = n;
-                else
-                    throw new IllegalArgumentException("bad data type -- " + datatype + ", too many precision/scale parts");
-            }
-        }
-
-        // FIXME 256 for unknown string precision
-        if ((name.equals("char") || name.equals("varchar")) && precision == -1) {
-            precision = 256; // to save memory at frontend, e.g. tableau will
-                             // allocate memory according to this
-        }
-
-        // FIXME (19,4) for unknown decimal precision
-        if ((name.equals("decimal") || name.equals("numeric")) && precision == -1) {
-            precision = 19;
-            scale = 4;
-        }
-    }
-
-    private String replaceLegacy(String str) {
-        String replace = LEGACY_TYPE_MAP.get(str);
-        return replace == null ? str : replace;
-    }
-
-    public int getStorageBytesEstimate() {
-        return DataTypeSerializer.create(this).getStorageBytesEstimate();
-    }
-
-    public boolean isStringFamily() {
-        return STRING_FAMILY.contains(name);
-    }
-
-    public boolean isIntegerFamily() {
-        return INTEGER_FAMILY.contains(name);
-    }
-
-    public boolean isNumberFamily() {
-        return NUMBER_FAMILY.contains(name);
-    }
-
-    public boolean isDateTimeFamily() {
-        return DATETIME_FAMILY.contains(name);
-    }
-
-    public boolean isDate() {
-        return name.equals("date");
-    }
-
-    public boolean isTime() {
-        return name.equals("time");
-    }
-
-    public boolean isTimestamp() {
-        return name.equals("timestamp");
-    }
-
-    public boolean isDatetime() {
-        return name.equals("datetime");
-    }
-
-    public boolean isTinyInt() {
-        return name.equals("tinyint");
-    }
-
-    public boolean isSmallInt() {
-        return name.equals("smallint");
-    }
-
-    public boolean isInt() {
-        return name.equals("integer");
-    }
-
-    public boolean isBigInt() {
-        return name.equals("bigint");
-    }
-
-    public boolean isFloat() {
-        return name.equals("float");
-    }
-
-    public boolean isDouble() {
-        return name.equals("double");
-    }
-
-    public boolean isDecimal() {
-        return name.equals("decimal");
-    }
-
-    public boolean isHLLC() {
-        return name.equals("hllc");
-    }
-
-    public String getName() {
-        return name;
-    }
-
-    public int getPrecision() {
-        return precision;
-    }
-
-    public int getScale() {
-        return scale;
-    }
-
-    @Override
-    public int hashCode() {
-        final int prime = 31;
-        int result = 1;
-        result = prime * result + ((name == null) ? 0 : name.hashCode());
-        result = prime * result + precision;
-        result = prime * result + scale;
-        return result;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj)
-            return true;
-        if (obj == null)
-            return false;
-        if (getClass() != obj.getClass())
-            return false;
-        DataType other = (DataType) obj;
-        if (name == null) {
-            if (other.name != null)
-                return false;
-        } else if (!name.equals(other.name))
-            return false;
-        if (precision != other.precision)
-            return false;
-        if (scale != other.scale)
-            return false;
-        return true;
-    }
-
-    @Override
-    public String toString() {
-        if (precision < 0 && scale < 0)
-            return name;
-        else if (scale < 0)
-            return name + "(" + precision + ")";
-        else
-            return name + "(" + precision + "," + scale + ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java
deleted file mode 100644
index 99fa3fd..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.datatype;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kylin.common.util.BytesSerializer;
-
-import com.google.common.collect.Maps;
-
-/**
- * @author yangli9
- * 
- * Note: the implementations MUST be thread-safe.
- */
-abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
-
-    final static Map<String, Class<?>> implementations;
-    static {
-        HashMap<String, Class<?>> impl = Maps.newHashMap();
-        impl.put("varchar", StringSerializer.class);
-        impl.put("decimal", BigDecimalSerializer.class);
-        impl.put("double", DoubleSerializer.class);
-        impl.put("float", DoubleSerializer.class);
-        impl.put("bigint", LongSerializer.class);
-        impl.put("long", LongSerializer.class);
-        impl.put("integer", LongSerializer.class);
-        impl.put("int", LongSerializer.class);
-        impl.put("smallint", LongSerializer.class);
-        impl.put("date", DateTimeSerializer.class);
-        impl.put("datetime", DateTimeSerializer.class);
-        impl.put("timestamp", DateTimeSerializer.class);
-        implementations = Collections.unmodifiableMap(impl);
-    }
-    
-    public static boolean hasRegistered(String dataTypeName) {
-        return implementations.containsKey(dataTypeName);
-    }
-    
-    public static void register(String dataTypeName, Class<? extends DataTypeSerializer<?>> impl) {
-        implementations.put(dataTypeName, impl);
-    }
-
-    public static DataTypeSerializer<?> create(String dataType) {
-        return create(DataType.getType(dataType));
-    }
-
-    public static DataTypeSerializer<?> create(DataType type) {
-        Class<?> clz = implementations.get(type.getName());
-        if (clz == null)
-            throw new RuntimeException("No DataTypeSerializer for type " + type);
-
-        try {
-            return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
-        } catch (Exception e) {
-            throw new RuntimeException(e); // never happen
-        }
-    }
-    
-    /** peek into buffer and return the length of serialization */
-    abstract public int peekLength(ByteBuffer in);
-
-    /** return the max number of bytes to the longest serialization */
-    abstract public int maxLength();
-
-    /** get an estimate of size in bytes of the serialized data */
-    abstract public int getStorageBytesEstimate();
-
-    /** convert from String to obj (string often come as byte[] in mapred) */
-    abstract public T valueOf(byte[] value);
-
-    /** convert from String to obj */
-    public T valueOf(String value) {
-        try {
-            return valueOf(value.getBytes("UTF-8"));
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e); // never happen
-        }
-    }
-
-    /** convert from obj to string */
-    public String toString(T value) {
-        if (value == null)
-            return "NULL";
-        else
-            return value.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java
deleted file mode 100644
index d3ef7cd..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java
+++ /dev/null
@@ -1,62 +0,0 @@
-package org.apache.kylin.common.datatype;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DateFormat;
-
-public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
-
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
-
-    public DateTimeSerializer(DataType type) {
-    }
-
-    @Override
-    public void serialize(LongMutable value, ByteBuffer out) {
-        out.putLong(value.get());
-    }
-
-    private LongMutable current() {
-        LongMutable l = current.get();
-        if (l == null) {
-            l = new LongMutable();
-            current.set(l);
-        }
-        return l;
-    }
-
-    @Override
-    public LongMutable deserialize(ByteBuffer in) {
-        LongMutable l = current();
-        l.set(in.getLong());
-        return l;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return 8;
-    }
-
-    @Override
-    public int maxLength() {
-        return 8;
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return 8;
-    }
-
-    @Override
-    public LongMutable valueOf(byte[] value) {
-        LongMutable l = current();
-        if (value == null)
-            l.set(0L);
-        else
-            l.set(DateFormat.stringToMillis(Bytes.toString(value)));
-        return l;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java
deleted file mode 100644
index a32fbd8..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.datatype;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class DoubleMutable implements Comparable<DoubleMutable>, Serializable {
-
-    private double v;
-
-    public DoubleMutable() {
-        this(0);
-    }
-
-    public DoubleMutable(double v) {
-        set(v);
-    }
-
-    public double get() {
-        return v;
-    }
-
-    public void set(double v) {
-        this.v = v;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof DoubleMutable)) {
-            return false;
-        }
-        DoubleMutable other = (DoubleMutable) o;
-        return this.v == other.v;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) Double.doubleToLongBits(v);
-    }
-
-    @Override
-    public int compareTo(DoubleMutable o) {
-        return (v < o.v ? -1 : (v == o.v ? 0 : 1));
-    }
-
-    @Override
-    public String toString() {
-        return Double.toString(v);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java
deleted file mode 100644
index f128576..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.datatype;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.Bytes;
-
-/**
- */
-public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
-
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
-
-    public DoubleSerializer(DataType type) {
-    }
-
-    @Override
-    public void serialize(DoubleMutable value, ByteBuffer out) {
-        out.putDouble(value.get());
-    }
-
-    private DoubleMutable current() {
-        DoubleMutable d = current.get();
-        if (d == null) {
-            d = new DoubleMutable();
-            current.set(d);
-        }
-        return d;
-    }
-
-    @Override
-    public DoubleMutable deserialize(ByteBuffer in) {
-        DoubleMutable d = current();
-        d.set(in.getDouble());
-        return d;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return 8;
-    }
-
-    @Override
-    public int maxLength() {
-        return 8;
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return 8;
-    }
-
-    @Override
-    public DoubleMutable valueOf(byte[] value) {
-        DoubleMutable d = current();
-        if (value == null)
-            d.set(0d);
-        else
-            d.set(Double.parseDouble(Bytes.toString(value)));
-        return d;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java b/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java
deleted file mode 100644
index 72b540c..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.datatype;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class LongMutable implements Comparable<LongMutable>, Serializable {
-
-    private long v;
-
-    public LongMutable() {
-        this(0);
-    }
-
-    public LongMutable(long v) {
-        set(v);
-    }
-
-    public long get() {
-        return v;
-    }
-
-    public void set(long v) {
-        this.v = v;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof LongMutable)) {
-            return false;
-        }
-        LongMutable other = (LongMutable) o;
-        return this.v == other.v;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) v;
-    }
-
-    @Override
-    public int compareTo(LongMutable o) {
-        long thisValue = this.v;
-        long thatValue = o.v;
-        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
-    }
-
-    @Override
-    public String toString() {
-        return Long.toString(v);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java
deleted file mode 100644
index 5c5e10f..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.datatype;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- */
-public class LongSerializer extends DataTypeSerializer<LongMutable> {
-
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
-
-    public LongSerializer(DataType type) {
-    }
-
-    @Override
-    public void serialize(LongMutable value, ByteBuffer out) {
-        BytesUtil.writeVLong(value.get(), out);
-    }
-
-    private LongMutable current() {
-        LongMutable l = current.get();
-        if (l == null) {
-            l = new LongMutable();
-            current.set(l);
-        }
-        return l;
-    }
-
-    @Override
-    public LongMutable deserialize(ByteBuffer in) {
-        LongMutable l = current();
-        l.set(BytesUtil.readVLong(in));
-        return l;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        int mark = in.position();
-
-        BytesUtil.readVLong(in);
-        int len = in.position() - mark;
-
-        in.position(mark);
-        return len;
-    }
-
-    @Override
-    public int maxLength() {
-        return 9; // vlong: 1 + 8
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return 5;
-    }
-
-    @Override
-    public LongMutable valueOf(byte[] value) {
-        LongMutable l = current();
-        if (value == null)
-            l.set(0L);
-        else
-            l.set(Long.parseLong(Bytes.toString(value)));
-        return l;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java
deleted file mode 100644
index b3562eb..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.kylin.common.datatype;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-
-public class StringSerializer extends DataTypeSerializer<String> {
-
-    final DataType type;
-    final int maxLength;
-
-    public StringSerializer(DataType type) {
-        this.type = type;
-        // see serialize(): 2 byte length, rest is String.toBytes()
-        this.maxLength = 2 + type.getPrecision();
-    }
-
-    @Override
-    public void serialize(String value, ByteBuffer out) {
-        int start = out.position();
-
-        BytesUtil.writeUTFString(value, out);
-
-        if (out.position() - start > maxLength)
-            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
-    }
-
-    @Override
-    public String deserialize(ByteBuffer in) {
-        return BytesUtil.readUTFString(in);
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return BytesUtil.peekByteArrayLength(in);
-    }
-
-    @Override
-    public int maxLength() {
-        return maxLength;
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return maxLength;
-    }
-
-    @Override
-    public String valueOf(byte[] value) {
-        return Bytes.toString(value);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
new file mode 100644
index 0000000..6d3fa62
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Dictionary.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.PrintStream;
+import java.io.Serializable;
+import java.io.UnsupportedEncodingException;
+
+import org.apache.kylin.common.persistence.Writable;
+
+/**
+ * A bi-way dictionary that maps from dimension/column values to IDs and vice
+ * versa. By storing IDs instead of real values, the size of cube is
+ * significantly reduced.
+ * 
+ * - IDs are smallest integers possible for the cardinality of a column, for the
+ * purpose of minimal storage space - IDs preserve ordering of values, such that
+ * range query can be applied to IDs directly
+ * 
+ * A dictionary once built, is immutable. This allows optimal memory footprint
+ * by e.g. flatten the Trie structure into a byte array, replacing node pointers
+ * with array offsets.
+ * 
+ * @author yangli9
+ */
+@SuppressWarnings("serial")
+abstract public class Dictionary<T> implements Writable, Serializable {
+
+    public static final byte NULL = (byte) 0xff;
+
+    // ID with all bit-1 (0xff e.g.) reserved for NULL value
+    public static final int NULL_ID[] = new int[] { 0, 0xff, 0xffff, 0xffffff, 0xffffffff };
+
+    abstract public int getMinId();
+
+    abstract public int getMaxId();
+
+    public int getSize() {
+        return getMaxId() - getMinId() + 1;
+    }
+
+    /**
+     * @return the size of an ID in bytes, determined by the cardinality of column
+     */
+    abstract public int getSizeOfId();
+
+    /**
+     * @return the (maximum) size of value in bytes, determined by the longest value
+     */
+    abstract public int getSizeOfValue();
+
+    /**
+     * @return true if each entry of this dict is contained by the dict in param
+     */
+    abstract public boolean contains(Dictionary<?> another);
+
+    /**
+     * Convenient form of <code>getIdFromValue(value, 0)</code>
+     */
+    final public int getIdFromValue(T value) throws IllegalArgumentException {
+        return getIdFromValue(value, 0);
+    }
+
+    /**
+     * Returns the ID integer of given value. In case of not found
+     * <p>
+     * - if roundingFlag=0, throw IllegalArgumentException; <br>
+     * - if roundingFlag<0, the closest smaller ID integer if exist; <br>
+     * - if roundingFlag>0, the closest bigger ID integer if exist. <br>
+     * <p>
+     * The implementation often has cache, thus faster than the byte[] version getIdFromValueBytes()
+     * 
+     * @throws IllegalArgumentException
+     *             if value is not found in dictionary and rounding is off;
+     *             or if rounding cannot find a smaller or bigger ID
+     */
+    final public int getIdFromValue(T value, int roundingFlag) throws IllegalArgumentException {
+        if (isNullObjectForm(value))
+            return nullId();
+        else
+            return getIdFromValueImpl(value, roundingFlag);
+    }
+
+    final public boolean containsValue(T value) throws IllegalArgumentException {
+        if (isNullObjectForm(value)) {
+            return true;
+        } else {
+            try {
+                //if no key found, it will throw exception
+                getIdFromValueImpl(value, 0);
+            } catch (IllegalArgumentException e) {
+                return false;
+            }
+            return true;
+        }
+    }
+
+    protected boolean isNullObjectForm(T value) {
+        return value == null;
+    }
+
+    abstract protected int getIdFromValueImpl(T value, int roundingFlag);
+
+    /**
+     * @return the value corresponds to the given ID
+     * @throws IllegalArgumentException
+     *             if ID is not found in dictionary
+     */
+    final public T getValueFromId(int id) throws IllegalArgumentException {
+        if (isNullId(id))
+            return null;
+        else
+            return getValueFromIdImpl(id);
+    }
+
+    abstract protected T getValueFromIdImpl(int id);
+
+    /**
+     * Convenient form of
+     * <code>getIdFromValueBytes(value, offset, len, 0)</code>
+     */
+    final public int getIdFromValueBytes(byte[] value, int offset, int len) throws IllegalArgumentException {
+        return getIdFromValueBytes(value, offset, len, 0);
+    }
+
+    /**
+     * A lower level API, return ID integer from raw value bytes. In case of not found 
+     * <p>
+     * - if roundingFlag=0, throw IllegalArgumentException; <br>
+     * - if roundingFlag<0, the closest smaller ID integer if exist; <br>
+     * - if roundingFlag>0, the closest bigger ID integer if exist. <br>
+     * <p>
+     * Bypassing the cache layer, this could be significantly slower than getIdFromValue(T value).
+     * 
+     * @throws IllegalArgumentException
+     *             if value is not found in dictionary and rounding is off;
+     *             or if rounding cannot find a smaller or bigger ID
+     */
+    final public int getIdFromValueBytes(byte[] value, int offset, int len, int roundingFlag) throws IllegalArgumentException {
+        if (isNullByteForm(value, offset, len))
+            return nullId();
+        else {
+            int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag);
+            if (id < 0)
+                throw new IllegalArgumentException("Value not exists!");
+            return id;
+        }
+    }
+
+    protected boolean isNullByteForm(byte[] value, int offset, int len) {
+        return value == null;
+    }
+
+    abstract protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag);
+
+    final public byte[] getValueBytesFromId(int id) {
+        if (isNullId(id))
+            return BytesUtil.EMPTY_BYTE_ARRAY;
+        else
+            return getValueBytesFromIdImpl(id);
+    }
+
+    abstract protected byte[] getValueBytesFromIdImpl(int id);
+
+    /**
+     * A lower level API, get byte values from ID, return the number of bytes
+     * written. Bypassing the cache layer, this could be significantly slower
+     * than getIdFromValue(T value).
+     *
+     * @return size of value bytes, 0 if empty string, -1 if null
+     *
+     * @throws IllegalArgumentException
+     *             if ID is not found in dictionary
+     */
+    final public int getValueBytesFromId(int id, byte[] returnValue, int offset) throws IllegalArgumentException {
+        if (isNullId(id))
+            return -1;
+        else
+            return getValueBytesFromIdImpl(id, returnValue, offset);
+    }
+
+    abstract protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset);
+
+    abstract public void dump(PrintStream out);
+
+    public int nullId() {
+        return NULL_ID[getSizeOfId()];
+    }
+
+    public boolean isNullId(int id) {
+        int nullId = NULL_ID[getSizeOfId()];
+        return (nullId & id) == nullId;
+    }
+
+    /** utility that converts a dictionary ID to string, preserving order */
+    public static String dictIdToString(byte[] idBytes, int offset, int length) {
+        try {
+            return new String(idBytes, offset, length, "ISO-8859-1");
+        } catch (UnsupportedEncodingException e) {
+            // never happen
+            return null;
+        }
+    }
+
+    /** the reverse of dictIdToString(), returns integer ID */
+    public static int stringToDictId(String str) {
+        try {
+            byte[] bytes = str.getBytes("ISO-8859-1");
+            return BytesUtil.readUnsigned(bytes, 0, bytes.length);
+        } catch (UnsupportedEncodingException e) {
+            // never happen
+            return 0;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java b/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java
deleted file mode 100644
index cba6795..0000000
--- a/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package org.apache.kylin.common.datatype;
-
-import static org.junit.Assert.*;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-/**
- */
-public class BigDecimalSerializerTest {
-
-    private static BigDecimalSerializer bigDecimalSerializer;
-
-    @BeforeClass
-    public static void beforeClass() {
-        bigDecimalSerializer = new BigDecimalSerializer(DataType.getType("decimal"));
-    }
-
-    @Test
-    public void testNormal() {
-        BigDecimal input = new BigDecimal("1234.1234");
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        buffer.mark();
-        bigDecimalSerializer.serialize(input, buffer);
-        buffer.reset();
-        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
-        assertEquals(input, output);
-    }
-
-    @Test
-    public void testScaleOutOfRange() {
-        BigDecimal input = new BigDecimal("1234.1234567890");
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        buffer.mark();
-        bigDecimalSerializer.serialize(input, buffer);
-        buffer.reset();
-        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
-        assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output);
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testOutOfPrecision() {
-        BigDecimal input = new BigDecimal("66855344214907231736.4924");
-        ByteBuffer buffer = ByteBuffer.allocate(256);
-        bigDecimalSerializer.serialize(input, buffer);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java b/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java
deleted file mode 100644
index 81345b7..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.aggregation.basic.BasicAggregationFactory;
-import org.apache.kylin.aggregation.hllc.HLLCAggregationFactory;
-import org.apache.kylin.aggregation.topn.TopNAggregationFactory;
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-import com.google.common.collect.Maps;
-
-abstract public class AggregationType {
-    
-    private static final Map<String, IAggregationFactory> factoryRegistry = Maps.newConcurrentMap();
-    private static final IAggregationFactory defaultFactory = new BasicAggregationFactory();
-    
-    static {
-        factoryRegistry.put(FunctionDesc.FUNC_COUNT_DISTINCT, new HLLCAggregationFactory());
-        factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNAggregationFactory());
-    }
-    
-    public static AggregationType create(String funcName, String dataType) {
-        funcName = funcName.toUpperCase();
-        dataType = dataType.toLowerCase();
-        
-        IAggregationFactory factory = factoryRegistry.get(funcName);
-        if (factory == null)
-            factory = defaultFactory;
-        
-        AggregationType result = factory.createAggregationType(funcName, dataType);
-        
-        // register serializer for aggr data type
-        DataType aggregationDataType = result.getAggregationDataType();
-        if (DataTypeSerializer.hasRegistered(aggregationDataType.getName()) == false) {
-            DataTypeSerializer.register(aggregationDataType.getName(), result.getAggregationDataSeralizer());
-        }
-        
-        return result;
-    }
-    
-    /* ============================================================================
-     * Define
-     * ---------------------------------------------------------------------------- */
-    
-    abstract public DataType getAggregationDataType();
-    
-    abstract public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer();
-    
-    abstract public void validate(MeasureDesc measureDesc) throws IllegalArgumentException;
-    
-    /* ============================================================================
-     * Build
-     * ---------------------------------------------------------------------------- */
-    
-    abstract public MeasureAggregator<?> newAggregator();
- 
-    abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
-    
-    abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
-
-    /* ============================================================================
-     * Cube Selection
-     * ---------------------------------------------------------------------------- */
-    
-    /* ============================================================================
-     * Query
-     * ---------------------------------------------------------------------------- */
-    
-    /* ============================================================================
-     * Storage
-     * ---------------------------------------------------------------------------- */
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java
deleted file mode 100644
index cbdd9da..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package org.apache.kylin.aggregation;
-
-public interface IAggregationFactory {
-
-    public AggregationType createAggregationType(String funcName, String dataType);
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
deleted file mode 100644
index 6968a9d..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation;
-
-import java.io.Serializable;
-
-/**
- */
-@SuppressWarnings("serial")
-abstract public class MeasureAggregator<V> implements Serializable {
-
-    public static MeasureAggregator<?> create(String funcName, String dataType) {
-        return AggregationType.create(funcName, dataType).newAggregator();
-    }
-
-    public static int guessBigDecimalMemBytes() {
-        // 116 returned by AggregationCacheMemSizeTest
-        return 8 // aggregator obj shell
-        + 8 // ref to BigDecimal
-        + 8 // BigDecimal obj shell
-        + 100; // guess of BigDecimal internal
-    }
-
-    public static int guessDoubleMemBytes() {
-        // 29 to 44 returned by AggregationCacheMemSizeTest
-        return 44;
-        /*
-        return 8 // aggregator obj shell
-        + 8 // ref to DoubleWritable
-        + 8 // DoubleWritable obj shell
-        + 8; // size of double
-        */
-    }
-
-    public static int guessLongMemBytes() {
-        // 29 to 44 returned by AggregationCacheMemSizeTest
-        return 44;
-        /*
-        return 8 // aggregator obj shell
-        + 8 // ref to LongWritable
-        + 8 // LongWritable obj shell
-        + 8; // size of long
-        */
-    }
-
-    // ============================================================================
-
-    @SuppressWarnings("rawtypes")
-    public void setDependentAggregator(MeasureAggregator agg) {
-    }
-
-    abstract public void reset();
-
-    abstract public void aggregate(V value);
-
-    abstract public V getState();
-
-    // get an estimate of memory consumption UPPER BOUND
-    abstract public int getMemBytesEstimate();
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java
deleted file mode 100644
index 3aa575b..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-/**
- */
-@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
-public class MeasureAggregators implements Serializable {
-
-    private final MeasureAggregator[] aggs;
-    private final int descLength;
-
-    public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
-        this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
-    }
-
-    public MeasureAggregators(MeasureDesc... measureDescs) {
-        descLength = measureDescs.length;
-        aggs = new MeasureAggregator[descLength];
-
-        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
-        for (int i = 0; i < descLength; i++) {
-            FunctionDesc func = measureDescs[i].getFunction();
-            aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType());
-            measureIndexMap.put(measureDescs[i].getName(), i);
-        }
-        // fill back dependent aggregator
-        for (int i = 0; i < descLength; i++) {
-            String depMsrRef = measureDescs[i].getDependentMeasureRef();
-            if (depMsrRef != null) {
-                int index = measureIndexMap.get(depMsrRef);
-                aggs[i].setDependentAggregator(aggs[index]);
-            }
-        }
-    }
-
-    public void reset() {
-        for (int i = 0; i < aggs.length; i++) {
-            aggs[i].reset();
-        }
-    }
-    
-    public void aggregate(Object[] values) {
-        assert values.length == descLength;
-
-        for (int i = 0; i < descLength; i++) {
-            aggs[i].aggregate(values[i]);
-        }
-    }
-
-    public void collectStates(Object[] states) {
-        for (int i = 0; i < descLength; i++) {
-            states[i] = aggs[i].getState();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
deleted file mode 100644
index 8721954..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation;
-
-import java.nio.ByteBuffer;
-import java.util.Collection;
-
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-/**
- * @author yangli9
- * 
- */
-@SuppressWarnings({ "rawtypes", "unchecked" })
-public class MeasureCodec {
-
-    int nMeasures;
-    DataTypeSerializer[] serializers;
-
-    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
-        this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
-    }
-
-    public MeasureCodec(MeasureDesc... measureDescs) {
-        String[] dataTypes = new String[measureDescs.length];
-        for (int i = 0; i < dataTypes.length; i++) {
-            dataTypes[i] = measureDescs[i].getFunction().getReturnType();
-        }
-        init(dataTypes);
-    }
-
-    public MeasureCodec(String... dataTypes) {
-        init(dataTypes);
-    }
-
-    private void init(String[] dataTypes) {
-        nMeasures = dataTypes.length;
-        serializers = new DataTypeSerializer[nMeasures];
-
-        for (int i = 0; i < nMeasures; i++) {
-            serializers[i] = DataTypeSerializer.create(dataTypes[i]);
-        }
-    }
-
-    public DataTypeSerializer getSerializer(int idx) {
-        return serializers[idx];
-    }
-
-    public void decode(ByteBuffer buf, Object[] result) {
-        assert result.length == nMeasures;
-        for (int i = 0; i < nMeasures; i++) {
-            result[i] = serializers[i].deserialize(buf);
-        }
-    }
-
-    public void encode(Object[] values, ByteBuffer out) {
-        assert values.length == nMeasures;
-        for (int i = 0; i < nMeasures; i++) {
-            serializers[i].serialize(values[i], out);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java
deleted file mode 100644
index 7ea88bd..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import java.util.List;
-
-import org.apache.kylin.aggregation.AggregationType;
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.BigDecimalSerializer;
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.common.datatype.DateTimeSerializer;
-import org.apache.kylin.common.datatype.DoubleSerializer;
-import org.apache.kylin.common.datatype.LongSerializer;
-import org.apache.kylin.common.datatype.StringSerializer;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-public class BasicAggregation extends AggregationType {
-    
-    private final String funcName;
-    private final DataType dataType;
-
-    public BasicAggregation(String funcName, String dataType) {
-        this.funcName = funcName;
-        this.dataType = DataType.getType(dataType);
-    }
-
-    @Override
-    public DataType getAggregationDataType() {
-        return dataType;
-    }
-
-    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
-        if (dataType.isStringFamily())
-            return StringSerializer.class;
-        else if (dataType.isIntegerFamily())
-            return LongSerializer.class;
-        else if (dataType.isDecimal())
-            return BigDecimalSerializer.class;
-        else if (dataType.isNumberFamily())
-            return DoubleSerializer.class;
-        else if (dataType.isDateTimeFamily())
-            return DateTimeSerializer.class;
-        else
-            throw new IllegalArgumentException("No default serializer for type " + dataType);
-    }
-    
-    @Override
-    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
-        // TODO Auto-generated method stub
-        
-    }
-
-    @Override
-    public MeasureAggregator<?> newAggregator() {
-        if (isSum() || isCount()) {
-            if (dataType.isDecimal())
-                return new BigDecimalSumAggregator();
-            else if (dataType.isIntegerFamily())
-                return new LongSumAggregator();
-            else if (dataType.isNumberFamily())
-                return new DoubleSumAggregator();
-        } else if (isMax()) {
-            if (dataType.isDecimal())
-                return new BigDecimalMaxAggregator();
-            else if (dataType.isIntegerFamily())
-                return new LongMaxAggregator();
-            else if (dataType.isNumberFamily())
-                return new DoubleMaxAggregator();
-        } else if (isMin()) {
-            if (dataType.isDecimal())
-                return new BigDecimalMinAggregator();
-            else if (dataType.isIntegerFamily())
-                return new LongMinAggregator();
-            else if (dataType.isNumberFamily())
-                return new DoubleMinAggregator();
-        }
-        throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + dataType + "'");
-    }
-    
-    private boolean isSum() {
-        return FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName);
-    }
-
-    private boolean isCount() {
-        return FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName);
-    }
-    
-    private boolean isMax() {
-        return FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName);
-    }
-    
-    private boolean isMin() {
-        return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
-    }
-    
-    @Override
-    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java
deleted file mode 100644
index 547d45f..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.AggregationType;
-import org.apache.kylin.aggregation.IAggregationFactory;
-
-public class BasicAggregationFactory implements IAggregationFactory {
-
-    @Override
-    public AggregationType createAggregationType(String funcName, String dataType) {
-        return new BasicAggregation(funcName, dataType);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java
deleted file mode 100644
index ca044d0..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import java.math.BigDecimal;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-
-/**
- */
-@SuppressWarnings("serial")
-public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
-
-    BigDecimal max = null;
-
-    @Override
-    public void reset() {
-        max = null;
-    }
-
-    @Override
-    public void aggregate(BigDecimal value) {
-        if (max == null)
-            max = value;
-        else if (max.compareTo(value) < 0)
-            max = value;
-    }
-
-    @Override
-    public BigDecimal getState() {
-        return max;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessBigDecimalMemBytes();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java
deleted file mode 100644
index 3c3c85e..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import java.math.BigDecimal;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-
-/**
- */
-@SuppressWarnings("serial")
-public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
-
-    BigDecimal max = null;
-
-    @Override
-    public void reset() {
-        max = null;
-    }
-
-    @Override
-    public void aggregate(BigDecimal value) {
-        if (max == null)
-            max = value;
-        else if (max.compareTo(value) > 0)
-            max = value;
-    }
-
-    @Override
-    public BigDecimal getState() {
-        return max;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessBigDecimalMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java
deleted file mode 100644
index 19aef3c..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import java.math.BigDecimal;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-
-/**
- */
-@SuppressWarnings("serial")
-public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
-
-    BigDecimal sum = new BigDecimal(0);
-
-    @Override
-    public void reset() {
-        sum = new BigDecimal(0);
-    }
-
-    @Override
-    public void aggregate(BigDecimal value) {
-        sum = sum.add(value);
-    }
-
-    @Override
-    public BigDecimal getState() {
-        return sum;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessBigDecimalMemBytes();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
deleted file mode 100644
index 99896a6..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DoubleMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class DoubleMaxAggregator extends MeasureAggregator<DoubleMutable> {
-
-    DoubleMutable max = null;
-
-    @Override
-    public void reset() {
-        max = null;
-    }
-
-    @Override
-    public void aggregate(DoubleMutable value) {
-        if (max == null)
-            max = new DoubleMutable(value.get());
-        else if (max.get() < value.get())
-            max.set(value.get());
-    }
-
-    @Override
-    public DoubleMutable getState() {
-        return max;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessDoubleMemBytes();
-    }
-
-}