You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:05 UTC

[10/13] incubator-kylin git commit: KYLIN-976 Add ingester; Build part done, in-mem cube test pass

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
index ed0a166..832584c 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderTest.java
@@ -33,10 +33,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.junit.AfterClass;
@@ -58,7 +58,7 @@ public class DoggedCubeBuilderTest extends LocalFileMetadataTestCase {
 
     private static CubeInstance cube;
     private static String flatTable;
-    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+    private static Map<TblColRef, Dictionary<String>> dictionaryMap;
 
     @BeforeClass
     public static void before() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
index f853b08..ab87c2b 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
@@ -33,13 +33,13 @@ import java.util.concurrent.Future;
 import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryGenerator;
 import org.apache.kylin.dict.IterableDictionaryValueEnumerator;
 import org.apache.kylin.gridtable.GTRecord;
@@ -66,7 +66,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
 
     private static CubeInstance cube;
     private static String flatTable;
-    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+    private static Map<TblColRef, Dictionary<String>> dictionaryMap;
 
     @BeforeClass
     public static void before() throws IOException {
@@ -166,8 +166,8 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
         queue.put(new ArrayList<String>(0));
     }
 
-    static Map<TblColRef, Dictionary<?>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
-        Map<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+    static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeInstance cube, String flatTable) throws IOException {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
         CubeDesc desc = cube.getDescriptor();
         CubeJoinedFlatTableDesc flatTableDesc = new CubeJoinedFlatTableDesc(desc, null);
         int nColumns = flatTableDesc.getColumnList().size();
@@ -178,7 +178,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
             if (desc.getRowkey().isUseDictionary(col)) {
                 logger.info("Building dictionary for " + col);
                 List<byte[]> valueList = readValueList(flatTable, nColumns, flatTableDesc.getRowKeyColumnIndexes()[c]);
-                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
+                Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(col.getType(), new IterableDictionaryValueEnumerator(valueList));
                 result.put(col, dict);
             }
         }
@@ -192,7 +192,7 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
                 TblColRef literalCol = func.getTopNLiteralColumn();
                 logger.info("Building dictionary for " + literalCol);
                 List<byte[]> valueList = readValueList(flatTable, nColumns, literalColIdx);
-                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(literalCol.getType(), new IterableDictionaryValueEnumerator(valueList));
+                Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(literalCol.getType(), new IterableDictionaryValueEnumerator(valueList));
 
                 result.put(literalCol, dict);
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
index 93f0419..6ae0d0e 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheMemSizeTest.java
@@ -23,15 +23,15 @@ import java.util.Random;
 import java.util.SortedMap;
 import java.util.TreeMap;
 
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.aggregation.basic.BigDecimalSumAggregator;
-import org.apache.kylin.aggregation.basic.DoubleSumAggregator;
-import org.apache.kylin.aggregation.basic.LongSumAggregator;
-import org.apache.kylin.aggregation.hllc.HLLCAggregator;
-import org.apache.kylin.common.datatype.DoubleMutable;
-import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.basic.BigDecimalSumAggregator;
+import org.apache.kylin.measure.basic.DoubleSumAggregator;
+import org.apache.kylin.measure.basic.LongSumAggregator;
+import org.apache.kylin.measure.hllc.HLLCAggregator;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+import org.apache.kylin.metadata.datatype.LongMutable;
 import org.junit.Test;
 
 public class AggregationCacheMemSizeTest {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index b3981e8..836bafd 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -27,18 +27,18 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.gridtable.CubeCodeSystem;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.NumberDictionaryBuilder;
 import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.dict.TrieDictionaryBuilder;
 import org.apache.kylin.gridtable.GTInfo.Builder;
 import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 4313f4b..5454490 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -26,9 +26,9 @@ import java.math.BigDecimal;
 import java.util.BitSet;
 import java.util.List;
 
-import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.memstore.GTSimpleMemStore;
+import org.apache.kylin.metadata.datatype.LongMutable;
 import org.junit.Test;
 
 public class SimpleGridTableTest {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
index 7f4da61..02fbecc 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleInvertedIndexTest.java
@@ -24,10 +24,10 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.LongMutable;
-import org.apache.kylin.common.datatype.StringSerializer;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.LongMutable;
+import org.apache.kylin.metadata.datatype.StringSerializer;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
index 4da5471..e9f74a4 100644
--- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
@@ -23,11 +23,11 @@ import static org.junit.Assert.assertEquals;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
-import org.apache.kylin.aggregation.MeasureCodec;
-import org.apache.kylin.common.datatype.DoubleMutable;
-import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
index b882e58..62b06aa 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DateStrDictionary.java
@@ -30,6 +30,7 @@ import java.io.UnsupportedEncodingException;
 import java.util.Date;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.Dictionary;
 
 /**
  * A dictionary for date string (date only, no time).

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictCodeSystem.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictCodeSystem.java
index f19a2a8..b0326c1 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictCodeSystem.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictCodeSystem.java
@@ -3,6 +3,7 @@ package org.apache.kylin.dict;
 import java.nio.ByteBuffer;
 
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.filter.IFilterCodeSystem;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
deleted file mode 100644
index d1fc6f9..0000000
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/Dictionary.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.dict;
-
-import java.io.PrintStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
-
-import org.apache.kylin.common.persistence.Writable;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * A bi-way dictionary that maps from dimension/column values to IDs and vice
- * versa. By storing IDs instead of real values, the size of cube is
- * significantly reduced.
- * 
- * - IDs are smallest integers possible for the cardinality of a column, for the
- * purpose of minimal storage space - IDs preserve ordering of values, such that
- * range query can be applied to IDs directly
- * 
- * A dictionary once built, is immutable. This allows optimal memory footprint
- * by e.g. flatten the Trie structure into a byte array, replacing node pointers
- * with array offsets.
- * 
- * @author yangli9
- */
-@SuppressWarnings("serial")
-abstract public class Dictionary<T> implements Writable, Serializable {
-
-    public static final byte NULL = (byte) 0xff;
-
-    // ID with all bit-1 (0xff e.g.) reserved for NULL value
-    public static final int NULL_ID[] = new int[] { 0, 0xff, 0xffff, 0xffffff, 0xffffffff };
-
-    abstract public int getMinId();
-
-    abstract public int getMaxId();
-
-    public int getSize() {
-        return getMaxId() - getMinId() + 1;
-    }
-
-    /**
-     * @return the size of an ID in bytes, determined by the cardinality of column
-     */
-    abstract public int getSizeOfId();
-
-    /**
-     * @return the (maximum) size of value in bytes, determined by the longest value
-     */
-    abstract public int getSizeOfValue();
-
-    /**
-     * @return true if each entry of this dict is contained by the dict in param
-     */
-    abstract public boolean contains(Dictionary<?> another);
-
-    /**
-     * Convenient form of <code>getIdFromValue(value, 0)</code>
-     */
-    final public int getIdFromValue(T value) throws IllegalArgumentException {
-        return getIdFromValue(value, 0);
-    }
-
-    /**
-     * Returns the ID integer of given value. In case of not found
-     * <p>
-     * - if roundingFlag=0, throw IllegalArgumentException; <br>
-     * - if roundingFlag<0, the closest smaller ID integer if exist; <br>
-     * - if roundingFlag>0, the closest bigger ID integer if exist. <br>
-     * <p>
-     * The implementation often has cache, thus faster than the byte[] version getIdFromValueBytes()
-     * 
-     * @throws IllegalArgumentException
-     *             if value is not found in dictionary and rounding is off;
-     *             or if rounding cannot find a smaller or bigger ID
-     */
-    final public int getIdFromValue(T value, int roundingFlag) throws IllegalArgumentException {
-        if (isNullObjectForm(value))
-            return nullId();
-        else
-            return getIdFromValueImpl(value, roundingFlag);
-    }
-
-    final public boolean containsValue(T value) throws IllegalArgumentException {
-        if (isNullObjectForm(value)) {
-            return true;
-        } else {
-            try {
-                //if no key found, it will throw exception
-                getIdFromValueImpl(value, 0);
-            } catch (IllegalArgumentException e) {
-                return false;
-            }
-            return true;
-        }
-    }
-
-    protected boolean isNullObjectForm(T value) {
-        return value == null;
-    }
-
-    abstract protected int getIdFromValueImpl(T value, int roundingFlag);
-
-    /**
-     * @return the value corresponds to the given ID
-     * @throws IllegalArgumentException
-     *             if ID is not found in dictionary
-     */
-    final public T getValueFromId(int id) throws IllegalArgumentException {
-        if (isNullId(id))
-            return null;
-        else
-            return getValueFromIdImpl(id);
-    }
-
-    abstract protected T getValueFromIdImpl(int id);
-
-    /**
-     * Convenient form of
-     * <code>getIdFromValueBytes(value, offset, len, 0)</code>
-     */
-    final public int getIdFromValueBytes(byte[] value, int offset, int len) throws IllegalArgumentException {
-        return getIdFromValueBytes(value, offset, len, 0);
-    }
-
-    /**
-     * A lower level API, return ID integer from raw value bytes. In case of not found 
-     * <p>
-     * - if roundingFlag=0, throw IllegalArgumentException; <br>
-     * - if roundingFlag<0, the closest smaller ID integer if exist; <br>
-     * - if roundingFlag>0, the closest bigger ID integer if exist. <br>
-     * <p>
-     * Bypassing the cache layer, this could be significantly slower than getIdFromValue(T value).
-     * 
-     * @throws IllegalArgumentException
-     *             if value is not found in dictionary and rounding is off;
-     *             or if rounding cannot find a smaller or bigger ID
-     */
-    final public int getIdFromValueBytes(byte[] value, int offset, int len, int roundingFlag) throws IllegalArgumentException {
-        if (isNullByteForm(value, offset, len))
-            return nullId();
-        else {
-            int id = getIdFromValueBytesImpl(value, offset, len, roundingFlag);
-            if (id < 0)
-                throw new IllegalArgumentException("Value not exists!");
-            return id;
-        }
-    }
-
-    protected boolean isNullByteForm(byte[] value, int offset, int len) {
-        return value == null;
-    }
-
-    abstract protected int getIdFromValueBytesImpl(byte[] value, int offset, int len, int roundingFlag);
-
-    final public byte[] getValueBytesFromId(int id) {
-        if (isNullId(id))
-            return BytesUtil.EMPTY_BYTE_ARRAY;
-        else
-            return getValueBytesFromIdImpl(id);
-    }
-
-    abstract protected byte[] getValueBytesFromIdImpl(int id);
-
-    /**
-     * A lower level API, get byte values from ID, return the number of bytes
-     * written. Bypassing the cache layer, this could be significantly slower
-     * than getIdFromValue(T value).
-     *
-     * @return size of value bytes, 0 if empty string, -1 if null
-     *
-     * @throws IllegalArgumentException
-     *             if ID is not found in dictionary
-     */
-    final public int getValueBytesFromId(int id, byte[] returnValue, int offset) throws IllegalArgumentException {
-        if (isNullId(id))
-            return -1;
-        else
-            return getValueBytesFromIdImpl(id, returnValue, offset);
-    }
-
-    abstract protected int getValueBytesFromIdImpl(int id, byte[] returnValue, int offset);
-
-    abstract public void dump(PrintStream out);
-
-    public int nullId() {
-        return NULL_ID[getSizeOfId()];
-    }
-
-    public boolean isNullId(int id) {
-        int nullId = NULL_ID[getSizeOfId()];
-        return (nullId & id) == nullId;
-    }
-
-    /** utility that converts a dictionary ID to string, preserving order */
-    public static String dictIdToString(byte[] idBytes, int offset, int length) {
-        try {
-            return new String(idBytes, offset, length, "ISO-8859-1");
-        } catch (UnsupportedEncodingException e) {
-            // never happen
-            return null;
-        }
-    }
-
-    /** the reverse of dictIdToString(), returns integer ID */
-    public static int stringToDictId(String str) {
-        try {
-            byte[] bytes = str.getBytes("ISO-8859-1");
-            return BytesUtil.readUnsigned(bytes, 0, bytes.length);
-        } catch (UnsupportedEncodingException e) {
-            // never happen
-            return 0;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
index 300c240..4b01e60 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryGenerator.java
@@ -26,9 +26,10 @@ import java.util.List;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.source.ReadableTable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,7 +56,7 @@ public class DictionaryGenerator {
         }
     }
 
-    public static Dictionary<?> buildDictionaryFromValueEnumerator(DataType dataType, IDictionaryValueEnumerator valueEnumerator) throws IOException {
+    public static Dictionary<String> buildDictionaryFromValueEnumerator(DataType dataType, IDictionaryValueEnumerator valueEnumerator) throws IOException {
         Preconditions.checkNotNull(dataType, "dataType cannot be null");
         Dictionary dict;
         int baseId = 0; // always 0 for now
@@ -94,7 +95,7 @@ public class DictionaryGenerator {
         return buildDictionaryFromValueEnumerator(dataType, new MultipleDictionaryValueEnumerator(sourceDicts));
     }
 
-    public static Dictionary<?> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException {
+    public static Dictionary<String> buildDictionary(DictionaryInfo info, ReadableTable inpTable) throws IOException {
 
         // currently all data types are casted to string to build dictionary
         // String dataType = info.getDataType();
@@ -111,7 +112,7 @@ public class DictionaryGenerator {
         }
     }
 
-    private static Dictionary buildDateDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
+    private static Dictionary<String> buildDateDict(IDictionaryValueEnumerator valueEnumerator, int baseId, int nSamples, ArrayList samples) throws IOException {
         final int BAD_THRESHOLD = 0;
         String matchPattern = null;
         byte[] value;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
index f482002..4fba59a 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfo.java
@@ -20,6 +20,7 @@ package org.apache.kylin.dict;
 
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.source.ReadableTable.TableSignature;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
index 5b7a318..69b29fe 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryInfoSerializer.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.JsonUtil;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
index 2f4b761..f538142 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionaryManager.java
@@ -30,9 +30,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
index b80f838..6b47868 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/DictionarySerializer.java
@@ -10,6 +10,7 @@ import java.io.OutputStream;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryAware.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryAware.java b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryAware.java
index 1c7a009..4586163 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryAware.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/IDictionaryAware.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.dict;
 
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.model.TblColRef;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
index 13f7394..df7b1c6 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/MultipleDictionaryValueEnumerator.java
@@ -19,7 +19,9 @@
 package org.apache.kylin.dict;
 
 import com.google.common.collect.Lists;
+
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 
 import java.io.IOException;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
index 6bae129..65c6c05 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TimeStrDictionary.java
@@ -7,6 +7,7 @@ import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.Dictionary;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
index 3a05d0a..552aa92 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/TrieDictionary.java
@@ -19,9 +19,11 @@
 package org.apache.kylin.dict;
 
 import com.google.common.base.Preconditions;
+
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.Dictionary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
index e9a99b9..6297906 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/SnapshotTable.java
@@ -30,7 +30,7 @@ import java.util.List;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.dict.TrieDictionary;
 import org.apache.kylin.dict.TrieDictionaryBuilder;
@@ -44,6 +44,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 /**
  * @author yangli9
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class SnapshotTable extends RootPersistentEntity implements ReadableTable {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
----------------------------------------------------------------------
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
index 8c8dcfc..8020729 100644
--- a/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/NumberDictionaryTest.java
@@ -28,8 +28,9 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
-import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java
new file mode 100644
index 0000000..0ab547a
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/IMeasureFactory.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+public interface IMeasureFactory {
+
+    public MeasureType createMeasureType(String funcName, String dataType);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
new file mode 100644
index 0000000..32e5128
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.io.Serializable;
+
+/**
+ */
+@SuppressWarnings("serial")
+abstract public class MeasureAggregator<V> implements Serializable {
+
+    public static MeasureAggregator<?> create(String funcName, String dataType) {
+        return MeasureType.create(funcName, dataType).newAggregator();
+    }
+
+    public static int guessBigDecimalMemBytes() {
+        // 116 returned by AggregationCacheMemSizeTest
+        return 8 // aggregator obj shell
+        + 8 // ref to BigDecimal
+        + 8 // BigDecimal obj shell
+        + 100; // guess of BigDecimal internal
+    }
+
+    public static int guessDoubleMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest
+        return 44;
+        /*
+        return 8 // aggregator obj shell
+        + 8 // ref to DoubleWritable
+        + 8 // DoubleWritable obj shell
+        + 8; // size of double
+        */
+    }
+
+    public static int guessLongMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest
+        return 44;
+        /*
+        return 8 // aggregator obj shell
+        + 8 // ref to LongWritable
+        + 8 // LongWritable obj shell
+        + 8; // size of long
+        */
+    }
+
+    // ============================================================================
+
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+    }
+
+    abstract public void reset();
+
+    abstract public void aggregate(V value);
+
+    abstract public V getState();
+
+    // get an estimate of memory consumption UPPER BOUND
+    abstract public int getMemBytesEstimate();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
new file mode 100644
index 0000000..c6b456e
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureAggregators.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ */
+@SuppressWarnings({ "rawtypes", "unchecked", "serial" })
+public class MeasureAggregators implements Serializable {
+
+    private final MeasureAggregator[] aggs;
+    private final int descLength;
+
+    public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    public MeasureAggregators(MeasureDesc... measureDescs) {
+        descLength = measureDescs.length;
+        aggs = new MeasureAggregator[descLength];
+
+        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
+        for (int i = 0; i < descLength; i++) {
+            FunctionDesc func = measureDescs[i].getFunction();
+            aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType());
+            measureIndexMap.put(measureDescs[i].getName(), i);
+        }
+        // fill back dependent aggregator
+        for (int i = 0; i < descLength; i++) {
+            String depMsrRef = measureDescs[i].getDependentMeasureRef();
+            if (depMsrRef != null) {
+                int index = measureIndexMap.get(depMsrRef);
+                aggs[i].setDependentAggregator(aggs[index]);
+            }
+        }
+    }
+
+    public void reset() {
+        for (int i = 0; i < aggs.length; i++) {
+            aggs[i].reset();
+        }
+    }
+    
+    public void aggregate(Object[] values) {
+        assert values.length == descLength;
+
+        for (int i = 0; i < descLength; i++) {
+            aggs[i].aggregate(values[i]);
+        }
+    }
+
+    public void collectStates(Object[] states) {
+        for (int i = 0; i < descLength; i++) {
+            states[i] = aggs[i].getState();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
new file mode 100644
index 0000000..b02addd
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureCodec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+/**
+ * @author yangli9
+ * 
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MeasureCodec {
+
+    int nMeasures;
+    DataTypeSerializer[] serializers;
+
+    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    public MeasureCodec(MeasureDesc... measureDescs) {
+        String[] dataTypes = new String[measureDescs.length];
+        for (int i = 0; i < dataTypes.length; i++) {
+            dataTypes[i] = measureDescs[i].getFunction().getReturnType();
+        }
+        init(dataTypes);
+    }
+
+    public MeasureCodec(String... dataTypes) {
+        init(dataTypes);
+    }
+
+    private void init(String[] dataTypes) {
+        nMeasures = dataTypes.length;
+        serializers = new DataTypeSerializer[nMeasures];
+
+        for (int i = 0; i < nMeasures; i++) {
+            serializers[i] = DataTypeSerializer.create(dataTypes[i]);
+        }
+    }
+
+    public DataTypeSerializer getSerializer(int idx) {
+        return serializers[idx];
+    }
+
+    public void decode(ByteBuffer buf, Object[] result) {
+        assert result.length == nMeasures;
+        for (int i = 0; i < nMeasures; i++) {
+            result[i] = serializers[i].deserialize(buf);
+        }
+    }
+
+    public void encode(Object[] values, ByteBuffer out) {
+        assert values.length == nMeasures;
+        for (int i = 0; i < nMeasures; i++) {
+            serializers[i].serialize(values[i], out);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
new file mode 100644
index 0000000..8d6e601
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.util.Collection;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+abstract public class MeasureIngester<V> {
+    
+    public static MeasureIngester<?> create(MeasureDesc measure) {
+        return MeasureType.create(measure.getFunction()).newIngester();
+    }
+    
+    public static MeasureIngester<?>[] create(Collection<MeasureDesc> measures) {
+        MeasureIngester<?>[] result = new MeasureIngester<?>[measures.size()];
+        int i = 0;
+        for (MeasureDesc measure : measures) {
+            result[i++] = create(measure);
+        }
+        return result;
+    }
+
+    abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap);
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
new file mode 100644
index 0000000..604a7b6
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.basic.BasicMeasureFactory;
+import org.apache.kylin.measure.hllc.HLLCAggregationFactory;
+import org.apache.kylin.measure.topn.TopNMeasureFactory;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Maps;
+
+abstract public class MeasureType {
+    
+    private static final Map<String, IMeasureFactory> factoryRegistry = Maps.newConcurrentMap();
+    private static final IMeasureFactory defaultFactory = new BasicMeasureFactory();
+    
+    static {
+        factoryRegistry.put(FunctionDesc.FUNC_COUNT_DISTINCT, new HLLCAggregationFactory());
+        factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNMeasureFactory());
+    }
+    
+    public static MeasureType create(FunctionDesc function) {
+        return create(function.getExpression(), function.getReturnType());
+    }
+
+    public static MeasureType create(String funcName, String dataType) {
+        funcName = funcName.toUpperCase();
+        dataType = dataType.toLowerCase();
+        
+        IMeasureFactory factory = factoryRegistry.get(funcName);
+        if (factory == null)
+            factory = defaultFactory;
+        
+        MeasureType result = factory.createMeasureType(funcName, dataType);
+        
+        // register serializer for aggr data type
+        DataType aggregationDataType = result.getAggregationDataType();
+        if (DataTypeSerializer.hasRegistered(aggregationDataType.getName()) == false) {
+            DataTypeSerializer.register(aggregationDataType.getName(), result.getAggregationDataSeralizer());
+        }
+        
+        return result;
+    }
+    
+    /* ============================================================================
+     * Define
+     * ---------------------------------------------------------------------------- */
+    
+    abstract public DataType getAggregationDataType();
+    
+    abstract public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer();
+    
+    abstract public void validate(MeasureDesc measureDesc) throws IllegalArgumentException;
+    
+    /* ============================================================================
+     * Build
+     * ---------------------------------------------------------------------------- */
+    
+    abstract public MeasureIngester<?> newIngester();
+    
+    abstract public MeasureAggregator<?> newAggregator();
+ 
+    abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
+    
+    abstract public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts);
+
+    /* ============================================================================
+     * Cube Selection
+     * ---------------------------------------------------------------------------- */
+    
+    /* ============================================================================
+     * Query
+     * ---------------------------------------------------------------------------- */
+    
+    /* ============================================================================
+     * Storage
+     * ---------------------------------------------------------------------------- */
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java
new file mode 100644
index 0000000..7bfee49
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureFactory.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.IMeasureFactory;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+
+public class BasicMeasureFactory implements IMeasureFactory {
+
+    @Override
+    public MeasureType createMeasureType(String funcName, String dataType) {
+        return new BasicMeasureType(funcName, DataType.getType(dataType));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
new file mode 100644
index 0000000..f6bf090
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.datatype.DoubleSerializer;
+import org.apache.kylin.metadata.datatype.LongSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class BasicMeasureType extends MeasureType {
+    
+    private final String funcName;
+    private final DataType dataType;
+
+    public BasicMeasureType(String funcName, DataType dataType) {
+        this.funcName = funcName;
+        this.dataType = dataType;
+    }
+
+    @Override
+    public DataType getAggregationDataType() {
+        return dataType;
+    }
+
+    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
+        if (dataType.isIntegerFamily())
+            return LongSerializer.class;
+        else if (dataType.isDecimal())
+            return BigDecimalSerializer.class;
+        else if (dataType.isNumberFamily())
+            return DoubleSerializer.class;
+        else
+            throw new IllegalArgumentException("No serializer for aggregation type " + dataType);
+    }
+    
+    @Override
+    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    @Override
+    public MeasureIngester<?> newIngester() {
+        if (dataType.isIntegerFamily())
+            return new LongIngester();
+        else if (dataType.isDecimal())
+            return new BigDecimalIngester();
+        else if (dataType.isNumberFamily())
+            return new DoubleIngester();
+        else
+            throw new IllegalArgumentException("No ingester for aggregation type " + dataType);
+    }
+    
+    @Override
+    public MeasureAggregator<?> newAggregator() {
+        if (isSum() || isCount()) {
+            if (dataType.isDecimal())
+                return new BigDecimalSumAggregator();
+            else if (dataType.isIntegerFamily())
+                return new LongSumAggregator();
+            else if (dataType.isNumberFamily())
+                return new DoubleSumAggregator();
+        } else if (isMax()) {
+            if (dataType.isDecimal())
+                return new BigDecimalMaxAggregator();
+            else if (dataType.isIntegerFamily())
+                return new LongMaxAggregator();
+            else if (dataType.isNumberFamily())
+                return new DoubleMaxAggregator();
+        } else if (isMin()) {
+            if (dataType.isDecimal())
+                return new BigDecimalMinAggregator();
+            else if (dataType.isIntegerFamily())
+                return new LongMinAggregator();
+            else if (dataType.isNumberFamily())
+                return new DoubleMinAggregator();
+        }
+        throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + dataType + "'");
+    }
+    
+    private boolean isSum() {
+        return FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName);
+    }
+
+    private boolean isCount() {
+        return FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName);
+    }
+    
+    private boolean isMax() {
+        return FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName);
+    }
+    
+    private boolean isMin() {
+        return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
+    }
+    
+    @Override
+    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    @Override
+    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
new file mode 100644
index 0000000..bb743d6
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.math.BigDecimal;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
+
+    public BigDecimal valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        if (values.length > 1)
+            throw new IllegalArgumentException();
+        
+        if (values[0] == null)
+            return new BigDecimal(0);
+        else
+            return new BigDecimal(values[0]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
new file mode 100644
index 0000000..aa42476
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
+
+    BigDecimal max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        if (max == null)
+            max = value;
+        else if (max.compareTo(value) < 0)
+            max = value;
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
new file mode 100644
index 0000000..81193ad
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalMinAggregator.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
+
+    BigDecimal max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        if (max == null)
+            max = value;
+        else if (max.compareTo(value) > 0)
+            max = value;
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
new file mode 100644
index 0000000..5e00c63
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.math.BigDecimal;
+
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
+
+    BigDecimal sum = new BigDecimal(0);
+
+    @Override
+    public void reset() {
+        sum = new BigDecimal(0);
+    }
+
+    @Override
+    public void aggregate(BigDecimal value) {
+        sum = sum.add(value);
+    }
+
+    @Override
+    public BigDecimal getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
new file mode 100644
index 0000000..506ed19
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class DoubleIngester extends MeasureIngester<DoubleMutable> {
+    
+    // avoid repeated object creation
+    private DoubleMutable current = new DoubleMutable();
+
+    public DoubleMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        if (values.length > 1)
+            throw new IllegalArgumentException();
+        
+        DoubleMutable l = current;
+        if (values[0] == null)
+            l.set(0L);
+        else
+            l.set(Double.parseDouble(values[0]));
+        return l;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
new file mode 100644
index 0000000..25911e8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class DoubleMaxAggregator extends MeasureAggregator<DoubleMutable> {
+
+    DoubleMutable max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(DoubleMutable value) {
+        if (max == null)
+            max = new DoubleMutable(value.get());
+        else if (max.get() < value.get())
+            max.set(value.get());
+    }
+
+    @Override
+    public DoubleMutable getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
new file mode 100644
index 0000000..be97deb
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> {
+
+    DoubleMutable min = null;
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    @Override
+    public void aggregate(DoubleMutable value) {
+        if (min == null)
+            min = new DoubleMutable(value.get());
+        else if (min.get() > value.get())
+            min.set(value.get());
+    }
+
+    @Override
+    public DoubleMutable getState() {
+        return min;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
new file mode 100644
index 0000000..f276817
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> {
+
+    DoubleMutable sum = new DoubleMutable();
+
+    @Override
+    public void reset() {
+        sum.set(0.0);
+    }
+
+    @Override
+    public void aggregate(DoubleMutable value) {
+        sum.set(sum.get() + value.get());
+    }
+
+    @Override
+    public DoubleMutable getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
new file mode 100644
index 0000000..5bf1257
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.datatype.LongMutable;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+public class LongIngester extends MeasureIngester<LongMutable> {
+    
+    // avoid repeated object creation
+    private LongMutable current = new LongMutable();
+
+    public LongMutable valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+        if (values.length > 1)
+            throw new IllegalArgumentException();
+        
+        LongMutable l = current;
+        if (values[0] == null)
+            l.set(0L);
+        else
+            l.set(Long.parseLong(values[0]));
+        return l;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
new file mode 100644
index 0000000..ca44f15
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.LongMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class LongMaxAggregator extends MeasureAggregator<LongMutable> {
+
+    LongMutable max = null;
+
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+        if (max == null)
+            max = new LongMutable(value.get());
+        else if (max.get() < value.get())
+            max.set(value.get());
+    }
+
+    @Override
+    public LongMutable getState() {
+        return max;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
new file mode 100644
index 0000000..dadc64e
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.LongMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class LongMinAggregator extends MeasureAggregator<LongMutable> {
+
+    LongMutable min = null;
+
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+        if (min == null)
+            min = new LongMutable(value.get());
+        else if (min.get() > value.get())
+            min.set(value.get());
+    }
+
+    @Override
+    public LongMutable getState() {
+        return min;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
new file mode 100644
index 0000000..e7fdc9d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.basic;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.LongMutable;
+
+/**
+ */
+@SuppressWarnings("serial")
+public class LongSumAggregator extends MeasureAggregator<LongMutable> {
+
+    LongMutable sum = new LongMutable();
+
+    @Override
+    public void reset() {
+        sum.set(0);
+    }
+
+    @Override
+    public void aggregate(LongMutable value) {
+        sum.set(sum.get() + value.get());
+    }
+
+    @Override
+    public LongMutable getState() {
+        return sum;
+    }
+
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java
new file mode 100644
index 0000000..13e5520
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregationFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import org.apache.kylin.measure.IMeasureFactory;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+public class HLLCAggregationFactory implements IMeasureFactory {
+
+    @Override
+    public MeasureType createMeasureType(String funcName, String dataType) {
+        if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName) == false)
+            throw new IllegalArgumentException();
+        
+        return new HLLCMeasureType(DataType.getType(dataType));
+    }
+
+}