You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:00:59 UTC

[04/13] incubator-kylin git commit: KYLIN-976 very initial

KYLIN-976 very initial


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/1218bbde
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/1218bbde
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/1218bbde

Branch: refs/heads/KYLIN-976
Commit: 1218bbde487e973de0391162204d73c76f1a9e81
Parents: 6515b0a
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Nov 24 13:15:54 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Nov 27 14:47:05 2015 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/util/DoubleMutable.java |  68 +++++
 .../apache/kylin/common/util/LongMutable.java   |  70 +++++
 .../apache/kylin/aggregation/Aggregation.java   |  42 +++
 .../kylin/aggregation/DataTypeSerializer.java   | 118 +++++++++
 .../kylin/aggregation/MeasureAggregator.java    | 133 ++++++++++
 .../kylin/aggregation/MeasureAggregators.java   |  81 ++++++
 .../apache/kylin/aggregation/MeasureCodec.java  |  78 ++++++
 .../basic/BigDecimalMaxAggregator.java          |  54 ++++
 .../basic/BigDecimalMinAggregator.java          |  55 ++++
 .../aggregation/basic/BigDecimalSerializer.java | 111 ++++++++
 .../basic/BigDecimalSumAggregator.java          |  51 ++++
 .../aggregation/basic/DateTimeSerializer.java   |  65 +++++
 .../aggregation/basic/DoubleMaxAggregator.java  |  54 ++++
 .../aggregation/basic/DoubleMinAggregator.java  |  54 ++++
 .../aggregation/basic/DoubleSerializer.java     |  84 ++++++
 .../aggregation/basic/DoubleSumAggregator.java  |  51 ++++
 .../aggregation/basic/LongMaxAggregator.java    |  54 ++++
 .../aggregation/basic/LongMinAggregator.java    |  54 ++++
 .../kylin/aggregation/basic/LongSerializer.java |  91 +++++++
 .../aggregation/basic/LongSumAggregator.java    |  51 ++++
 .../aggregation/basic/StringSerializer.java     |  56 ++++
 .../kylin/aggregation/hllc/HLLCAggregator.java  |  64 +++++
 .../kylin/aggregation/hllc/HLLCSerializer.java  |  98 +++++++
 .../kylin/aggregation/hllc/LDCAggregator.java   |  63 +++++
 .../kylin/aggregation/topn/TopNAggregator.java  |  66 +++++
 .../aggregation/topn/TopNCounterSerializer.java | 117 +++++++++
 .../kylin/cube/gridtable/CubeCodeSystem.java    |   6 +-
 .../cube/gridtable/TrimmedCubeCodeSystem.java   |   6 +-
 .../cube/inmemcubing/DoggedCubeBuilder.java     |   2 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      |   2 +-
 .../InMemCubeBuilderInputConverter.java         |   5 +-
 .../kylin/gridtable/GTAggregateScanner.java     |   2 +-
 .../kylin/gridtable/GTSampleCodeSystem.java     |   4 +-
 .../apache/kylin/gridtable/IGTCodeSystem.java   |   2 +-
 .../apache/kylin/gridtable/UnitTestSupport.java |   2 +-
 .../basic/BigDecimalSerializerTest.java         |  53 ++++
 .../topn/TopNCounterSerializerTest.java         |  61 +++++
 .../gridtable/AggregationCacheMemSizeTest.java  |  14 +-
 .../kylin/gridtable/DictGridTableTest.java      |   2 +-
 .../kylin/gridtable/SimpleGridTableTest.java    |   2 +-
 .../gridtable/SimpleInvertedIndexTest.java      |   5 +-
 .../metadata/measure/MeasureCodecTest.java      |   3 +
 .../measure/BigDecimalMaxAggregator.java        |  52 ----
 .../measure/BigDecimalMinAggregator.java        |  53 ----
 .../measure/BigDecimalSumAggregator.java        |  49 ----
 .../metadata/measure/DoubleMaxAggregator.java   |  51 ----
 .../metadata/measure/DoubleMinAggregator.java   |  51 ----
 .../kylin/metadata/measure/DoubleMutable.java   |  68 -----
 .../metadata/measure/DoubleSumAggregator.java   |  48 ----
 .../kylin/metadata/measure/HLLCAggregator.java  |  63 -----
 .../kylin/metadata/measure/LDCAggregator.java   |  60 -----
 .../metadata/measure/LongMaxAggregator.java     |  51 ----
 .../metadata/measure/LongMinAggregator.java     |  51 ----
 .../kylin/metadata/measure/LongMutable.java     |  70 -----
 .../metadata/measure/LongSumAggregator.java     |  48 ----
 .../metadata/measure/MeasureAggregator.java     | 121 ---------
 .../metadata/measure/MeasureAggregators.java    |  81 ------
 .../kylin/metadata/measure/MeasureCodec.java    |  79 ------
 .../kylin/metadata/measure/TopNAggregator.java  |  66 -----
 .../measure/fixedlen/FixedHLLCodec.java         |  80 ------
 .../measure/fixedlen/FixedLenMeasureCodec.java  |  49 ----
 .../measure/fixedlen/FixedPointLongCodec.java   | 117 ---------
 .../serializer/BigDecimalSerializer.java        | 110 --------
 .../measure/serializer/DataTypeSerializer.java  | 111 --------
 .../measure/serializer/DateTimeSerializer.java  |  64 -----
 .../measure/serializer/DoubleSerializer.java    |  83 ------
 .../measure/serializer/HLLCSerializer.java      |  97 -------
 .../measure/serializer/LongSerializer.java      |  90 -------
 .../measure/serializer/StringSerializer.java    |  55 ----
 .../serializer/TopNCounterSerializer.java       | 116 --------
 .../apache/kylin/metadata/model/DataType.java   |   2 +-
 .../fixedlen/FixedPointLongCodecTest.java       |  44 ----
 .../serializer/BigDecimalSerializerTest.java    |  52 ----
 .../serializer/TopNCounterSerializerTest.java   |  61 -----
 .../org/apache/kylin/storage/tuple/Tuple.java   |   4 +-
 .../engine/mr/steps/BaseCuboidMapperBase.java   |   3 +-
 .../kylin/engine/mr/steps/CuboidReducer.java    |   4 +-
 .../engine/mr/steps/InMemCuboidReducer.java     |   4 +-
 .../mr/steps/MergeCuboidFromStorageMapper.java  |   2 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |   2 +-
 .../kylin/engine/mr/steps/CubeReducerTest.java  |   4 +-
 .../apache/kylin/engine/spark/SparkCubing.java  |   8 +-
 .../invertedindex/index/RawTableRecord.java     |   4 +-
 .../kylin/invertedindex/index/TableRecord.java  |   2 +-
 .../invertedindex/index/TableRecordInfo.java    |   2 +-
 .../index/TableRecordInfoDigest.java            |   4 +-
 .../invertedindex/measure/FixedHLLCodec.java    |  80 ++++++
 .../measure/FixedLenMeasureCodec.java           |  49 ++++
 .../measure/FixedPointLongCodec.java            | 117 +++++++++
 .../invertedindex/model/IIKeyValueCodec.java    |   2 +-
 .../kylin/invertedindex/IIDescManagerTest.java  | 104 ++++++++
 .../apache/kylin/invertedindex/IIDescTest.java  |  67 +++++
 .../kylin/invertedindex/IIInstanceTest.java     |  74 ++++++
 .../invertedindex/InvertedIndexLocalTest.java   | 262 +++++++++++++++++++
 .../org/apache/kylin/invertedindex/LZFTest.java |  49 ++++
 .../invertedindex/IIDescManagerTest.java        | 104 --------
 .../invertedindex/invertedindex/IIDescTest.java |  67 -----
 .../invertedindex/IIInstanceTest.java           |  74 ------
 .../invertedindex/InvertedIndexLocalTest.java   | 262 -------------------
 .../invertedindex/invertedindex/LZFTest.java    |  49 ----
 .../measure/FixedPointLongCodecTest.java        |  45 ++++
 .../common/coprocessor/AggregationCache.java    |   2 +-
 .../observer/AggregationScanner.java            |   2 +-
 .../observer/ObserverAggregationCache.java      |   2 +-
 .../observer/ObserverAggregators.java           |   4 +-
 .../endpoint/EndpointAggregationCache.java      |   2 +-
 .../endpoint/EndpointAggregators.java           |   6 +-
 .../ii/coprocessor/endpoint/IIEndpoint.java     |   3 +-
 .../storage/hbase/steps/CubeHFileMapper.java    |   2 +-
 .../hbase/steps/HBaseMROutput2Transition.java   |   2 +-
 .../storage/hbase/steps/KeyValueCreator.java    |   2 +-
 .../storage/hbase/steps/RowValueDecoder.java    |   6 +-
 .../observer/AggregateRegionObserverTest.java   |   2 +-
 .../endpoint/EndpointAggregationTest.java       |   7 +-
 .../hbase/steps/CubeHFileMapper2Test.java       |   2 +-
 .../hbase/steps/RowValueDecoderTest.java        |   4 +-
 116 files changed, 2924 insertions(+), 2819 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java b/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
new file mode 100644
index 0000000..520cd74
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DoubleMutable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class DoubleMutable implements Comparable<DoubleMutable>, Serializable {
+
+    private double v;
+
+    public DoubleMutable() {
+        this(0);
+    }
+
+    public DoubleMutable(double v) {
+        set(v);
+    }
+
+    public double get() {
+        return v;
+    }
+
+    public void set(double v) {
+        this.v = v;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof DoubleMutable)) {
+            return false;
+        }
+        DoubleMutable other = (DoubleMutable) o;
+        return this.v == other.v;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) Double.doubleToLongBits(v);
+    }
+
+    @Override
+    public int compareTo(DoubleMutable o) {
+        return (v < o.v ? -1 : (v == o.v ? 0 : 1));
+    }
+
+    @Override
+    public String toString() {
+        return Double.toString(v);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java b/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
new file mode 100644
index 0000000..238bb86
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/util/LongMutable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class LongMutable implements Comparable<LongMutable>, Serializable {
+
+    private long v;
+
+    public LongMutable() {
+        this(0);
+    }
+
+    public LongMutable(long v) {
+        set(v);
+    }
+
+    public long get() {
+        return v;
+    }
+
+    public void set(long v) {
+        this.v = v;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof LongMutable)) {
+            return false;
+        }
+        LongMutable other = (LongMutable) o;
+        return this.v == other.v;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) v;
+    }
+
+    @Override
+    public int compareTo(LongMutable o) {
+        long thisValue = this.v;
+        long thatValue = o.v;
+        return (thisValue < thatValue ? -1 : (thisValue == thatValue ? 0 : 1));
+    }
+
+    @Override
+    public String toString() {
+        return Long.toString(v);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
new file mode 100644
index 0000000..193c5de
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/Aggregation.java
@@ -0,0 +1,42 @@
+package org.apache.kylin.aggregation;
+
+import java.util.List;
+
+import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+abstract public class Aggregation {
+    
+    /* ============================================================================
+     * Define
+     * ---------------------------------------------------------------------------- */
+    
+    abstract public DataType getAggregationDataType();
+    
+    abstract public DataType getResultDataType();
+    
+    abstract public void validate(MeasureDesc measureDesc) throws IllegalArgumentException;
+    
+    /* ============================================================================
+     * Build
+     * ---------------------------------------------------------------------------- */
+    
+    abstract public DataTypeSerializer<?> getSeralizer();
+    
+    abstract public MeasureAggregator<?> getAggregator();
+ 
+    abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
+    
+    abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
+
+    /* ============================================================================
+     * Cube Selection
+     * ---------------------------------------------------------------------------- */
+    
+    /* ============================================================================
+     * Query
+     * ---------------------------------------------------------------------------- */
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
new file mode 100644
index 0000000..df6833c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/DataTypeSerializer.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.aggregation.basic.BigDecimalSerializer;
+import org.apache.kylin.aggregation.basic.DateTimeSerializer;
+import org.apache.kylin.aggregation.basic.DoubleSerializer;
+import org.apache.kylin.aggregation.basic.LongSerializer;
+import org.apache.kylin.aggregation.basic.StringSerializer;
+import org.apache.kylin.aggregation.hllc.HLLCSerializer;
+import org.apache.kylin.aggregation.topn.TopNCounterSerializer;
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.metadata.model.DataType;
+
+import com.google.common.collect.Maps;
+
+/**
+ * @author yangli9
+ * 
+ * Note: the implementations MUST be thread-safe.
+ * 
+ */
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+
+    final static Map<String, Class<?>> implementations;
+    static {
+        HashMap<String, Class<?>> impl = Maps.newHashMap();
+        impl.put("varchar", StringSerializer.class);
+        impl.put("decimal", BigDecimalSerializer.class);
+        impl.put("double", DoubleSerializer.class);
+        impl.put("float", DoubleSerializer.class);
+        impl.put("bigint", LongSerializer.class);
+        impl.put("long", LongSerializer.class);
+        impl.put("integer", LongSerializer.class);
+        impl.put("int", LongSerializer.class);
+        impl.put("smallint", LongSerializer.class);
+        impl.put("date", DateTimeSerializer.class);
+        impl.put("datetime", DateTimeSerializer.class);
+        impl.put("timestamp", DateTimeSerializer.class);
+        implementations = Collections.unmodifiableMap(impl);
+
+    }
+
+    public static DataTypeSerializer<?> create(String dataType) {
+        return create(DataType.getInstance(dataType));
+    }
+
+    public static DataTypeSerializer<?> create(DataType type) {
+        if (type.isHLLC()) {
+            return new HLLCSerializer(type);
+        }
+
+        if (type.isTopN()) {
+            return new TopNCounterSerializer(type);
+        }
+
+        Class<?> clz = implementations.get(type.getName());
+        if (clz == null)
+            throw new RuntimeException("No MeasureSerializer for type " + type);
+
+        try {
+            return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
+        } catch (Exception e) {
+            throw new RuntimeException(e); // never happen
+        }
+    }
+    
+    /** peek into buffer and return the length of serialization */
+    abstract public int peekLength(ByteBuffer in);
+
+    /** return the max number of bytes to the longest serialization */
+    abstract public int maxLength();
+
+    /** get an estimate of size in bytes of the serialized data */
+    abstract public int getStorageBytesEstimate();
+
+    /** convert from String to obj (string often come as byte[] in mapred) */
+    abstract public T valueOf(byte[] value);
+
+    /** convert from String to obj */
+    public T valueOf(String value) {
+        try {
+            return valueOf(value.getBytes("UTF-8"));
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e); // never happen
+        }
+    }
+
+    /** convert from obj to string */
+    public String toString(T value) {
+        if (value == null)
+            return "NULL";
+        else
+            return value.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
new file mode 100644
index 0000000..9c8945d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregator.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation;
+
+import org.apache.kylin.aggregation.basic.BigDecimalMaxAggregator;
+import org.apache.kylin.aggregation.basic.BigDecimalMinAggregator;
+import org.apache.kylin.aggregation.basic.BigDecimalSumAggregator;
+import org.apache.kylin.aggregation.basic.DoubleMaxAggregator;
+import org.apache.kylin.aggregation.basic.DoubleMinAggregator;
+import org.apache.kylin.aggregation.basic.DoubleSumAggregator;
+import org.apache.kylin.aggregation.basic.LongMaxAggregator;
+import org.apache.kylin.aggregation.basic.LongMinAggregator;
+import org.apache.kylin.aggregation.basic.LongSumAggregator;
+import org.apache.kylin.aggregation.hllc.HLLCAggregator;
+import org.apache.kylin.aggregation.hllc.LDCAggregator;
+import org.apache.kylin.aggregation.topn.TopNAggregator;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+import java.io.Serializable;
+
+/**
+ */
+@SuppressWarnings("serial")
+abstract public class MeasureAggregator<V> implements Serializable {
+
+    public static MeasureAggregator<?> create(String funcName, String returnType) {
+        if (FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName) || FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongSumAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalSumAggregator();
+            else if (isDouble(returnType))
+                return new DoubleSumAggregator();
+        } else if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName)) {
+            DataType hllcType = DataType.getInstance(returnType);
+            if (hllcType.isHLLC())
+                return new HLLCAggregator(hllcType.getPrecision());
+            else
+                return new LDCAggregator();
+        } else if (FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongMaxAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalMaxAggregator();
+            else if (isDouble(returnType))
+                return new DoubleMaxAggregator();
+        } else if (FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongMinAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalMinAggregator();
+            else if (isDouble(returnType))
+                return new DoubleMinAggregator();
+        } else if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName)) {
+            return new TopNAggregator();
+        }
+        throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
+    }
+
+    public static boolean isBigDecimal(String type) {
+        return type.startsWith("decimal");
+    }
+
+    public static boolean isDouble(String type) {
+        return "double".equalsIgnoreCase(type) || "float".equalsIgnoreCase(type) || "real".equalsIgnoreCase(type);
+    }
+
+    public static boolean isInteger(String type) {
+        return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type);
+    }
+
+    public static int guessBigDecimalMemBytes() {
+        // 116 returned by AggregationCacheMemSizeTest
+        return 8 // aggregator obj shell
+        + 8 // ref to BigDecimal
+        + 8 // BigDecimal obj shell
+        + 100; // guess of BigDecimal internal
+    }
+
+    public static int guessDoubleMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest
+        return 44;
+        /*
+        return 8 // aggregator obj shell
+        + 8 // ref to DoubleWritable
+        + 8 // DoubleWritable obj shell
+        + 8; // size of double
+        */
+    }
+
+    public static int guessLongMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest
+        return 44;
+        /*
+        return 8 // aggregator obj shell
+        + 8 // ref to LongWritable
+        + 8 // LongWritable obj shell
+        + 8; // size of long
+        */
+    }
+
+    // ============================================================================
+
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+    }
+
+    abstract public void reset();
+
+    abstract public void aggregate(V value);
+
+    abstract public V getState();
+
+    // get an estimate of memory consumption UPPER BOUND
+    abstract public int getMemBytesEstimate();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java
new file mode 100644
index 0000000..3aa575b
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureAggregators.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ */
+@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
+public class MeasureAggregators implements Serializable {
+
+    private final MeasureAggregator[] aggs;
+    private final int descLength;
+
+    public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    public MeasureAggregators(MeasureDesc... measureDescs) {
+        descLength = measureDescs.length;
+        aggs = new MeasureAggregator[descLength];
+
+        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
+        for (int i = 0; i < descLength; i++) {
+            FunctionDesc func = measureDescs[i].getFunction();
+            aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType());
+            measureIndexMap.put(measureDescs[i].getName(), i);
+        }
+        // fill back dependent aggregator
+        for (int i = 0; i < descLength; i++) {
+            String depMsrRef = measureDescs[i].getDependentMeasureRef();
+            if (depMsrRef != null) {
+                int index = measureIndexMap.get(depMsrRef);
+                aggs[i].setDependentAggregator(aggs[index]);
+            }
+        }
+    }
+
+    public void reset() {
+        for (int i = 0; i < aggs.length; i++) {
+            aggs[i].reset();
+        }
+    }
+    
+    public void aggregate(Object[] values) {
+        assert values.length == descLength;
+
+        for (int i = 0; i < descLength; i++) {
+            aggs[i].aggregate(values[i]);
+        }
+    }
+
+    public void collectStates(Object[] states) {
+        for (int i = 0; i < descLength; i++) {
+            states[i] = aggs[i].getState();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
new file mode 100644
index 0000000..cbcb3a8
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/MeasureCodec.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * @author yangli9
+ * 
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MeasureCodec {
+
+    int nMeasures;
+    DataTypeSerializer[] serializers;
+
+    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    public MeasureCodec(MeasureDesc... measureDescs) {
+        String[] dataTypes = new String[measureDescs.length];
+        for (int i = 0; i < dataTypes.length; i++) {
+            dataTypes[i] = measureDescs[i].getFunction().getReturnType();
+        }
+        init(dataTypes);
+    }
+
+    public MeasureCodec(String... dataTypes) {
+        init(dataTypes);
+    }
+
+    private void init(String[] dataTypes) {
+        nMeasures = dataTypes.length;
+        serializers = new DataTypeSerializer[nMeasures];
+
+        for (int i = 0; i < nMeasures; i++) {
+            serializers[i] = DataTypeSerializer.create(dataTypes[i]);
+        }
+    }
+
+    public DataTypeSerializer getSerializer(int idx) {
+        return serializers[idx];
+    }
+
+    public void decode(ByteBuffer buf, Object[] result) {
+        assert result.length == nMeasures;
+        for (int i = 0; i < nMeasures; i++) {
+            result[i] = serializers[i].deserialize(buf);
+        }
+    }
+
+    public void encode(Object[] values, ByteBuffer out) {
+        assert values.length == nMeasures;
+        for (int i = 0; i < nMeasures; i++) {
+            serializers[i].serialize(values[i], out);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java
new file mode 100644
index 0000000..ca044d0
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
+
+    BigDecimal max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        if (max == null)
+            max = value;
+        else if (max.compareTo(value) < 0)
+            max = value;
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java
new file mode 100644
index 0000000..3c3c85e
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalMinAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
+
+    BigDecimal max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        if (max == null)
+            max = value;
+        else if (max.compareTo(value) > 0)
+            max = value;
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
new file mode 100644
index 0000000..9f7c3cf
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSerializer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.model.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
+
+    final DataType type;
+    final int maxLength;
+
+    int avoidVerbose = 0;
+
+    public BigDecimalSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
+        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
+    }
+
+    @Override
+    public void serialize(BigDecimal value, ByteBuffer out) {
+        if (value.scale() > type.getScale()) {
+            if (avoidVerbose % 10000 == 0) {
+                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + (avoidVerbose++));
+            }
+            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
+        }
+        byte[] bytes = value.unscaledValue().toByteArray();
+        if (bytes.length + 2 > maxLength) {
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+
+        BytesUtil.writeVInt(value.scale(), out);
+        BytesUtil.writeVInt(bytes.length, out);
+        out.put(bytes);
+    }
+
+    @Override
+    public BigDecimal deserialize(ByteBuffer in) {
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+
+        byte[] bytes = new byte[n];
+        in.get(bytes);
+
+        return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+
+        @SuppressWarnings("unused")
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+        int len = in.position() - mark + n;
+
+        in.position(mark);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    @Override
+    public BigDecimal valueOf(byte[] value) {
+        if (value == null)
+            return new BigDecimal(0);
+        else
+            return new BigDecimal(Bytes.toString(value));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java
new file mode 100644
index 0000000..19aef3c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/BigDecimalSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
+
+    BigDecimal sum = new BigDecimal(0);
+
+    @Override
+    public void reset() {
+        sum = new BigDecimal(0);
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        sum = sum.add(value);
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
new file mode 100644
index 0000000..0bf4aba
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DateTimeSerializer.java
@@ -0,0 +1,65 @@
+package org.apache.kylin.aggregation.basic;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.LongMutable;
+import org.apache.kylin.metadata.model.DataType;
+
+public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    public DateTimeSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        out.putLong(value.get());
+    }
+
+    private LongMutable current() {
+        LongMutable l = current.get();
+        if (l == null) {
+            l = new LongMutable();
+            current.set(l);
+        }
+        return l;
+    }
+
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        LongMutable l = current();
+        l.set(in.getLong());
+        return l;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    @Override
+    public LongMutable valueOf(byte[] value) {
+        LongMutable l = current();
+        if (value == null)
+            l.set(0L);
+        else
+            l.set(DateFormat.stringToMillis(Bytes.toString(value)));
+        return l;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
new file mode 100644
index 0000000..f09614d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.DoubleMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class DoubleMaxAggregator extends MeasureAggregator<DoubleMutable> {
+
+    DoubleMutable max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(DoubleMutable value) {
+        if (max == null)
+            max = new DoubleMutable(value.get());
+        else if (max.get() < value.get())
+            max.set(value.get());
+    }
+
+    @Override
+    public DoubleMutable getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
new file mode 100644
index 0000000..b93c15c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.DoubleMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> {
+
+    DoubleMutable min = null;
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    @Override
+    public void aggregate(DoubleMutable value) {
+        if (min == null)
+            min = new DoubleMutable(value.get());
+        else if (min.get() > value.get())
+            min.set(value.get());
+    }
+
+    @Override
+    public DoubleMutable getState() {
+        return min;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
new file mode 100644
index 0000000..f207054
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSerializer.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.DoubleMutable;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ */
+public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
+
+    public DoubleSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(DoubleMutable value, ByteBuffer out) {
+        out.putDouble(value.get());
+    }
+
+    private DoubleMutable current() {
+        DoubleMutable d = current.get();
+        if (d == null) {
+            d = new DoubleMutable();
+            current.set(d);
+        }
+        return d;
+    }
+
+    @Override
+    public DoubleMutable deserialize(ByteBuffer in) {
+        DoubleMutable d = current();
+        d.set(in.getDouble());
+        return d;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+    @Override
+    public DoubleMutable valueOf(byte[] value) {
+        DoubleMutable d = current();
+        if (value == null)
+            d.set(0d);
+        else
+            d.set(Double.parseDouble(Bytes.toString(value)));
+        return d;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
new file mode 100644
index 0000000..298cec6
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.DoubleMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> {
+
+    DoubleMutable sum = new DoubleMutable();
+
+    @Override
+    public void reset() {
+        sum.set(0.0);
+    }
+
+    @Override
+    public void aggregate(DoubleMutable value) {
+        sum.set(sum.get() + value.get());
+    }
+
+    @Override
+    public DoubleMutable getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
new file mode 100644
index 0000000..71d95f2
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.LongMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class LongMaxAggregator extends MeasureAggregator<LongMutable> {
+
+    LongMutable max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+        if (max == null)
+            max = new LongMutable(value.get());
+        else if (max.get() < value.get())
+            max.set(value.get());
+    }
+
+    @Override
+    public LongMutable getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
new file mode 100644
index 0000000..d1e93f2
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.LongMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class LongMinAggregator extends MeasureAggregator<LongMutable> {
+
+    LongMutable min = null;
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+        if (min == null)
+            min = new LongMutable(value.get());
+        else if (min.get() > value.get())
+            min.set(value.get());
+    }
+
+    @Override
+    public LongMutable getState() {
+        return min;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java
new file mode 100644
index 0000000..202596d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSerializer.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.LongMutable;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ */
+public class LongSerializer extends DataTypeSerializer<LongMutable> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    public LongSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        BytesUtil.writeVLong(value.get(), out);
+    }
+
+    private LongMutable current() {
+        LongMutable l = current.get();
+        if (l == null) {
+            l = new LongMutable();
+            current.set(l);
+        }
+        return l;
+    }
+
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        LongMutable l = current();
+        l.set(BytesUtil.readVLong(in));
+        return l;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+
+        BytesUtil.readVLong(in);
+        int len = in.position() - mark;
+
+        in.position(mark);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return 9; // vlong: 1 + 8
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 5;
+    }
+
+    @Override
+    public LongMutable valueOf(byte[] value) {
+        LongMutable l = current();
+        if (value == null)
+            l.set(0L);
+        else
+            l.set(Long.parseLong(Bytes.toString(value)));
+        return l;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
new file mode 100644
index 0000000..c85c83c
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.basic;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.LongMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class LongSumAggregator extends MeasureAggregator<LongMutable> {
+
+    LongMutable sum = new LongMutable();
+
+    @Override
+    public void reset() {
+        sum.set(0);
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+        sum.set(sum.get() + value.get());
+    }
+
+    @Override
+    public LongMutable getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java
new file mode 100644
index 0000000..e84278d
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/StringSerializer.java
@@ -0,0 +1,56 @@
+package org.apache.kylin.aggregation.basic;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.model.DataType;
+
+public class StringSerializer extends DataTypeSerializer<String> {
+
+    final DataType type;
+    final int maxLength;
+
+    public StringSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 2 byte length, rest is String.toBytes()
+        this.maxLength = 2 + type.getPrecision();
+    }
+
+    @Override
+    public void serialize(String value, ByteBuffer out) {
+        int start = out.position();
+
+        BytesUtil.writeUTFString(value, out);
+
+        if (out.position() - start > maxLength)
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+    }
+
+    @Override
+    public String deserialize(ByteBuffer in) {
+        return BytesUtil.readUTFString(in);
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return BytesUtil.peekByteArrayLength(in);
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return maxLength;
+    }
+
+    @Override
+    public String valueOf(byte[] value) {
+        return Bytes.toString(value);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
new file mode 100644
index 0000000..8f85fe8
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.hllc;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
+
+    final int precision;
+    HyperLogLogPlusCounter sum = null;
+
+    public HLLCAggregator(int precision) {
+        this.precision = precision;
+    }
+
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    @Override
+    public void aggregate(HyperLogLogPlusCounter value) {
+        if (sum == null)
+            sum = new HyperLogLogPlusCounter(value);
+        else
+            sum.merge(value);
+    }
+
+    @Override
+    public HyperLogLogPlusCounter getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        // 1024 + 60 returned by AggregationCacheMemSizeTest
+        return 8 // aggregator obj shell
+                + 4 // precision
+                + 8 // ref to HLLC
+                + 8 // HLLC obj shell
+                + 32 + (1 << precision); // HLLC internal
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
new file mode 100644
index 0000000..f7804f4
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.hllc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * @author yangli9
+ * 
+ */
+public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
+
+    private int precision;
+
+    public HLLCSerializer(DataType type) {
+        this.precision = type.getPrecision();
+    }
+
+    @Override
+    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
+        try {
+            value.writeRegisters(out);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private HyperLogLogPlusCounter current() {
+        HyperLogLogPlusCounter hllc = current.get();
+        if (hllc == null) {
+            hllc = new HyperLogLogPlusCounter(precision);
+            current.set(hllc);
+        }
+        return hllc;
+    }
+
+    @Override
+    public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
+        HyperLogLogPlusCounter hllc = current();
+        try {
+            hllc.readRegisters(in);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        return hllc;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return current().peekLength(in);
+    }
+
+    @Override
+    public int maxLength() {
+        return current().maxLength();
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return current().maxLength();
+    }
+
+    @Override
+    public HyperLogLogPlusCounter valueOf(byte[] value) {
+        HyperLogLogPlusCounter hllc = current();
+        hllc.clear();
+        if (value == null)
+            hllc.add("__nUlL__");
+        else
+            hllc.add(value);
+        return hllc;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
new file mode 100644
index 0000000..643bcae
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.hllc;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.util.LongMutable;
+
+/**
+ * Long Distinct Count
+ */
+@SuppressWarnings("serial")
+public class LDCAggregator extends MeasureAggregator<LongMutable> {
+
+    private static LongMutable ZERO = new LongMutable(0);
+
+    private HLLCAggregator hllAgg = null;
+    private LongMutable state = new LongMutable(0);
+
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+        this.hllAgg = (HLLCAggregator) agg;
+    }
+
+    @Override
+    public void reset() {
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+    }
+
+    @Override
+    public LongMutable getState() {
+        if (hllAgg == null) {
+            return ZERO;
+        } else {
+            state.set(hllAgg.getState().getCountEstimate());
+            return state;
+        }
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
new file mode 100644
index 0000000..4f6c7ee
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.topn;
+
+import java.util.Map;
+
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+
+import com.google.common.collect.Maps;
+
+/**
+ * 
+ */
+@SuppressWarnings("serial")
+public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
+
+    int capacity = 0;
+    TopNCounter<ByteArray> sum = null;
+    Map<ByteArray, Double> sanityCheckMap;
+
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    @Override
+    public void aggregate(TopNCounter<ByteArray> value) {
+        if (sum == null) {
+            capacity = value.getCapacity();
+            sum = new TopNCounter<>(capacity);
+            sanityCheckMap = Maps.newHashMap();
+        }
+        sum.merge(value);
+    }
+
+    @Override
+    public TopNCounter<ByteArray> getState() {
+        
+        //sum.retain(capacity);
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return 8 * capacity / 4;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
new file mode 100644
index 0000000..8c44f8f
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.aggregation.topn;
+
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.DoubleDeltaSerializer;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.model.DataType;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * 
+ */
+public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
+
+    private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3);
+
+    private int precision;
+
+    public TopNCounterSerializer(DataType dataType) {
+        this.precision = dataType.getPrecision();
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+        @SuppressWarnings("unused")
+        int capacity = in.getInt();
+        int size = in.getInt();
+        int keyLength = in.getInt();
+        dds.deserialize(in);
+        int len = in.position() - mark + keyLength * size;
+        in.position(mark);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return precision * TopNCounter.EXTRA_SPACE_RATE * (4 + 8);
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return precision * TopNCounter.EXTRA_SPACE_RATE * 8;
+    }
+
+    @Override
+    public TopNCounter<ByteArray> valueOf(byte[] value) {
+        ByteBuffer buffer = ByteBuffer.wrap(value);
+        int sizeOfId = buffer.getInt();
+        int keyEncodedValue = buffer.getInt();
+        double counter = buffer.getDouble();
+
+        ByteArray key = new ByteArray(sizeOfId);
+        BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, sizeOfId);
+
+        TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(precision * TopNCounter.EXTRA_SPACE_RATE);
+        topNCounter.offer(key, counter);
+        return topNCounter;
+    }
+
+    @Override
+    public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
+        double[] counters = value.getCounters();
+        List<ByteArray> peek = value.peek(1);
+        int keyLength = peek.size() > 0 ? peek.get(0).length() : 0;
+        out.putInt(value.getCapacity());
+        out.putInt(value.size());
+        out.putInt(keyLength);
+        dds.serialize(counters, out);
+        Iterator<Counter<ByteArray>> iterator = value.iterator();
+        while (iterator.hasNext()) {
+            out.put(iterator.next().getItem().array());
+        }
+    }
+
+    @Override
+    public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
+        int capacity = in.getInt();
+        int size = in.getInt();
+        int keyLength = in.getInt();
+        double[] counters = dds.deserialize(in);
+
+        TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
+        ByteArray byteArray;
+        for (int i = 0; i < size; i++) {
+            byteArray = new ByteArray(keyLength);
+            in.get(byteArray.array());
+            counter.offerToHead(byteArray, counters[i]);
+        }
+
+        return counter;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index 65d639f..b88f9df 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -6,6 +6,9 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.aggregation.basic.StringSerializer;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
@@ -15,9 +18,6 @@ import org.apache.kylin.gridtable.DefaultGTComparator;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.IGTCodeSystem;
 import org.apache.kylin.gridtable.IGTComparator;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
-import org.apache.kylin.metadata.measure.serializer.StringSerializer;
 
 /**
  * defines how column values will be encoded to/ decoded from GTRecord 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
index e4f32fb..26f1636 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
@@ -23,13 +23,13 @@ package org.apache.kylin.cube.gridtable;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
+import org.apache.kylin.aggregation.DataTypeSerializer;
+import org.apache.kylin.aggregation.MeasureAggregator;
+import org.apache.kylin.aggregation.basic.StringSerializer;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.IGTCodeSystem;
 import org.apache.kylin.gridtable.IGTComparator;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
-import org.apache.kylin.metadata.measure.serializer.DataTypeSerializer;
-import org.apache.kylin.metadata.measure.serializer.StringSerializer;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class TrimmedCubeCodeSystem implements IGTCodeSystem {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 7fe2122..ce912a3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -29,6 +29,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.kylin.aggregation.MeasureAggregators;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
@@ -37,7 +38,6 @@ import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.metadata.measure.MeasureAggregators;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e9d940a..8c6146b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.DoubleMutable;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.Pair;
@@ -46,7 +47,6 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.metadata.measure.DoubleMutable;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index 69a9fc9..951c054 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -18,15 +18,16 @@
 package org.apache.kylin.cube.inmemcubing;
 
 import com.google.common.base.Preconditions;
+
+import org.apache.kylin.aggregation.MeasureCodec;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.LongMutable;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.metadata.measure.LongMutable;
-import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1218bbde/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index cd2881e..01696e8 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -7,10 +7,10 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 
+import org.apache.kylin.aggregation.MeasureAggregator;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;