You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:02 UTC

[07/13] incubator-kylin git commit: KYLIN-976 AggregationType interface, serializer and aggregator

KYLIN-976 AggregationType interface, serializer and aggregator


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/8f2a56cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/8f2a56cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/8f2a56cf

Branch: refs/heads/KYLIN-976
Commit: 8f2a56cfb5f340cd19c67bf22c8dfbdf2f9458a1
Parents: 1218bbd
Author: Li, Yang <ya...@ebay.com>
Authored: Wed Nov 25 11:13:18 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Nov 27 14:47:09 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/dataGen/FactTableGenerator.java   |   2 +-
 .../streaming/StreamingTableDataGenerator.java  |   2 +-
 .../common/datatype/BigDecimalSerializer.java   | 109 +++++++
 .../apache/kylin/common/datatype/DataType.java  | 289 ++++++++++++++++++
 .../common/datatype/DataTypeSerializer.java     | 108 +++++++
 .../common/datatype/DateTimeSerializer.java     |  62 ++++
 .../kylin/common/datatype/DoubleMutable.java    |  68 +++++
 .../kylin/common/datatype/DoubleSerializer.java |  81 +++++
 .../kylin/common/datatype/LongMutable.java      |  70 +++++
 .../kylin/common/datatype/LongSerializer.java   |  88 ++++++
 .../kylin/common/datatype/StringSerializer.java |  54 ++++
 .../kylin/common/debug/BackdoorToggles.java     |   5 -
 .../kylin/common/topn/DoublyLinkedList.java     |   2 -
 .../apache/kylin/common/util/DoubleMutable.java |  68 -----
 .../apache/kylin/common/util/LongMutable.java   |  70 -----
 .../datatype/BigDecimalSerializerTest.java      |  51 ++++
 .../apache/kylin/aggregation/Aggregation.java   |  42 ---
 .../kylin/aggregation/AggregationType.java      |  97 ++++++
 .../kylin/aggregation/DataTypeSerializer.java   | 118 --------
 .../kylin/aggregation/IAggregationFactory.java  |   6 +
 .../kylin/aggregation/MeasureAggregator.java    |  61 +---
 .../apache/kylin/aggregation/MeasureCodec.java  |   1 +
 .../aggregation/basic/BasicAggregation.java     | 128 ++++++++
 .../basic/BasicAggregationFactory.java          |  31 ++
 .../aggregation/basic/BigDecimalSerializer.java | 111 -------
 .../aggregation/basic/DateTimeSerializer.java   |  65 ----
 .../aggregation/basic/DoubleMaxAggregator.java  |   2 +-
 .../aggregation/basic/DoubleMinAggregator.java  |   2 +-
 .../aggregation/basic/DoubleSerializer.java     |  84 ------
 .../aggregation/basic/DoubleSumAggregator.java  |   2 +-
 .../aggregation/basic/LongMaxAggregator.java    |   2 +-
 .../aggregation/basic/LongMinAggregator.java    |   2 +-
 .../kylin/aggregation/basic/LongSerializer.java |  91 ------
 .../aggregation/basic/LongSumAggregator.java    |   2 +-
 .../aggregation/basic/StringSerializer.java     |  56 ----
 .../kylin/aggregation/hllc/HLLCAggregation.java |  78 +++++
 .../hllc/HLLCAggregationFactory.java            |  35 +++
 .../kylin/aggregation/hllc/HLLCSerializer.java  |   4 +-
 .../kylin/aggregation/hllc/LDCAggregator.java   |   2 +-
 .../kylin/aggregation/topn/TopNAggregation.java |  76 +++++
 .../topn/TopNAggregationFactory.java            |  35 +++
 .../aggregation/topn/TopNCounterSerializer.java |  12 +-
 .../kylin/cube/gridtable/CubeCodeSystem.java    |   4 +-
 .../kylin/cube/gridtable/CubeGridTable.java     |   1 -
 .../gridtable/CuboidToGridTableMapping.java     |   2 +-
 .../cube/gridtable/TrimmedCubeCodeSystem.java   |   3 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      |   2 +-
 .../InMemCubeBuilderInputConverter.java         |   2 +-
 .../apache/kylin/cube/kv/RowKeyColumnOrder.java |   2 +-
 .../model/validation/rule/FunctionRule.java     |   2 +-
 .../java/org/apache/kylin/gridtable/GTInfo.java |   2 +-
 .../kylin/gridtable/GTSampleCodeSystem.java     |   2 +-
 .../apache/kylin/gridtable/UnitTestSupport.java |  14 +-
 .../basic/BigDecimalSerializerTest.java         |  53 ----
 .../topn/TopNCounterSerializerTest.java         |   9 +-
 .../gridtable/AggregationCacheMemSizeTest.java  |   4 +-
 .../kylin/gridtable/DictGridTableTest.java      |  16 +-
 .../kylin/gridtable/SimpleGridTableTest.java    |   2 +-
 .../gridtable/SimpleInvertedIndexTest.java      |  13 +-
 .../metadata/measure/MeasureCodecTest.java      |   4 +-
 .../apache/kylin/dict/DictionaryGenerator.java  |   9 +-
 .../apache/kylin/dict/DictionaryManager.java    |   4 +-
 .../apache/kylin/dict/NumberDictionaryTest.java |   9 +-
 .../filter/TimeConditionLiteralsReplacer.java   |   2 +-
 .../apache/kylin/metadata/model/ColumnDesc.java |   9 +-
 .../apache/kylin/metadata/model/DataType.java   | 295 -------------------
 .../kylin/metadata/model/FunctionDesc.java      |  11 +-
 .../apache/kylin/metadata/model/TblColRef.java  |   5 +-
 .../metadata/realization/SQLDigestUtil.java     |   6 +-
 .../org/apache/kylin/storage/tuple/Tuple.java   |   4 +-
 .../kylin/engine/mr/steps/CubeReducerTest.java  |   2 +-
 .../engine/mr/steps/MergeCuboidMapperTest.java  |  15 +-
 .../invertedindex/index/RawTableRecord.java     |   2 +-
 .../kylin/invertedindex/index/TableRecord.java  |   2 +-
 .../invertedindex/index/TableRecordInfo.java    |   6 +-
 .../index/TableRecordInfoDigest.java            |   6 +-
 .../invertedindex/measure/FixedHLLCodec.java    |   2 +-
 .../measure/FixedLenMeasureCodec.java           |   4 +-
 .../measure/FixedPointLongCodec.java            |   4 +-
 .../invertedindex/model/IIKeyValueCodec.java    |   6 +-
 .../measure/FixedPointLongCodecTest.java        |  13 +-
 .../hbase/cube/v1/CubeTupleConverter.java       |   4 -
 .../endpoint/EndpointAggregators.java           |   6 +-
 .../storage/hbase/steps/CreateHTableJob.java    |   2 +-
 .../storage/hbase/steps/RowValueDecoder.java    |   4 +-
 .../observer/AggregateRegionObserverTest.java   |   2 +-
 .../endpoint/EndpointAggregationTest.java       |   2 +-
 .../hbase/steps/RowValueDecoderTest.java        |   2 +-
 88 files changed, 1601 insertions(+), 1248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
index 5a0fee7..705c175 100644
--- a/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
+++ b/assembly/src/test/java/org/apache/kylin/job/dataGen/FactTableGenerator.java
@@ -37,6 +37,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.cube.CubeInstance;
@@ -45,7 +46,6 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
index 075a048..04a5b5b 100644
--- a/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/StreamingTableDataGenerator.java
@@ -5,9 +5,9 @@ import java.util.List;
 import java.util.Random;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java
new file mode 100644
index 0000000..f19ce14
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/BigDecimalSerializer.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializer for {@link BigDecimal} values. The wire format is the scale and the
+ * byte count (both written with {@code BytesUtil.writeVInt}) followed by the
+ * bytes of the unscaled value as returned by {@link BigInteger#toByteArray()}.
+ *
+ * @author yangli9
+ */
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
+
+    final DataType type;
+    final int maxLength;
+
+    // Counts values whose scale had to be cut off; used only to throttle the warning in serialize().
+    // Updated without synchronization: a lost increment merely shifts when the next warning is printed.
+    int avoidVerbose = 0;
+
+    /**
+     * @param type the decimal data type; its precision bounds the encoded length and
+     *             its scale is the maximum scale a value is allowed to keep
+     */
+    public BigDecimalSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
+        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
+    }
+
+    /**
+     * Writes {@code value} to {@code out}. A value with more fraction digits than the
+     * type's scale is rounded (half-even) down to that scale first.
+     *
+     * @throws IllegalArgumentException if the unscaled value needs more bytes than {@link #maxLength()} allows
+     */
+    @Override
+    public void serialize(BigDecimal value, ByteBuffer out) {
+        if (value.scale() > type.getScale()) {
+            // Count EVERY over-scaled value and warn on the 1st, 10001st, ... occurrence.
+            // (Incrementing only inside the guard would freeze the counter at 1 and silence all later warnings.)
+            if (avoidVerbose++ % 10000 == 0) {
+                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + avoidVerbose);
+            }
+            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
+        }
+        byte[] bytes = value.unscaledValue().toByteArray();
+        // 2 = the scale byte + the length byte assumed by maxLength
+        if (bytes.length + 2 > maxLength) {
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+
+        BytesUtil.writeVInt(value.scale(), out);
+        BytesUtil.writeVInt(bytes.length, out);
+        out.put(bytes);
+    }
+
+    /** Reads back a value written by {@link #serialize(BigDecimal, ByteBuffer)}, advancing {@code in}. */
+    @Override
+    public BigDecimal deserialize(ByteBuffer in) {
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+
+        byte[] bytes = new byte[n];
+        in.get(bytes);
+
+        return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+    /** Returns the total encoded length of the next value; the position of {@code in} is restored afterwards. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+
+        BytesUtil.readVInt(in); // skip the scale, only its encoded width matters here
+        int n = BytesUtil.readVInt(in);
+        int len = in.position() - mark + n; // header bytes consumed so far + payload bytes
+
+        in.position(mark);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    /** Parses the textual form of a decimal; a {@code null} input yields zero. */
+    @Override
+    public BigDecimal valueOf(byte[] value) {
+        if (value == null)
+            return BigDecimal.ZERO;
+        else
+            return new BigDecimal(Bytes.toString(value));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java
new file mode 100644
index 0000000..36fad00
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/DataType.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A data type made of a lower-case name plus optional precision and scale, parsed
+ * from strings such as {@code "varchar(256)"} or {@code "decimal(19,4)"}.
+ * Instances returned by {@link #getType(String)} and {@link #getCustomType(String)}
+ * are shared through a static cache, so equal type strings yield the same object.
+ */
+@SuppressWarnings("serial")
+public class DataType implements Serializable {
+
+    // standard sql types, ref: http://www.w3schools.com/sql/sql_datatypes_general.asp
+    public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
+            + "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
+            + "|date|time|datetime|timestamp|byte|int|short|long|string";
+
+    // optional "(precision[,scale])" suffix; group captures the text between the parentheses
+    private static final String TYPE_PATTEN_TAIL = "\\s*" //
+            + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?";
+
+    private static final Pattern TYPE_PATTERN = Pattern.compile( //
+            "(" + VALID_TYPES_STRING + ")" + TYPE_PATTEN_TAIL, Pattern.CASE_INSENSITIVE);
+
+    // custom types accept any name in front of the optional precision/scale suffix
+    private static final Pattern CUSTOM_TYPE_PATTERN = Pattern.compile( //
+            "(" + ".*?" + ")" + TYPE_PATTEN_TAIL, Pattern.CASE_INSENSITIVE);
+    
+    public static final Set<String> INTEGER_FAMILY = new HashSet<String>();
+    public static final Set<String> NUMBER_FAMILY = new HashSet<String>();
+    public static final Set<String> DATETIME_FAMILY = new HashSet<String>();
+    public static final Set<String> STRING_FAMILY = new HashSet<String>();
+    private static final Map<String, String> LEGACY_TYPE_MAP = new HashMap<String, String>();
+    static {
+        INTEGER_FAMILY.add("tinyint");
+        INTEGER_FAMILY.add("smallint");
+        INTEGER_FAMILY.add("integer");
+        INTEGER_FAMILY.add("bigint");
+
+        NUMBER_FAMILY.addAll(INTEGER_FAMILY);
+        NUMBER_FAMILY.add("float");
+        NUMBER_FAMILY.add("double");
+        NUMBER_FAMILY.add("decimal");
+        NUMBER_FAMILY.add("real");
+        NUMBER_FAMILY.add("numeric");
+
+        DATETIME_FAMILY.add("date");
+        DATETIME_FAMILY.add("time");
+        DATETIME_FAMILY.add("datetime");
+        DATETIME_FAMILY.add("timestamp");
+
+        STRING_FAMILY.add("varchar");
+        STRING_FAMILY.add("char");
+
+        LEGACY_TYPE_MAP.put("byte", "tinyint");
+        LEGACY_TYPE_MAP.put("int", "integer");
+        LEGACY_TYPE_MAP.put("short", "smallint");
+        LEGACY_TYPE_MAP.put("long", "bigint");
+        LEGACY_TYPE_MAP.put("string", "varchar");
+        LEGACY_TYPE_MAP.put("hllc10", "hllc(10)");
+        LEGACY_TYPE_MAP.put("hllc12", "hllc(12)");
+        LEGACY_TYPE_MAP.put("hllc14", "hllc(14)");
+        LEGACY_TYPE_MAP.put("hllc15", "hllc(15)");
+        LEGACY_TYPE_MAP.put("hllc16", "hllc(16)");
+    }
+
+    // canonical instances, keyed by themselves (equality is name + precision + scale)
+    private static final ConcurrentMap<DataType, DataType> CACHE = new ConcurrentHashMap<DataType, DataType>();
+
+    // must stay below CACHE and the patterns: static initializers run in textual order
+    public static final DataType ANY = DataType.getType("any");
+
+    /**
+     * Returns the cached instance for a standard type string.
+     *
+     * @param type type string, e.g. {@code "decimal(19,4)"}; {@code null} yields {@code null}
+     * @throws IllegalArgumentException if the string is not one of the valid types
+     */
+    public static DataType getType(String type) {
+        return getTypeInner(type, false);
+    }
+    
+    /**
+     * Returns the cached instance for a type string whose name is not restricted to
+     * {@link #VALID_TYPES_STRING}; {@code null} yields {@code null}.
+     */
+    public static DataType getCustomType(String type) {
+        return getTypeInner(type, true);
+    }
+    
+    private static DataType getTypeInner(String type, boolean isCustom) {
+        if (type == null)
+            return null;
+
+        DataType dataType = new DataType(type, isCustom);
+        DataType cached = CACHE.get(dataType);
+        if (cached == null) {
+            // putIfAbsent is atomic: when two threads race on a new type, both end up with the same instance
+            cached = CACHE.putIfAbsent(dataType, dataType);
+            if (cached == null)
+                cached = dataType;
+        }
+        return cached;
+    }
+
+    // ============================================================================
+
+    private String name;
+    private int precision;
+    private int scale;
+
+    /**
+     * Parses a type string. Precision and scale are -1 when not given, except for the
+     * defaults applied to char/varchar and decimal/numeric below.
+     *
+     * @throws IllegalArgumentException if the string does not match the expected pattern
+     *         or its precision/scale part is malformed
+     */
+    DataType(String datatype, boolean isCustom) {
+        // Locale.ROOT keeps the lower-casing locale-independent (e.g. "INTEGER" under a Turkish default locale)
+        datatype = datatype.trim().toLowerCase(java.util.Locale.ROOT);
+        datatype = replaceLegacy(datatype);
+
+        Pattern pattern = isCustom ? CUSTOM_TYPE_PATTERN : TYPE_PATTERN;
+        Matcher m = pattern.matcher(datatype);
+        if (!m.matches())
+            throw new IllegalArgumentException("bad data type -- " + datatype + ", does not match " + pattern);
+
+        // the bare name may itself be a legacy alias, e.g. "int(4)" -> "integer"
+        name = replaceLegacy(m.group(1));
+        precision = -1;
+        scale = -1;
+
+        String leftover = m.group(2);
+        if (leftover != null) {
+            // the pattern admits whitespace anywhere inside the parentheses; trim so that
+            // "( 19, 4 )" parses the same as "(19,4)"
+            String[] parts = leftover.trim().split("\\s*,\\s*");
+            for (int i = 0; i < parts.length; i++) {
+                int n;
+                try {
+                    n = Integer.parseInt(parts[i]);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("bad data type -- " + datatype + ", precision/scale not numeric");
+                }
+                if (i == 0)
+                    precision = n;
+                else if (i == 1)
+                    scale = n;
+                else
+                    throw new IllegalArgumentException("bad data type -- " + datatype + ", too many precision/scale parts");
+            }
+        }
+
+        // FIXME 256 for unknown string precision
+        if ((name.equals("char") || name.equals("varchar")) && precision == -1) {
+            precision = 256; // to save memory at frontend, e.g. tableau will
+                             // allocate memory according to this
+        }
+
+        // FIXME (19,4) for unknown decimal precision
+        if ((name.equals("decimal") || name.equals("numeric")) && precision == -1) {
+            precision = 19;
+            scale = 4;
+        }
+    }
+
+    /** Maps a legacy alias to its current spelling; any other string is returned unchanged. */
+    private String replaceLegacy(String str) {
+        String replace = LEGACY_TYPE_MAP.get(str);
+        return replace == null ? str : replace;
+    }
+
+    /** Delegates to the serializer that {@link DataTypeSerializer#create(DataType)} builds for this type. */
+    public int getStorageBytesEstimate() {
+        return DataTypeSerializer.create(this).getStorageBytesEstimate();
+    }
+
+    public boolean isStringFamily() {
+        return STRING_FAMILY.contains(name);
+    }
+
+    public boolean isIntegerFamily() {
+        return INTEGER_FAMILY.contains(name);
+    }
+
+    public boolean isNumberFamily() {
+        return NUMBER_FAMILY.contains(name);
+    }
+
+    public boolean isDateTimeFamily() {
+        return DATETIME_FAMILY.contains(name);
+    }
+
+    public boolean isDate() {
+        return name.equals("date");
+    }
+
+    public boolean isTime() {
+        return name.equals("time");
+    }
+
+    public boolean isTimestamp() {
+        return name.equals("timestamp");
+    }
+
+    public boolean isDatetime() {
+        return name.equals("datetime");
+    }
+
+    public boolean isTinyInt() {
+        return name.equals("tinyint");
+    }
+
+    public boolean isSmallInt() {
+        return name.equals("smallint");
+    }
+
+    public boolean isInt() {
+        return name.equals("integer");
+    }
+
+    public boolean isBigInt() {
+        return name.equals("bigint");
+    }
+
+    public boolean isFloat() {
+        return name.equals("float");
+    }
+
+    public boolean isDouble() {
+        return name.equals("double");
+    }
+
+    /** True only for the name "decimal"; "numeric" is not treated as decimal here. */
+    public boolean isDecimal() {
+        return name.equals("decimal");
+    }
+
+    public boolean isHLLC() {
+        return name.equals("hllc");
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /** @return the precision, or -1 when none was given and no default applies */
+    public int getPrecision() {
+        return precision;
+    }
+
+    /** @return the scale, or -1 when none was given and no default applies */
+    public int getScale() {
+        return scale;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + precision;
+        result = prime * result + scale;
+        return result;
+    }
+
+    /** Two types are equal when name, precision and scale all match. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        DataType other = (DataType) obj;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        if (precision != other.precision)
+            return false;
+        if (scale != other.scale)
+            return false;
+        return true;
+    }
+
+    /** Renders "name", "name(precision)" or "name(precision,scale)" depending on which parts are set. */
+    @Override
+    public String toString() {
+        if (precision < 0 && scale < 0)
+            return name;
+        else if (scale < 0)
+            return name + "(" + precision + ")";
+        else
+            return name + "(" + precision + "," + scale + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java
new file mode 100644
index 0000000..99fa3fd
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/DataTypeSerializer.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.BytesSerializer;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Base class of the per-data-type serializers, plus a static registry that maps a
+ * data type name to the serializer class handling it.
+ *
+ * @author yangli9
+ * 
+ * Note: the implementations MUST be thread-safe.
+ */
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+
+    // data type name -> serializer class; must stay mutable because register() adds to it
+    final static Map<String, Class<?>> implementations;
+    static {
+        HashMap<String, Class<?>> impl = Maps.newHashMap();
+        impl.put("varchar", StringSerializer.class);
+        impl.put("decimal", BigDecimalSerializer.class);
+        impl.put("double", DoubleSerializer.class);
+        impl.put("float", DoubleSerializer.class);
+        impl.put("bigint", LongSerializer.class);
+        impl.put("long", LongSerializer.class);
+        impl.put("integer", LongSerializer.class);
+        impl.put("int", LongSerializer.class);
+        impl.put("smallint", LongSerializer.class);
+        impl.put("date", DateTimeSerializer.class);
+        impl.put("datetime", DateTimeSerializer.class);
+        impl.put("timestamp", DateTimeSerializer.class);
+        // A synchronized (not unmodifiable) view: an unmodifiable map would make register()
+        // throw UnsupportedOperationException on every call.
+        implementations = Collections.synchronizedMap(impl);
+    }
+    
+    /** @return true if a serializer class is registered under {@code dataTypeName} */
+    public static boolean hasRegistered(String dataTypeName) {
+        return implementations.containsKey(dataTypeName);
+    }
+    
+    /**
+     * Registers (or replaces) the serializer class for a data type name.
+     * {@link #create(DataType)} instantiates it through a public constructor taking a
+     * single {@link DataType} argument, so {@code impl} must provide one.
+     */
+    public static void register(String dataTypeName, Class<? extends DataTypeSerializer<?>> impl) {
+        implementations.put(dataTypeName, impl);
+    }
+
+    /** Parses {@code dataType} with {@link DataType#getType(String)} and creates its serializer. */
+    public static DataTypeSerializer<?> create(String dataType) {
+        return create(DataType.getType(dataType));
+    }
+
+    /**
+     * Creates a new serializer for the given type.
+     *
+     * @throws RuntimeException if no serializer is registered for the type's name, or
+     *         the registered class cannot be instantiated with a {@link DataType}
+     */
+    public static DataTypeSerializer<?> create(DataType type) {
+        Class<?> clz = implementations.get(type.getName());
+        if (clz == null)
+            throw new RuntimeException("No DataTypeSerializer for type " + type);
+
+        try {
+            return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
+        } catch (Exception e) {
+            // cannot happen for the built-in serializers, but a registered class may lack the (DataType) constructor
+            throw new RuntimeException("Failed to instantiate " + clz.getName() + " for type " + type, e);
+        }
+    }
+    
+    /** peek into buffer and return the length of serialization */
+    abstract public int peekLength(ByteBuffer in);
+
+    /** return the max number of bytes to the longest serialization */
+    abstract public int maxLength();
+
+    /** get an estimate of size in bytes of the serialized data */
+    abstract public int getStorageBytesEstimate();
+
+    /** convert from String to obj (string often come as byte[] in mapred) */
+    abstract public T valueOf(byte[] value);
+
+    /** convert from String to obj, by handing its UTF-8 bytes to {@link #valueOf(byte[])} */
+    public T valueOf(String value) {
+        try {
+            return valueOf(value.getBytes("UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e); // never happen
+        }
+    }
+
+    /** convert from obj to string; a null value is rendered as "NULL" */
+    public String toString(T value) {
+        if (value == null)
+            return "NULL";
+        else
+            return value.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java
new file mode 100644
index 0000000..d3ef7cd
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/DateTimeSerializer.java
@@ -0,0 +1,62 @@
+package org.apache.kylin.common.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.DateFormat;
+
+/**
+ * Serializer for date/time values carried in a {@link LongMutable} and stored as a
+ * fixed 8-byte long. The objects returned by {@link #deserialize(ByteBuffer)} and
+ * {@link #valueOf(byte[])} are one reusable holder per thread, so a later call on
+ * the same thread overwrites the previously returned value.
+ */
+public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
+
+    // one reusable holder per thread: no allocation per call, no sharing between threads
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    /** @param type accepted for the reflective factory's constructor contract; not used */
+    public DateTimeSerializer(DataType type) {
+    }
+
+    /** Writes the wrapped long to {@code out}. */
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        final long raw = value.get();
+        out.putLong(raw);
+    }
+
+    /** Returns the calling thread's holder, creating it on first use. */
+    private LongMutable current() {
+        LongMutable holder = current.get();
+        if (holder != null) {
+            return holder;
+        }
+        holder = new LongMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /** Reads one long from {@code in} into the calling thread's holder and returns that holder. */
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        LongMutable holder = current();
+        final long raw = in.getLong();
+        holder.set(raw);
+        return holder;
+    }
+
+    /** The encoding is a fixed-width long, so the buffer does not need to be inspected. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    /**
+     * Converts the textual form of a date/time through {@code DateFormat.stringToMillis};
+     * a {@code null} input yields 0. The result lives in the calling thread's holder.
+     */
+    @Override
+    public LongMutable valueOf(byte[] value) {
+        LongMutable holder = current();
+        final long millis = (value == null) ? 0L : DateFormat.stringToMillis(Bytes.toString(value));
+        holder.set(millis);
+        return holder;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java
new file mode 100644
index 0000000..a32fbd8
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleMutable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class DoubleMutable implements Comparable<DoubleMutable>, Serializable {
+
+    private double v;
+
+    public DoubleMutable() {
+        this(0);
+    }
+
+    public DoubleMutable(double v) {
+        set(v);
+    }
+
+    public double get() {
+        return v;
+    }
+
+    public void set(double v) {
+        this.v = v;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof DoubleMutable)) {
+            return false;
+        }
+        DoubleMutable other = (DoubleMutable) o;
+        return this.v == other.v;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) Double.doubleToLongBits(v);
+    }
+
+    @Override
+    public int compareTo(DoubleMutable o) {
+        return (v < o.v ? -1 : (v == o.v ? 0 : 1));
+    }
+
+    @Override
+    public String toString() {
+        return Double.toString(v);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java
new file mode 100644
index 0000000..f128576
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/DoubleSerializer.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+
+/** Serializes {@link DoubleMutable} as a fixed 8 bytes through ByteBuffer.putDouble / getDouble.
+ *  deserialize() and valueOf() return a holder that is reused per thread; copy the value to keep it. */
+public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
+
+    // one reusable holder per thread: stays thread-safe without allocating on every call
+    private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
+    /** The type argument is accepted but not used by this class. */
+    public DoubleSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(DoubleMutable value, ByteBuffer out) {
+        out.putDouble(value.get());
+    }
+    /** Returns the calling thread's holder, creating and caching it on first use. */
+    private DoubleMutable current() {
+        DoubleMutable holder = current.get();
+        if (holder != null)
+            return holder;
+        holder = new DoubleMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    @Override
+    public DoubleMutable deserialize(ByteBuffer in) {
+        DoubleMutable holder = current();
+        holder.set(in.getDouble());
+        return holder;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+    /** Parses the text of Bytes.toString(value) with Double.parseDouble; a null array yields 0. */
+    @Override
+    public DoubleMutable valueOf(byte[] value) {
+        DoubleMutable holder = current();
+        double parsed = 0d;
+        if (value != null)
+            parsed = Double.parseDouble(Bytes.toString(value));
+        holder.set(parsed);
+        return holder;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java b/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java
new file mode 100644
index 0000000..72b540c
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/LongMutable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.io.Serializable;
+
+/** Mutable, serializable wrapper around a primitive long, comparable by numeric value. */
+@SuppressWarnings("serial")
+public class LongMutable implements Comparable<LongMutable>, Serializable {
+
+    private long v;
+
+    public LongMutable() {
+        this(0);
+    }
+
+    public LongMutable(long v) {
+        set(v);
+    }
+
+    public long get() {
+        return v;
+    }
+
+    public void set(long v) {
+        this.v = v;
+    }
+
+    /** Two holders are equal when they currently wrap the same long. */
+    @Override
+    public boolean equals(Object o) {
+        return o instanceof LongMutable && ((LongMutable) o).v == v;
+    }
+
+    /** The hash is the wrapped value narrowed to int, i.e. its low 32 bits. */
+    @Override
+    public int hashCode() {
+        return (int) v;
+    }
+
+    /** Orders by wrapped value; the result is always -1, 0 or 1. */
+    @Override
+    public int compareTo(LongMutable o) {
+        if (v == o.v) {
+            return 0;
+        }
+        return v < o.v ? -1 : 1;
+    }
+
+    @Override
+    public String toString() {
+        return String.valueOf(v);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java
new file mode 100644
index 0000000..5c5e10f
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/LongSerializer.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+/** Serializes {@link LongMutable} in the variable-length form of BytesUtil.writeVLong / readVLong.
+ *  deserialize() and valueOf() return a holder that is reused per thread; copy the value to keep it. */
+public class LongSerializer extends DataTypeSerializer<LongMutable> {
+
+    // one reusable holder per thread: stays thread-safe without allocating on every call
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+    /** The type argument is accepted but not used by this class. */
+    public LongSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        BytesUtil.writeVLong(value.get(), out);
+    }
+    /** Returns the calling thread's holder, creating and caching it on first use. */
+    private LongMutable current() {
+        LongMutable holder = current.get();
+        if (holder != null)
+            return holder;
+        holder = new LongMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        LongMutable holder = current();
+        holder.set(BytesUtil.readVLong(in));
+        return holder;
+    }
+    /** Measures the next encoded value by decoding it, then puts the buffer position back. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        final int start = in.position();
+
+        BytesUtil.readVLong(in); // moves the position past one encoded value
+        final int end = in.position();
+
+        in.position(start); // rewind to where the caller left the buffer
+        return end - start;
+    }
+
+    @Override
+    public int maxLength() {
+        return 9; // vlong: 1 + 8
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 5;
+    }
+    /** Parses the text of Bytes.toString(value) with Long.parseLong; a null array yields 0. */
+    @Override
+    public LongMutable valueOf(byte[] value) {
+        LongMutable holder = current();
+        long parsed = 0L;
+        if (value != null)
+            parsed = Long.parseLong(Bytes.toString(value));
+        holder.set(parsed);
+        return holder;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java b/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java
new file mode 100644
index 0000000..b3562eb
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/datatype/StringSerializer.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.common.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+
+/** Serializer for String values; the wire format is delegated to BytesUtil.writeUTFString / readUTFString. */
+public class StringSerializer extends DataTypeSerializer<String> {
+    final DataType type;
+    final int maxLength;
+
+    public StringSerializer(DataType type) {
+        // see serialize(): 2 byte length, rest is String.toBytes()
+        this.maxLength = 2 + type.getPrecision();
+        this.type = type;
+    }
+    /** Writes the value, then throws if more than maxLength bytes were written (the bytes stay in the buffer). */
+    @Override
+    public void serialize(String value, ByteBuffer out) {
+        final int before = out.position();
+        BytesUtil.writeUTFString(value, out);
+        final int written = out.position() - before;
+
+        if (written > maxLength)
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+    }
+
+    @Override
+    public String deserialize(ByteBuffer in) {
+        return BytesUtil.readUTFString(in);
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return BytesUtil.peekByteArrayLength(in);
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+    /** Uses the maximum serialized length as the storage estimate. */
+    @Override
+    public int getStorageBytesEstimate() {
+        return maxLength;
+    }
+
+    @Override
+    public String valueOf(byte[] value) {
+        return Bytes.toString(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
index 7304b8b..f3745d7 100644
--- a/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
+++ b/core-common/src/main/java/org/apache/kylin/common/debug/BackdoorToggles.java
@@ -20,15 +20,10 @@ package org.apache.kylin.common.debug;
 
 import java.util.Map;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 /**
  */
 public class BackdoorToggles {
 
-    private static final Logger logger = LoggerFactory.getLogger(BackdoorToggles.class);
-
     private static final ThreadLocal<Map<String, String>> _backdoorToggles = new ThreadLocal<Map<String, String>>();
 
     public static void setToggles(Map<String, String> toggles) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
index 1520ce1..4a3d6e8 100644
--- a/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
+++ b/core-common/src/main/java/org/apache/kylin/common/topn/DoublyLinkedList.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.common.topn;
 
-import java.util.ConcurrentModificationException;
-import java.util.Iterator;
 
 /**
  * Modified from DoublyLinkedList.java in https://github.com/addthis/stream-lib

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java b/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
deleted file mode 100644
index 520cd74..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class DoubleMutable implements Comparable<DoubleMutable>, Serializable {
-
-    private double v;
-
-    public DoubleMutable() {
-        this(0);
-    }
-
-    public DoubleMutable(double v) {
-        set(v);
-    }
-
-    public double get() {
-        return v;
-    }
-
-    public void set(double v) {
-        this.v = v;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof DoubleMutable)) {
-            return false;
-        }
-        DoubleMutable other = (DoubleMutable) o;
-        return this.v == other.v;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) Double.doubleToLongBits(v);
-    }
-
-    @Override
-    public int compareTo(DoubleMutable o) {
-        return (v < o.v ? -1 : (v == o.v ? 0 : 1));
-    }
-
-    @Override
-    public String toString() {
-        return Double.toString(v);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java b/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
deleted file mode 100644
index 238bb86..0000000
--- a/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.io.Serializable;
-
-@SuppressWarnings("serial")
-public class LongMutable implements Comparable<LongMutable>, Serializable {
-
-    private long v;
-
-    public LongMutable() {
-        this(0);
-    }
-
-    public LongMutable(long v) {
-        set(v);
-    }
-
-    public long get() {
-        return v;
-    }
-
-    public void set(long v) {
-        this.v = v;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-        if (!(o instanceof LongMutable)) {
-            return false;
-        }
-        LongMutable other = (LongMutable) o;
-        return this.v == other.v;
-    }
-
-    @Override
-    public int hashCode() {
-        return (int) v;
-    }
-
-    @Override
-    public int compareTo(LongMutable o) {
-        long thisValue = this.v;
-        long thatValue = o.v;
-        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
-    }
-
-    @Override
-    public String toString() {
-        return Long.toString(v);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java b/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java
new file mode 100644
index 0000000..cba6795
--- /dev/null
+++ b/core-common/src/test/java/org/apache/kylin/common/datatype/BigDecimalSerializerTest.java
@@ -0,0 +1,51 @@
+package org.apache.kylin.common.datatype;
+
+import static org.junit.Assert.*;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Round-trip and overflow tests for BigDecimalSerializer, using the serializer built
+ *  for the type returned by DataType.getType("decimal"). */
+public class BigDecimalSerializerTest {
+
+    private static BigDecimalSerializer bigDecimalSerializer;
+
+    @BeforeClass
+    public static void beforeClass() {
+        bigDecimalSerializer = new BigDecimalSerializer(DataType.getType("decimal"));
+    }
+
+    /** Serializes the input at the start of a fresh 256-byte buffer and reads it back. */
+    private static BigDecimal roundTrip(BigDecimal input) {
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        buffer.mark();
+        bigDecimalSerializer.serialize(input, buffer);
+        buffer.reset();
+        return bigDecimalSerializer.deserialize(buffer);
+    }
+
+    @Test
+    public void testNormal() {
+        BigDecimal input = new BigDecimal("1234.1234");
+        assertEquals(input, roundTrip(input));
+    }
+
+    @Test
+    public void testScaleOutOfRange() {
+        BigDecimal input = new BigDecimal("1234.1234567890");
+        BigDecimal output = roundTrip(input);
+        assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testOutOfPrecision() {
+        BigDecimal input = new BigDecimal("66855344214907231736.4924");
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        bigDecimalSerializer.serialize(input, buffer);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
deleted file mode 100644
index 193c5de..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package org.apache.kylin.aggregation;
-
-import java.util.List;
-
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-abstract public class Aggregation {
-    
-    /* ============================================================================
-     * Define
-     * ---------------------------------------------------------------------------- */
-    
-    abstract public DataType getAggregationDataType();
-    
-    abstract public DataType getResultDataType();
-    
-    abstract public void validate(MeasureDesc measureDesc) throws IllegalArgumentException;
-    
-    /* ============================================================================
-     * Build
-     * ---------------------------------------------------------------------------- */
-    
-    abstract public DataTypeSerializer<?> getSeralizer();
-    
-    abstract public MeasureAggregator<?> getAggregator();
- 
-    abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
-    
-    abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
-
-    /* ============================================================================
-     * Cube Selection
-     * ---------------------------------------------------------------------------- */
-    
-    /* ============================================================================
-     * Query
-     * ---------------------------------------------------------------------------- */
-    
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java b/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java
new file mode 100644
index 0000000..81345b7
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/AggregationType.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.aggregation.basic.BasicAggregationFactory;
+import org.apache.kylin.aggregation.hllc.HLLCAggregationFactory;
+import org.apache.kylin.aggregation.topn.TopNAggregationFactory;
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Maps;
+
+/** Base class for aggregation types; instances are obtained from create(), which dispatches to a per-function factory. */
+abstract public class AggregationType {
+    private static final Map<String, IAggregationFactory> factoryRegistry = Maps.newConcurrentMap();
+    private static final IAggregationFactory defaultFactory = new BasicAggregationFactory();
+    // built-in factories; any function name not registered here falls back to defaultFactory
+    static {
+        factoryRegistry.put(FunctionDesc.FUNC_COUNT_DISTINCT, new HLLCAggregationFactory());
+        factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNAggregationFactory());
+    }
+    /**
+     * Creates the aggregation type for a function and data type, using the factory registered for the
+     * function name, else the default one. Both names are case-normalised with Locale.ROOT so the result
+     * does not depend on the JVM default locale (a Turkish locale upper-cases "i" to a dotted capital).
+     */
+    public static AggregationType create(String funcName, String dataType) {
+        funcName = funcName.toUpperCase(java.util.Locale.ROOT);
+        dataType = dataType.toLowerCase(java.util.Locale.ROOT);
+        IAggregationFactory factory = factoryRegistry.get(funcName);
+        if (factory == null)
+            factory = defaultFactory;
+        AggregationType result = factory.createAggregationType(funcName, dataType);
+        // register serializer for aggr data type; NOTE(review): check-then-register is not atomic, confirm register() tolerates a race
+        DataType aggregationDataType = result.getAggregationDataType();
+        if (!DataTypeSerializer.hasRegistered(aggregationDataType.getName())) {
+            DataTypeSerializer.register(aggregationDataType.getName(), result.getAggregationDataSeralizer());
+        }
+        return result;
+    }
+    
+    /* ============================================================================
+     * Define
+     * ---------------------------------------------------------------------------- */
+    /** Data type under whose name create() registers the serializer. */
+    abstract public DataType getAggregationDataType();
+    /** Serializer class that create() registers for getAggregationDataType() when none is registered yet. */
+    abstract public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer();
+    
+    abstract public void validate(MeasureDesc measureDesc) throws IllegalArgumentException;
+    
+    /* ============================================================================
+     * Build
+     * ---------------------------------------------------------------------------- */
+    
+    abstract public MeasureAggregator<?> newAggregator();
+ 
+    abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
+    
+    abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
+
+    /* ============================================================================
+     * Cube Selection
+     * ---------------------------------------------------------------------------- */
+    
+    /* ============================================================================
+     * Query
+     * ---------------------------------------------------------------------------- */
+    
+    /* ============================================================================
+     * Storage
+     * ---------------------------------------------------------------------------- */
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
deleted file mode 100644
index df6833c..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation;
-
-import java.io.UnsupportedEncodingException;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.kylin.aggregation.basic.BigDecimalSerializer;
-import org.apache.kylin.aggregation.basic.DateTimeSerializer;
-import org.apache.kylin.aggregation.basic.DoubleSerializer;
-import org.apache.kylin.aggregation.basic.LongSerializer;
-import org.apache.kylin.aggregation.basic.StringSerializer;
-import org.apache.kylin.aggregation.hllc.HLLCSerializer;
-import org.apache.kylin.aggregation.topn.TopNCounterSerializer;
-import org.apache.kylin.common.util.BytesSerializer;
-import org.apache.kylin.metadata.model.DataType;
-
-import com.google.common.collect.Maps;
-
-/**
- * @author yangli9
- * 
- * Note: the implementations MUST be thread-safe.
- * 
- */
-abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
-
-    final static Map<String, Class<?>> implementations;
-    static {
-        HashMap<String, Class<?>> impl = Maps.newHashMap();
-        impl.put("varchar", StringSerializer.class);
-        impl.put("decimal", BigDecimalSerializer.class);
-        impl.put("double", DoubleSerializer.class);
-        impl.put("float", DoubleSerializer.class);
-        impl.put("bigint", LongSerializer.class);
-        impl.put("long", LongSerializer.class);
-        impl.put("integer", LongSerializer.class);
-        impl.put("int", LongSerializer.class);
-        impl.put("smallint", LongSerializer.class);
-        impl.put("date", DateTimeSerializer.class);
-        impl.put("datetime", DateTimeSerializer.class);
-        impl.put("timestamp", DateTimeSerializer.class);
-        implementations = Collections.unmodifiableMap(impl);
-
-    }
-
-    public static DataTypeSerializer<?> create(String dataType) {
-        return create(DataType.getInstance(dataType));
-    }
-
-    public static DataTypeSerializer<?> create(DataType type) {
-        if (type.isHLLC()) {
-            return new HLLCSerializer(type);
-        }
-
-        if (type.isTopN()) {
-            return new TopNCounterSerializer(type);
-        }
-
-        Class<?> clz = implementations.get(type.getName());
-        if (clz == null)
-            throw new RuntimeException("No MeasureSerializer for type " + type);
-
-        try {
-            return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
-        } catch (Exception e) {
-            throw new RuntimeException(e); // never happen
-        }
-    }
-    
-    /** peek into buffer and return the length of serialization */
-    abstract public int peekLength(ByteBuffer in);
-
-    /** return the max number of bytes to the longest serialization */
-    abstract public int maxLength();
-
-    /** get an estimate of size in bytes of the serialized data */
-    abstract public int getStorageBytesEstimate();
-
-    /** convert from String to obj (string often come as byte[] in mapred) */
-    abstract public T valueOf(byte[] value);
-
-    /** convert from String to obj */
-    public T valueOf(String value) {
-        try {
-            return valueOf(value.getBytes("UTF-8"));
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e); // never happen
-        }
-    }
-
-    /** convert from obj to string */
-    public String toString(T value) {
-        if (value == null)
-            return "NULL";
-        else
-            return value.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java
new file mode 100644
index 0000000..cbdd9da
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/IAggregationFactory.java
@@ -0,0 +1,6 @@
+package org.apache.kylin.aggregation;
+
+/** Factory that creates the {@link AggregationType} for a function name and a data type name. */
+public interface IAggregationFactory {
+    AggregationType createAggregationType(String funcName, String dataType);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
index 9c8945d..6968a9d 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
@@ -18,21 +18,6 @@
 
 package org.apache.kylin.aggregation;
 
-import org.apache.kylin.aggregation.basic.BigDecimalMaxAggregator;
-import org.apache.kylin.aggregation.basic.BigDecimalMinAggregator;
-import org.apache.kylin.aggregation.basic.BigDecimalSumAggregator;
-import org.apache.kylin.aggregation.basic.DoubleMaxAggregator;
-import org.apache.kylin.aggregation.basic.DoubleMinAggregator;
-import org.apache.kylin.aggregation.basic.DoubleSumAggregator;
-import org.apache.kylin.aggregation.basic.LongMaxAggregator;
-import org.apache.kylin.aggregation.basic.LongMinAggregator;
-import org.apache.kylin.aggregation.basic.LongSumAggregator;
-import org.apache.kylin.aggregation.hllc.HLLCAggregator;
-import org.apache.kylin.aggregation.hllc.LDCAggregator;
-import org.apache.kylin.aggregation.topn.TopNAggregator;
-import org.apache.kylin.metadata.model.DataType;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
 import java.io.Serializable;
 
 /**
@@ -40,50 +25,8 @@ import java.io.Serializable;
 @SuppressWarnings("serial")
 abstract public class MeasureAggregator<V> implements Serializable {
 
-    public static MeasureAggregator<?> create(String funcName, String returnType) {
-        if (FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName) || FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName)) {
-            if (isInteger(returnType))
-                return new LongSumAggregator();
-            else if (isBigDecimal(returnType))
-                return new BigDecimalSumAggregator();
-            else if (isDouble(returnType))
-                return new DoubleSumAggregator();
-        } else if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName)) {
-            DataType hllcType = DataType.getInstance(returnType);
-            if (hllcType.isHLLC())
-                return new HLLCAggregator(hllcType.getPrecision());
-            else
-                return new LDCAggregator();
-        } else if (FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName)) {
-            if (isInteger(returnType))
-                return new LongMaxAggregator();
-            else if (isBigDecimal(returnType))
-                return new BigDecimalMaxAggregator();
-            else if (isDouble(returnType))
-                return new DoubleMaxAggregator();
-        } else if (FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName)) {
-            if (isInteger(returnType))
-                return new LongMinAggregator();
-            else if (isBigDecimal(returnType))
-                return new BigDecimalMinAggregator();
-            else if (isDouble(returnType))
-                return new DoubleMinAggregator();
-        } else if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName)) {
-            return new TopNAggregator();
-        }
-        throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
-    }
-
-    public static boolean isBigDecimal(String type) {
-        return type.startsWith("decimal");
-    }
-
-    public static boolean isDouble(String type) {
-        return "double".equalsIgnoreCase(type) || "float".equalsIgnoreCase(type) || "real".equalsIgnoreCase(type);
-    }
-
-    public static boolean isInteger(String type) {
-        return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type);
+    public static MeasureAggregator<?> create(String funcName, String dataType) { // aggregator selection now sits behind AggregationType
+        return AggregationType.create(funcName, dataType).newAggregator();
     }
 
     public static int guessBigDecimalMemBytes() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
index cbcb3a8..8721954 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
@@ -21,6 +21,7 @@ package org.apache.kylin.aggregation;
 import java.nio.ByteBuffer;
 import java.util.Collection;
 
+import org.apache.kylin.common.datatype.DataTypeSerializer;
 import org.apache.kylin.metadata.model.MeasureDesc;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java
new file mode 100644
index 0000000..7ea88bd
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregation.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.util.List;
+
+import org.apache.kylin.aggregation.AggregationType;
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.datatype.BigDecimalSerializer;
+import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.common.datatype.DataTypeSerializer;
+import org.apache.kylin.common.datatype.DateTimeSerializer;
+import org.apache.kylin.common.datatype.DoubleSerializer;
+import org.apache.kylin.common.datatype.LongSerializer;
+import org.apache.kylin.common.datatype.StringSerializer;
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class BasicAggregation extends AggregationType {
+    // Handles SUM, COUNT, MAX and MIN; newAggregator() lists the supported function/type pairs.
+    private final String funcName;
+    private final DataType dataType;
+    /** Keeps funcName as given and resolves the dataType string through DataType.getType(). */
+    public BasicAggregation(String funcName, String dataType) {
+        this.funcName = funcName;
+        this.dataType = DataType.getType(dataType);
+    }
+    /** Returns the DataType resolved in the constructor. */
+    @Override
+    public DataType getAggregationDataType() {
+        return dataType;
+    }
+    /** Chooses a serializer class by type family; branches are tested top to bottom, else IllegalArgumentException. */
+    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
+        if (dataType.isStringFamily())
+            return StringSerializer.class;
+        else if (dataType.isIntegerFamily())
+            return LongSerializer.class;
+        else if (dataType.isDecimal()) // tested before isNumberFamily(); presumably decimal is also number-family - confirm in DataType
+            return BigDecimalSerializer.class;
+        else if (dataType.isNumberFamily())
+            return DoubleSerializer.class;
+        else if (dataType.isDateTimeFamily())
+            return DateTimeSerializer.class;
+        else
+            throw new IllegalArgumentException("No default serializer for type " + dataType);
+    }
+    /** Validation is not implemented yet: the method body is empty, so nothing is rejected. */
+    @Override
+    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
+        // TODO: stub, no checks are performed on measureDesc
+        
+    }
+    /** Builds the aggregator for funcName/dataType; throws IllegalArgumentException when no pair matches. */
+    @Override
+    public MeasureAggregator<?> newAggregator() {
+        if (isSum() || isCount()) { // COUNT is served by the same sum aggregators as SUM
+            if (dataType.isDecimal())
+                return new BigDecimalSumAggregator();
+            else if (dataType.isIntegerFamily())
+                return new LongSumAggregator();
+            else if (dataType.isNumberFamily())
+                return new DoubleSumAggregator();
+        } else if (isMax()) {
+            if (dataType.isDecimal())
+                return new BigDecimalMaxAggregator();
+            else if (dataType.isIntegerFamily())
+                return new LongMaxAggregator();
+            else if (dataType.isNumberFamily())
+                return new DoubleMaxAggregator();
+        } else if (isMin()) {
+            if (dataType.isDecimal())
+                return new BigDecimalMinAggregator();
+            else if (dataType.isIntegerFamily())
+                return new LongMinAggregator();
+            else if (dataType.isNumberFamily())
+                return new DoubleMinAggregator();
+        }
+        throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + dataType + "'");
+    }
+    // The helpers below compare funcName, ignoring case, with the FunctionDesc function-name constants.
+    private boolean isSum() {
+        return FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName);
+    }
+
+    private boolean isCount() {
+        return FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName);
+    }
+    
+    private boolean isMax() {
+        return FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName);
+    }
+    
+    private boolean isMin() {
+        return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
+    }
+    /** Stub: returns null, not a list. NOTE(review): confirm callers handle null or switch to an empty list. */
+    @Override
+    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+        // TODO: not implemented, null is returned for every measureDesc
+        return null;
+    }
+    /** Stub: returns null and reads none of its arguments. */
+    @Override
+    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
+        // TODO: not implemented, the incoming value is not passed through
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java
new file mode 100644
index 0000000..547d45f
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BasicAggregationFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.AggregationType;
+import org.apache.kylin.aggregation.IAggregationFactory;
+
+public class BasicAggregationFactory implements IAggregationFactory {
+    /** Returns a new BasicAggregation built from the given function name and data type string. */
+    @Override
+    public AggregationType createAggregationType(String funcName, String dataType) {
+        return new BasicAggregation(funcName, dataType);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
deleted file mode 100644
index 9f7c3cf..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import java.math.BigDecimal;
-import java.math.BigInteger;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.aggregation.DataTypeSerializer;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.metadata.model.DataType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author yangli9
- * 
- */
-public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
-
-    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
-
-    final DataType type;
-    final int maxLength;
-
-    int avoidVerbose = 0;
-
-    public BigDecimalSerializer(DataType type) {
-        this.type = type;
-        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
-        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
-    }
-
-    @Override
-    public void serialize(BigDecimal value, ByteBuffer out) {
-        if (value.scale() > type.getScale()) {
-            if (avoidVerbose % 10000 == 0) {
-                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++));
-            }
-            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
-        }
-        byte[] bytes = value.unscaledValue().toByteArray();
-        if (bytes.length + 2 > maxLength) {
-            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
-        }
-
-        BytesUtil.writeVInt(value.scale(), out);
-        BytesUtil.writeVInt(bytes.length, out);
-        out.put(bytes);
-    }
-
-    @Override
-    public BigDecimal deserialize(ByteBuffer in) {
-        int scale = BytesUtil.readVInt(in);
-        int n = BytesUtil.readVInt(in);
-
-        byte[] bytes = new byte[n];
-        in.get(bytes);
-
-        return new BigDecimal(new BigInteger(bytes), scale);
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        int mark = in.position();
-
-        @SuppressWarnings("unused")
-        int scale = BytesUtil.readVInt(in);
-        int n = BytesUtil.readVInt(in);
-        int len = in.position() - mark + n;
-
-        in.position(mark);
-        return len;
-    }
-
-    @Override
-    public int maxLength() {
-        return maxLength;
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return 8;
-    }
-
-    @Override
-    public BigDecimal valueOf(byte[] value) {
-        if (value == null)
-            return new BigDecimal(0);
-        else
-            return new BigDecimal(Bytes.toString(value));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
deleted file mode 100644
index 0bf4aba..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
+++ /dev/null
@@ -1,65 +0,0 @@
-package org.apache.kylin.aggregation.basic;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.aggregation.DataTypeSerializer;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.LongMutable;
-import org.apache.kylin.metadata.model.DataType;
-
-public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
-
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
-
-    public DateTimeSerializer(DataType type) {
-    }
-
-    @Override
-    public void serialize(LongMutable value, ByteBuffer out) {
-        out.putLong(value.get());
-    }
-
-    private LongMutable current() {
-        LongMutable l = current.get();
-        if (l == null) {
-            l = new LongMutable();
-            current.set(l);
-        }
-        return l;
-    }
-
-    @Override
-    public LongMutable deserialize(ByteBuffer in) {
-        LongMutable l = current();
-        l.set(in.getLong());
-        return l;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return 8;
-    }
-
-    @Override
-    public int maxLength() {
-        return 8;
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return 8;
-    }
-
-    @Override
-    public LongMutable valueOf(byte[] value) {
-        LongMutable l = current();
-        if (value == null)
-            l.set(0L);
-        else
-            l.set(DateFormat.stringToMillis(Bytes.toString(value)));
-        return l;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
index f09614d..99896a6 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.aggregation.basic;
 
 import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.DoubleMutable;
+import org.apache.kylin.common.datatype.DoubleMutable;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
index b93c15c..7430c4e 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.aggregation.basic;
 
 import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.DoubleMutable;
+import org.apache.kylin.common.datatype.DoubleMutable;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
deleted file mode 100644
index f207054..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.aggregation.DataTypeSerializer;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.DoubleMutable;
-import org.apache.kylin.metadata.model.DataType;
-
-/**
- */
-public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
-
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
-
-    public DoubleSerializer(DataType type) {
-    }
-
-    @Override
-    public void serialize(DoubleMutable value, ByteBuffer out) {
-        out.putDouble(value.get());
-    }
-
-    private DoubleMutable current() {
-        DoubleMutable d = current.get();
-        if (d == null) {
-            d = new DoubleMutable();
-            current.set(d);
-        }
-        return d;
-    }
-
-    @Override
-    public DoubleMutable deserialize(ByteBuffer in) {
-        DoubleMutable d = current();
-        d.set(in.getDouble());
-        return d;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return 8;
-    }
-
-    @Override
-    public int maxLength() {
-        return 8;
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return 8;
-    }
-
-    @Override
-    public DoubleMutable valueOf(byte[] value) {
-        DoubleMutable d = current();
-        if (value == null)
-            d.set(0d);
-        else
-            d.set(Double.parseDouble(Bytes.toString(value)));
-        return d;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
index 298cec6..6e66c1b 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.aggregation.basic;
 
 import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.DoubleMutable;
+import org.apache.kylin.common.datatype.DoubleMutable;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
index 71d95f2..7fdf3d8 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.aggregation.basic;
 
 import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.LongMutable;
+import org.apache.kylin.common.datatype.LongMutable;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/8f2a56cf/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
index d1e93f2..22ae865 100644
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.aggregation.basic;
 
 import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.util.LongMutable;
+import org.apache.kylin.common.datatype.LongMutable;
 
 /**
  */