You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:04 UTC

[09/13] incubator-kylin git commit: KYLIN-976 Add ingester; Build part done, in-mem cube test pass

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
new file mode 100644
index 0000000..16563fa
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.measure.MeasureAggregator;
+
+/**
+ * Measure aggregator that folds {@link HyperLogLogPlusCounter} values into a
+ * single running counter.
+ * <p>
+ * The running counter is created lazily from the first aggregated value and is
+ * dropped again by {@link #reset()}, so {@link #getState()} returns
+ * {@code null} until at least one value has been aggregated.
+ */
+@SuppressWarnings("serial")
+public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
+
+    /** Precision given at construction; only used by {@link #getMemBytesEstimate()}. */
+    final int precision;
+    /** Running result, {@code null} while nothing has been aggregated. */
+    HyperLogLogPlusCounter sum = null;
+
+    /**
+     * @param precision HLLC precision used for the memory estimate
+     */
+    public HLLCAggregator(int precision) {
+        this.precision = precision;
+    }
+
+    /** Forgets the running counter; the next aggregated value starts a new one. */
+    @Override
+    public void reset() {
+        this.sum = null;
+    }
+
+    /**
+     * Merges {@code value} into the running counter. The first value after
+     * construction or {@link #reset()} seeds a new counter through the
+     * {@code HyperLogLogPlusCounter(HyperLogLogPlusCounter)} constructor.
+     */
+    @Override
+    public void aggregate(HyperLogLogPlusCounter value) {
+        if (sum != null) {
+            sum.merge(value);
+            return;
+        }
+        sum = new HyperLogLogPlusCounter(value);
+    }
+
+    /**
+     * @return the running counter, or {@code null} if nothing was aggregated yet
+     */
+    @Override
+    public HyperLogLogPlusCounter getState() {
+        return this.sum;
+    }
+
+    /**
+     * @return a fixed per-object overhead plus {@code 1 << precision} bytes
+     */
+    @Override
+    public int getMemBytesEstimate() {
+        // 1024 + 60 returned by AggregationCacheMemSizeTest
+        final int aggregatorShell = 8; // aggregator obj shell
+        final int precisionField = 4; // precision
+        final int counterRef = 8; // ref to HLLC
+        final int counterShell = 8; // HLLC obj shell
+        final int counterInternal = 32 + (1 << precision); // HLLC internal
+        return aggregatorShell + precisionField + counterRef + counterShell + counterInternal;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
new file mode 100644
index 0000000..ee90818
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * {@link MeasureType} for the "hllc" data type: values are ingested into and
+ * aggregated as {@link HyperLogLogPlusCounter}s and serialized with
+ * {@link HLLCSerializer}.
+ */
+public class HLLCMeasureType extends MeasureType {
+
+    private final DataType dataType;
+
+    /**
+     * @param dataType the aggregation data type; its name must be "hllc" and
+     *                 its precision must lie in [10, 16]
+     * @throws IllegalArgumentException if the name or the precision is not accepted
+     */
+    public HLLCMeasureType(DataType dataType) {
+        if (!"hllc".equals(dataType.getName()))
+            throw new IllegalArgumentException();
+
+        this.dataType = dataType;
+
+        final int precision = this.dataType.getPrecision();
+        final boolean precisionInRange = precision >= 10 && precision <= 16;
+        if (!precisionInRange)
+            throw new IllegalArgumentException("HLLC precision must be between 10 and 16");
+    }
+
+    /** @return the data type handed to the constructor */
+    @Override
+    public DataType getAggregationDataType() {
+        return dataType;
+    }
+
+    /** @return {@link HLLCSerializer}, the serializer class for HLLC values */
+    @Override
+    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
+        return HLLCSerializer.class;
+    }
+
+    /** Not implemented yet: accepts every measure description. */
+    @Override
+    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
+        // TODO Auto-generated method stub
+
+    }
+
+    /**
+     * Creates an ingester that adds every input string to one counter.
+     * <p>
+     * The ingester owns a single counter which is cleared and refilled on each
+     * {@code valueOf} call, so the returned object is the same instance every time.
+     * A {@code null} input string is added as the literal {@code "__nUlL__"}.
+     */
+    @Override
+    public MeasureIngester<?> newIngester() {
+        return new MeasureIngester<HyperLogLogPlusCounter>() {
+            HyperLogLogPlusCounter current = new HyperLogLogPlusCounter(dataType.getPrecision());
+
+            @Override
+            public HyperLogLogPlusCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+                current.clear();
+                for (int i = 0; i < values.length; i++) {
+                    String item = values[i];
+                    if (item == null)
+                        item = "__nUlL__";
+                    current.add(item);
+                }
+                return current;
+            }
+        };
+    }
+
+    /**
+     * @return an {@link HLLCAggregator} with this type's precision when the data
+     *         type reports {@code isHLLC()}, otherwise an {@link LDCAggregator}
+     */
+    @Override
+    public MeasureAggregator<?> newAggregator() {
+        if (!dataType.isHLLC())
+            return new LDCAggregator();
+        return new HLLCAggregator(dataType.getPrecision());
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override
+    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override
+    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
new file mode 100644
index 0000000..7131201
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCSerializer.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
+/**
+ * Serializer that writes and reads the registers of a
+ * {@link HyperLogLogPlusCounter} to and from a {@link ByteBuffer}.
+ * <p>
+ * Deserialization and the length queries go through one counter per thread,
+ * created on first use with this serializer's precision. {@link #deserialize}
+ * returns that per-thread instance, so a later call on the same thread reads
+ * into the very same object that an earlier call returned.
+ *
+ * @author yangli9
+ */
+public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
+
+    // be thread-safe and avoid repeated obj creation
+    private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
+
+    private int precision;
+
+    /**
+     * @param type data type whose precision is used for the per-thread counters
+     */
+    public HLLCSerializer(DataType type) {
+        this.precision = type.getPrecision();
+    }
+
+    /**
+     * Writes the registers of {@code value} into {@code out}.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} raised while writing
+     */
+    @Override
+    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
+        try {
+            value.writeRegisters(out);
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+    }
+
+    /** Returns the calling thread's counter, creating it on first use. */
+    private HyperLogLogPlusCounter current() {
+        final HyperLogLogPlusCounter cached = current.get();
+        if (cached != null)
+            return cached;
+
+        final HyperLogLogPlusCounter fresh = new HyperLogLogPlusCounter(precision);
+        current.set(fresh);
+        return fresh;
+    }
+
+    /**
+     * Reads registers from {@code in} into the calling thread's counter.
+     *
+     * @return the per-thread counter (not a new object)
+     * @throws RuntimeException wrapping any {@link IOException} raised while reading
+     */
+    @Override
+    public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
+        final HyperLogLogPlusCounter target = current();
+        try {
+            target.readRegisters(in);
+        } catch (IOException ioe) {
+            throw new RuntimeException(ioe);
+        }
+        return target;
+    }
+
+    /** Delegates to {@code peekLength} of the per-thread counter. */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        final HyperLogLogPlusCounter probe = current();
+        return probe.peekLength(in);
+    }
+
+    /** Delegates to {@code maxLength} of the per-thread counter. */
+    @Override
+    public int maxLength() {
+        final HyperLogLogPlusCounter probe = current();
+        return probe.maxLength();
+    }
+
+    /** Uses the counter's {@code maxLength} as the storage estimate. */
+    @Override
+    public int getStorageBytesEstimate() {
+        final HyperLogLogPlusCounter probe = current();
+        return probe.maxLength();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java
new file mode 100644
index 0000000..5d96450
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/LDCAggregator.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.LongMutable;
+
+/**
+ * Long Distinct Count
+ * <p>
+ * This aggregator holds no data of its own: {@link #reset()} and
+ * {@link #aggregate(LongMutable)} are no-ops, and {@link #getState()} reports
+ * the count estimate of a dependent {@link HLLCAggregator} set through
+ * {@link #setDependentAggregator(MeasureAggregator)}.
+ */
+@SuppressWarnings("serial")
+public class LDCAggregator extends MeasureAggregator<LongMutable> {
+
+    /** Shared result returned while there is nothing to count. */
+    private static final LongMutable ZERO = new LongMutable(0);
+
+    private HLLCAggregator hllAgg = null;
+    /** Reused holder for the latest estimate, to avoid allocating per call. */
+    private LongMutable state = new LongMutable(0);
+
+    /**
+     * Sets the aggregator whose state is reported by {@link #getState()}.
+     *
+     * @param agg must be an {@link HLLCAggregator}
+     * @throws ClassCastException if {@code agg} is of any other type
+     */
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+        this.hllAgg = (HLLCAggregator) agg;
+    }
+
+    /** No-op: the count is derived from the dependent aggregator. */
+    @Override
+    public void reset() {
+    }
+
+    /** No-op: the count is derived from the dependent aggregator. */
+    @Override
+    public void aggregate(LongMutable value) {
+    }
+
+    /**
+     * @return the dependent aggregator's count estimate, or zero when no
+     *         dependent aggregator is set or it has not aggregated any value
+     */
+    @Override
+    public LongMutable getState() {
+        // HLLCAggregator.getState() is null before the first aggregate() and
+        // after reset(); report zero instead of failing with a NullPointerException
+        if (hllAgg == null || hllAgg.getState() == null) {
+            return ZERO;
+        }
+        state.set(hllAgg.getState().getCountEstimate());
+        return state;
+    }
+
+    /** @return the inherited estimate for a long value */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
new file mode 100644
index 0000000..9b4c893
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+import java.util.Map;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.measure.MeasureAggregator;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Measure aggregator that merges {@link TopNCounter} values into one running
+ * counter.
+ * <p>
+ * The running counter is created on the first aggregated value, taking its
+ * capacity from that value, and is dropped again by {@link #reset()}.
+ */
+@SuppressWarnings("serial")
+public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
+
+    /** Capacity of the first aggregated counter; 0 until then. */
+    int capacity = 0;
+    /** Running result, {@code null} while nothing has been aggregated. */
+    TopNCounter<ByteArray> sum = null;
+    /** Allocated together with {@link #sum}; not read anywhere in this class. */
+    Map<ByteArray, Double> sanityCheckMap;
+
+    /** Forgets the running counter; {@link #capacity} keeps its last value. */
+    @Override
+    public void reset() {
+        this.sum = null;
+    }
+
+    /**
+     * Merges {@code value} into the running counter, creating the running
+     * counter with {@code value}'s capacity if there is none yet.
+     */
+    @Override
+    public void aggregate(TopNCounter<ByteArray> value) {
+        final boolean firstValue = (sum == null);
+        if (firstValue) {
+            capacity = value.getCapacity();
+            sum = new TopNCounter<ByteArray>(capacity);
+            sanityCheckMap = Maps.newHashMap();
+        }
+        sum.merge(value);
+    }
+
+    /**
+     * @return the running counter, or {@code null} if nothing was aggregated yet
+     */
+    @Override
+    public TopNCounter<ByteArray> getState() {
+        // NOTE(review): the original carried a commented-out "sum.retain(capacity)"
+        // here; the state is returned without any trimming.
+        return this.sum;
+    }
+
+    /** @return {@code 8 * capacity / 4}, evaluated in int arithmetic */
+    @Override
+    public int getMemBytesEstimate() {
+        final int scaled = 8 * capacity;
+        return scaled / 4;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
new file mode 100644
index 0000000..b422316
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNCounterSerializer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.common.topn.Counter;
+import org.apache.kylin.common.topn.DoubleDeltaSerializer;
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+
+/**
+ * 
+ */
+public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
+
+    private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3);
+
+    private int precision;
+
+    public TopNCounterSerializer(DataType dataType) {
+        this.precision = dataType.getPrecision();
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+        @SuppressWarnings("unused")
+        int capacity = in.getInt();
+        int size = in.getInt();
+        int keyLength = in.getInt();
+        dds.deserialize(in);
+        int len = in.position() - mark + keyLength * size;
+        in.position(mark);
+        return len;
+    }
+
+    @Override
+    public int maxLength() {
+        return precision * TopNCounter.EXTRA_SPACE_RATE * (4 + 8);
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return precision * TopNCounter.EXTRA_SPACE_RATE * 8;
+    }
+
+    @Override
+    public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
+        double[] counters = value.getCounters();
+        List<ByteArray> peek = value.peek(1);
+        int keyLength = peek.size() > 0 ? peek.get(0).length() : 0;
+        out.putInt(value.getCapacity());
+        out.putInt(value.size());
+        out.putInt(keyLength);
+        dds.serialize(counters, out);
+        Iterator<Counter<ByteArray>> iterator = value.iterator();
+        while (iterator.hasNext()) {
+            out.put(iterator.next().getItem().array());
+        }
+    }
+
+    @Override
+    public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
+        int capacity = in.getInt();
+        int size = in.getInt();
+        int keyLength = in.getInt();
+        double[] counters = dds.deserialize(in);
+
+        TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
+        ByteArray byteArray;
+        for (int i = 0; i < size; i++) {
+            byteArray = new ByteArray(keyLength);
+            in.get(byteArray.array());
+            counter.offerToHead(byteArray, counters[i]);
+        }
+
+        return counter;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java
new file mode 100644
index 0000000..1ceb607
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+import org.apache.kylin.measure.IMeasureFactory;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+/**
+ * {@link IMeasureFactory} that builds {@link TopNMeasureType} instances for
+ * the function named by {@code FunctionDesc.FUNC_TOP_N}.
+ */
+public class TopNMeasureFactory implements IMeasureFactory {
+
+    /**
+     * @param funcName function name; must equal {@code FunctionDesc.FUNC_TOP_N}
+     *                 ignoring case
+     * @param dataType textual data type, resolved through {@code DataType.getType}
+     * @return a new {@link TopNMeasureType} for the resolved data type
+     * @throws IllegalArgumentException if {@code funcName} is not the TopN function
+     */
+    @Override
+    public MeasureType createMeasureType(String funcName, String dataType) {
+        final boolean isTopN = FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName);
+        if (!isTopN)
+            throw new IllegalArgumentException();
+
+        final DataType resolved = DataType.getType(dataType);
+        return new TopNMeasureType(resolved);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
new file mode 100644
index 0000000..1d2c87b
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.topn;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.hllc.HLLCSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+
+/**
+ * {@link MeasureType} for the "topn" data type: each input row is ingested as
+ * a single-entry {@link TopNCounter}, rows are merged by a
+ * {@link TopNAggregator}, and values are serialized with
+ * {@link TopNCounterSerializer}.
+ */
+public class TopNMeasureType extends MeasureType {
+
+    private final DataType dataType;
+
+    /**
+     * @param dataType the aggregation data type; its name must be "topn" and
+     *                 its precision must lie in [1, 1000]
+     * @throws IllegalArgumentException if the name or the precision is not accepted
+     */
+    public TopNMeasureType(DataType dataType) {
+        if ("topn".equals(dataType.getName()) == false)
+            throw new IllegalArgumentException();
+        
+        this.dataType = dataType;
+        
+        if (this.dataType.getPrecision() < 1 || this.dataType.getPrecision() > 1000)
+            throw new IllegalArgumentException("TopN precision must be between 1 and 1000");
+    }
+
+    /** @return the data type handed to the constructor */
+    @Override
+    public DataType getAggregationDataType() {
+        return dataType;
+    }
+
+    /**
+     * @return {@link TopNCounterSerializer}, the serializer for the
+     *         {@code TopNCounter<ByteArray>} values this measure type produces
+     */
+    @Override
+    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
+        // was HLLCSerializer.class, which serializes HyperLogLogPlusCounter and
+        // cannot handle the TopNCounter values ingested and aggregated here
+        return TopNCounterSerializer.class;
+    }
+    
+    /** Not implemented yet: accepts every measure description. */
+    @Override
+    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
+        // TODO Auto-generated method stub
+        
+    }
+
+    /**
+     * Creates an ingester that turns one input row into a new single-entry counter.
+     * <p>
+     * The row must hold exactly two values: {@code values[0]} is the numeric
+     * counter (a {@code null} counts as 0) and {@code values[1]} is the literal.
+     * The literal is encoded to its dictionary id, using the dictionary
+     * registered for the function's TopN literal column, and that id is
+     * written into a key of {@code getSizeOfId()} bytes.
+     */
+    @SuppressWarnings("rawtypes")
+    @Override
+    public MeasureIngester<?> newIngester() {
+        return new MeasureIngester<TopNCounter>() {
+            @Override
+            public TopNCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
+                if (values.length != 2)
+                    throw new IllegalArgumentException();
+                
+                double counter = values[0] == null ? 0 : Double.parseDouble(values[0]);
+                String literal = values[1];
+                
+                // encode literal using dictionary
+                TblColRef literalCol = measureDesc.getFunction().getTopNLiteralColumn();
+                Dictionary<String> dictionary = dictionaryMap.get(literalCol);
+                int keyEncodedValue = dictionary.getIdFromValue(literal);
+
+                ByteArray key = new ByteArray(dictionary.getSizeOfId());
+                BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, dictionary.getSizeOfId());
+
+                // capacity is the configured precision scaled by the extra space rate
+                TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(dataType.getPrecision() * TopNCounter.EXTRA_SPACE_RATE);
+                topNCounter.offer(key, counter);
+                return topNCounter;
+            }
+        };
+    }
+
+    /** @return a new {@link TopNAggregator} */
+    @Override
+    public MeasureAggregator<?> newAggregator() {
+        return new TopNAggregator();
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override
+    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /** Not implemented yet: always returns {@code null}. */
+    @Override
+    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
new file mode 100644
index 0000000..134f305
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/BigDecimalSerializer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.BytesUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Serializer for {@link BigDecimal} values.
+ * <p>
+ * Layout: the scale as a variable-length int, the byte count of the unscaled
+ * value as a variable-length int, then the unscaled value's two's-complement
+ * bytes as produced by {@link BigInteger#toByteArray()}.
+ *
+ * @author yangli9
+ * 
+ */
+public class BigDecimalSerializer extends DataTypeSerializer<BigDecimal> {
+
+    private static final Logger logger = LoggerFactory.getLogger(BigDecimalSerializer.class);
+
+    final DataType type;
+    final int maxLength;
+
+    /** Number of values seen so far whose scale exceeded the type's scale. */
+    int avoidVerbose = 0;
+
+    /**
+     * @param type decimal data type supplying the precision and scale limits
+     */
+    public BigDecimalSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 1 byte scale, 1 byte length, assume every 2 digits takes 1 byte
+        this.maxLength = 1 + 1 + (type.getPrecision() + 1) / 2;
+    }
+
+    /**
+     * Writes {@code value} to {@code out}. A value whose scale exceeds the
+     * type's scale is first rounded (half-even) down to the type's scale.
+     *
+     * @throws IllegalArgumentException if the encoded value would not fit in
+     *                                  {@link #maxLength()} bytes
+     */
+    @Override
+    public void serialize(BigDecimal value, ByteBuffer out) {
+        if (value.scale() > type.getScale()) {
+            // Count every over-scale value but only warn on the 1st, 10001st, ...
+            // The increment used to live inside the log message, so the counter
+            // stopped at 1 after the first warning and nothing was logged again.
+            if (avoidVerbose++ % 10000 == 0) {
+                logger.warn("value's scale has exceeded the " + type.getScale() + ", cut it off, to ensure encoded value do not exceed maxLength " + maxLength + " times:" + avoidVerbose);
+            }
+            value = value.setScale(type.getScale(), BigDecimal.ROUND_HALF_EVEN);
+        }
+        byte[] bytes = value.unscaledValue().toByteArray();
+        // + 2 accounts for the scale and length prefixes, assumed one byte each
+        if (bytes.length + 2 > maxLength) {
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+
+        BytesUtil.writeVInt(value.scale(), out);
+        BytesUtil.writeVInt(bytes.length, out);
+        out.put(bytes);
+    }
+
+    /**
+     * Reads one value written by {@link #serialize(BigDecimal, ByteBuffer)}.
+     */
+    @Override
+    public BigDecimal deserialize(ByteBuffer in) {
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+
+        byte[] bytes = new byte[n];
+        in.get(bytes);
+
+        return new BigDecimal(new BigInteger(bytes), scale);
+    }
+
+    /**
+     * Computes the serialized length of the value starting at the buffer's
+     * current position. The position is restored before returning.
+     */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        int mark = in.position();
+
+        @SuppressWarnings("unused")
+        int scale = BytesUtil.readVInt(in);
+        int n = BytesUtil.readVInt(in);
+        // bytes consumed by the two prefixes plus the payload that follows them
+        int len = in.position() - mark + n;
+
+        in.position(mark);
+        return len;
+    }
+
+    /** @return the upper bound computed in the constructor from the precision */
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    /** @return a fixed estimate of 8 bytes */
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
new file mode 100644
index 0000000..235c99f
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataType.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * A parsed data type: a lower-case name plus an optional precision and scale,
+ * e.g. "varchar(256)" or "decimal(19,4)". A precision or scale that was not
+ * given is stored as -1, except for the defaults applied in the constructor.
+ *
+ * Instances are obtained through {@link #getType(String)}, which hands out one
+ * shared instance per distinct (name, precision, scale) combination.
+ */
+@SuppressWarnings("serial")
+public class DataType implements Serializable {
+
+    // standard sql types, ref: http://www.w3schools.com/sql/sql_datatypes_general.asp
+    public static final String VALID_TYPES_STRING = "any|char|varchar|boolean|binary" //
+            + "|integer|tinyint|smallint|bigint|decimal|numeric|float|real|double" //
+            + "|date|time|datetime|timestamp|byte|int|short|long|string|hllc|topn";
+
+    // optional "(digits, digits)" suffix following the type name
+    private static final String TYPE_PATTEN_TAIL = "\\s*" //
+            + "(?:" + "[(]" + "([\\d\\s,]+)" + "[)]" + ")?";
+
+    private static final Pattern TYPE_PATTERN = Pattern.compile( //
+            "(" + VALID_TYPES_STRING + ")" + TYPE_PATTEN_TAIL, Pattern.CASE_INSENSITIVE);
+
+    public static final Set<String> INTEGER_FAMILY = new HashSet<String>();
+    public static final Set<String> NUMBER_FAMILY = new HashSet<String>();
+    public static final Set<String> DATETIME_FAMILY = new HashSet<String>();
+    public static final Set<String> STRING_FAMILY = new HashSet<String>();
+    // legacy spelling -> current spelling, applied before and after pattern matching
+    private static final Map<String, String> LEGACY_TYPE_MAP = new HashMap<String, String>();
+    static {
+        INTEGER_FAMILY.add("tinyint");
+        INTEGER_FAMILY.add("smallint");
+        INTEGER_FAMILY.add("integer");
+        INTEGER_FAMILY.add("bigint");
+
+        NUMBER_FAMILY.addAll(INTEGER_FAMILY);
+        NUMBER_FAMILY.add("float");
+        NUMBER_FAMILY.add("double");
+        NUMBER_FAMILY.add("decimal");
+        NUMBER_FAMILY.add("real");
+        NUMBER_FAMILY.add("numeric");
+
+        DATETIME_FAMILY.add("date");
+        DATETIME_FAMILY.add("time");
+        DATETIME_FAMILY.add("datetime");
+        DATETIME_FAMILY.add("timestamp");
+
+        STRING_FAMILY.add("varchar");
+        STRING_FAMILY.add("char");
+
+        LEGACY_TYPE_MAP.put("byte", "tinyint");
+        LEGACY_TYPE_MAP.put("int", "integer");
+        LEGACY_TYPE_MAP.put("short", "smallint");
+        LEGACY_TYPE_MAP.put("long", "bigint");
+        LEGACY_TYPE_MAP.put("string", "varchar");
+        LEGACY_TYPE_MAP.put("hllc10", "hllc(10)");
+        LEGACY_TYPE_MAP.put("hllc12", "hllc(12)");
+        LEGACY_TYPE_MAP.put("hllc14", "hllc(14)");
+        LEGACY_TYPE_MAP.put("hllc15", "hllc(15)");
+        LEGACY_TYPE_MAP.put("hllc16", "hllc(16)");
+    }
+
+    // canonical instances, keyed by themselves; must be declared before ANY
+    private static final ConcurrentMap<DataType, DataType> CACHE = new ConcurrentHashMap<DataType, DataType>();
+
+    public static final DataType ANY = DataType.getType("any");
+
+    /**
+     * Parses a type string and returns the shared instance for it.
+     *
+     * @param type the type string, e.g. "varchar(256)"; may be null
+     * @return the canonical DataType, or null when type is null
+     * @throws IllegalArgumentException if the string is not a recognised type
+     */
+    public static DataType getType(String type) {
+        if (type == null)
+            return null;
+
+        DataType dataType = new DataType(type);
+        DataType cached = CACHE.get(dataType);
+        if (cached == null) {
+            // putIfAbsent makes check-and-insert atomic, so threads racing on the
+            // same new type all end up with the single instance that won
+            DataType winner = CACHE.putIfAbsent(dataType, dataType);
+            cached = (winner == null) ? dataType : winner;
+        }
+        return cached;
+    }
+
+    // ============================================================================
+
+    private String name;
+    private int precision;
+    private int scale;
+
+    /**
+     * Parses a type string into name, precision and scale.
+     *
+     * @throws IllegalArgumentException if the string does not match the type
+     *         pattern, or its precision/scale part is malformed
+     */
+    private DataType(String datatype) {
+        // Locale.ROOT keeps the lower-casing independent of the default locale
+        // (e.g. under a Turkish locale "INTEGER" would not become "integer")
+        datatype = datatype.trim().toLowerCase(java.util.Locale.ROOT);
+        datatype = replaceLegacy(datatype);
+
+        Pattern pattern = TYPE_PATTERN;
+        Matcher m = pattern.matcher(datatype);
+        if (!m.matches())
+            throw new IllegalArgumentException("bad data type -- " + datatype + ", does not match " + pattern);
+
+        name = replaceLegacy(m.group(1));
+        precision = -1;
+        scale = -1;
+
+        String leftover = m.group(2);
+        if (leftover != null) {
+            // the pattern admits whitespace around the numbers, Integer.parseInt
+            // does not, so strip the outer whitespace before splitting
+            String[] parts = leftover.trim().split("\\s*,\\s*");
+            for (int i = 0; i < parts.length; i++) {
+                int n;
+                try {
+                    n = Integer.parseInt(parts[i]);
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("bad data type -- " + datatype + ", precision/scale not numeric");
+                }
+                if (i == 0)
+                    precision = n;
+                else if (i == 1)
+                    scale = n;
+                else
+                    throw new IllegalArgumentException("bad data type -- " + datatype + ", too many precision/scale parts");
+            }
+        }
+
+        // FIXME 256 for unknown string precision
+        if ((name.equals("char") || name.equals("varchar")) && precision == -1) {
+            precision = 256; // to save memory at frontend, e.g. tableau will
+                             // allocate memory according to this
+        }
+
+        // FIXME (19,4) for unknown decimal precision
+        if ((name.equals("decimal") || name.equals("numeric")) && precision == -1) {
+            precision = 19;
+            scale = 4;
+        }
+    }
+
+    /** Maps a legacy type spelling to its current one; other strings pass through unchanged. */
+    private String replaceLegacy(String str) {
+        String replace = LEGACY_TYPE_MAP.get(str);
+        return replace == null ? str : replace;
+    }
+
+    /** Delegates to the storage estimate of the serializer created for this type. */
+    public int getStorageBytesEstimate() {
+        return DataTypeSerializer.create(this).getStorageBytesEstimate();
+    }
+
+    public boolean isStringFamily() {
+        return STRING_FAMILY.contains(name);
+    }
+
+    public boolean isIntegerFamily() {
+        return INTEGER_FAMILY.contains(name);
+    }
+
+    public boolean isNumberFamily() {
+        return NUMBER_FAMILY.contains(name);
+    }
+
+    public boolean isDateTimeFamily() {
+        return DATETIME_FAMILY.contains(name);
+    }
+
+    public boolean isDate() {
+        return name.equals("date");
+    }
+
+    public boolean isTime() {
+        return name.equals("time");
+    }
+
+    public boolean isTimestamp() {
+        return name.equals("timestamp");
+    }
+
+    public boolean isDatetime() {
+        return name.equals("datetime");
+    }
+
+    public boolean isTinyInt() {
+        return name.equals("tinyint");
+    }
+
+    public boolean isSmallInt() {
+        return name.equals("smallint");
+    }
+
+    public boolean isInt() {
+        return name.equals("integer");
+    }
+
+    public boolean isBigInt() {
+        return name.equals("bigint");
+    }
+
+    public boolean isFloat() {
+        return name.equals("float");
+    }
+
+    public boolean isDouble() {
+        return name.equals("double");
+    }
+
+    /** True only for the name "decimal"; "numeric" is not treated as decimal here. */
+    public boolean isDecimal() {
+        return name.equals("decimal");
+    }
+
+    public boolean isHLLC() {
+        return name.equals("hllc");
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    /** @return the precision, or -1 when none was given and no default applies */
+    public int getPrecision() {
+        return precision;
+    }
+
+    /** @return the scale, or -1 when none was given and no default applies */
+    public int getScale() {
+        return scale;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + ((name == null) ? 0 : name.hashCode());
+        result = prime * result + precision;
+        result = prime * result + scale;
+        return result;
+    }
+
+    /** Two types are equal when name, precision and scale all match. */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj)
+            return true;
+        if (obj == null)
+            return false;
+        if (getClass() != obj.getClass())
+            return false;
+        DataType other = (DataType) obj;
+        if (name == null) {
+            if (other.name != null)
+                return false;
+        } else if (!name.equals(other.name))
+            return false;
+        if (precision != other.precision)
+            return false;
+        if (scale != other.scale)
+            return false;
+        return true;
+    }
+
+    /** Renders as "name", "name(precision)" or "name(precision,scale)". */
+    @Override
+    public String toString() {
+        if (precision < 0 && scale < 0)
+            return name;
+        else if (scale < 0)
+            return name + "(" + precision + ")";
+        else
+            return name + "(" + precision + "," + scale + ")";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
new file mode 100644
index 0000000..fd3121f
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.common.util.BytesSerializer;
+import org.apache.kylin.measure.hllc.HLLCSerializer;
+import org.apache.kylin.measure.topn.TopNCounterSerializer;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Base class of the per-data-type serializers, plus a registry that maps a
+ * data type name to the serializer class handling it.
+ *
+ * @author yangli9
+ * 
+ * Note: the implementations MUST be thread-safe.
+ */
+abstract public class DataTypeSerializer<T> implements BytesSerializer<T> {
+
+    // data type name -> serializer class; the map stays mutable (behind a
+    // synchronized wrapper) so that register() can add entries later on
+    final static Map<String, Class<?>> implementations;
+    static {
+        HashMap<String, Class<?>> impl = Maps.newHashMap();
+        impl.put("varchar", StringSerializer.class);
+        impl.put("decimal", BigDecimalSerializer.class);
+        impl.put("double", DoubleSerializer.class);
+        impl.put("float", DoubleSerializer.class);
+        impl.put("bigint", LongSerializer.class);
+        impl.put("long", LongSerializer.class);
+        impl.put("integer", LongSerializer.class);
+        impl.put("int", LongSerializer.class);
+        impl.put("smallint", LongSerializer.class);
+        impl.put("date", DateTimeSerializer.class);
+        impl.put("datetime", DateTimeSerializer.class);
+        impl.put("timestamp", DateTimeSerializer.class);
+        impl.put("topn", TopNCounterSerializer.class);
+        impl.put("hllc", HLLCSerializer.class);
+        // an unmodifiable view here would make register() throw
+        // UnsupportedOperationException on every call
+        implementations = Collections.synchronizedMap(impl);
+    }
+    
+    /** Tells whether a serializer class is registered under the given type name. */
+    public static boolean hasRegistered(String dataTypeName) {
+        return implementations.containsKey(dataTypeName);
+    }
+    
+    /**
+     * Registers (or replaces) the serializer class for a type name. create()
+     * instantiates it through a constructor taking a single DataType.
+     */
+    public static void register(String dataTypeName, Class<? extends DataTypeSerializer<?>> impl) {
+        implementations.put(dataTypeName, impl);
+    }
+
+    /** Parses the type string and creates the matching serializer. */
+    public static DataTypeSerializer<?> create(String dataType) {
+        return create(DataType.getType(dataType));
+    }
+
+    /**
+     * Creates a new serializer instance for the given type.
+     *
+     * @throws RuntimeException if no serializer is registered for the type's
+     *         name, or the serializer could not be instantiated
+     */
+    public static DataTypeSerializer<?> create(DataType type) {
+        Class<?> clz = implementations.get(type.getName());
+        if (clz == null)
+            throw new RuntimeException("No DataTypeSerializer for type " + type);
+
+        try {
+            return (DataTypeSerializer<?>) clz.getConstructor(DataType.class).newInstance(type);
+        } catch (Exception e) {
+            throw new RuntimeException(e); // never happen
+        }
+    }
+    
+    /** peek into buffer and return the length of serialization */
+    abstract public int peekLength(ByteBuffer in);
+
+    /** return the max number of bytes to the longest serialization */
+    abstract public int maxLength();
+
+    /** get an estimate of size in bytes of the serialized data */
+    abstract public int getStorageBytesEstimate();
+
+    /** convert from obj to string */
+    public String toString(T value) {
+        if (value == null)
+            return "NULL";
+        else
+            return value.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
new file mode 100644
index 0000000..4f7935c
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DateTimeSerializer.java
@@ -0,0 +1,49 @@
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer that stores a LongMutable as a fixed 8-byte long.
+ */
+public class DateTimeSerializer extends DataTypeSerializer<LongMutable> {
+
+    // one reusable holder per thread: thread-safe without allocating on every call
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    public DateTimeSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        final long raw = value.get();
+        out.putLong(raw);
+    }
+
+    /** Returns the calling thread's holder, creating it on first use. */
+    private LongMutable current() {
+        LongMutable holder = current.get();
+        if (holder != null) {
+            return holder;
+        }
+        holder = new LongMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /**
+     * Reads 8 bytes into the calling thread's shared holder; the returned
+     * object is overwritten by the next call on the same thread.
+     */
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        final LongMutable holder = current();
+        holder.set(in.getLong());
+        return holder;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleMutable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleMutable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleMutable.java
new file mode 100644
index 0000000..5bd75e8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleMutable.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class DoubleMutable implements Comparable<DoubleMutable>, Serializable {
+
+    private double v;
+
+    public DoubleMutable() {
+        this(0);
+    }
+
+    public DoubleMutable(double v) {
+        set(v);
+    }
+
+    public double get() {
+        return v;
+    }
+
+    public void set(double v) {
+        this.v = v;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof DoubleMutable)) {
+            return false;
+        }
+        DoubleMutable other = (DoubleMutable) o;
+        return this.v == other.v;
+    }
+
+    @Override
+    public int hashCode() {
+        return (int) Double.doubleToLongBits(v);
+    }
+
+    @Override
+    public int compareTo(DoubleMutable o) {
+        return (v < o.v ? -1 : (v == o.v ? 0 : 1));
+    }
+
+    @Override
+    public String toString() {
+        return Double.toString(v);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
new file mode 100644
index 0000000..5accf1d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DoubleSerializer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer that stores a DoubleMutable as a fixed 8-byte double.
+ */
+public class DoubleSerializer extends DataTypeSerializer<DoubleMutable> {
+
+    // one reusable holder per thread: thread-safe without allocating on every call
+    private ThreadLocal<DoubleMutable> current = new ThreadLocal<DoubleMutable>();
+
+    public DoubleSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(DoubleMutable value, ByteBuffer out) {
+        final double raw = value.get();
+        out.putDouble(raw);
+    }
+
+    /** Returns the calling thread's holder, creating it on first use. */
+    private DoubleMutable current() {
+        DoubleMutable holder = current.get();
+        if (holder != null) {
+            return holder;
+        }
+        holder = new DoubleMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /**
+     * Reads 8 bytes into the calling thread's shared holder; the returned
+     * object is overwritten by the next call on the same thread.
+     */
+    @Override
+    public DoubleMutable deserialize(ByteBuffer in) {
+        final DoubleMutable holder = current();
+        holder.set(in.getDouble());
+        return holder;
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return 8;
+    }
+
+    @Override
+    public int maxLength() {
+        return 8;
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 8;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongMutable.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongMutable.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongMutable.java
new file mode 100644
index 0000000..b978049
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongMutable.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.io.Serializable;
+
+/**
+ * A mutable holder for a primitive long, comparable by numeric value.
+ */
+@SuppressWarnings("serial")
+public class LongMutable implements Comparable<LongMutable>, Serializable {
+
+    private long v;
+
+    public LongMutable() {
+        this(0);
+    }
+
+    public LongMutable(long v) {
+        set(v);
+    }
+
+    public long get() {
+        return v;
+    }
+
+    public void set(long v) {
+        this.v = v;
+    }
+
+    /** Equal when the other object is a LongMutable holding the same value. */
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof LongMutable) {
+            return ((LongMutable) o).v == this.v;
+        }
+        return false;
+    }
+
+    /** The held value narrowed to int, i.e. its low 32 bits. */
+    @Override
+    public int hashCode() {
+        return (int) v;
+    }
+
+    /** Orders by numeric value; returns exactly -1, 0 or 1. */
+    @Override
+    public int compareTo(LongMutable o) {
+        if (this.v < o.v) {
+            return -1;
+        }
+        if (this.v > o.v) {
+            return 1;
+        }
+        return 0;
+    }
+
+    @Override
+    public String toString() {
+        return Long.toString(v);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
new file mode 100644
index 0000000..777f494
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/LongSerializer.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Serializer that stores a LongMutable as a variable-length long.
+ */
+public class LongSerializer extends DataTypeSerializer<LongMutable> {
+
+    // one reusable holder per thread: thread-safe without allocating on every call
+    private ThreadLocal<LongMutable> current = new ThreadLocal<LongMutable>();
+
+    public LongSerializer(DataType type) {
+    }
+
+    @Override
+    public void serialize(LongMutable value, ByteBuffer out) {
+        final long raw = value.get();
+        BytesUtil.writeVLong(raw, out);
+    }
+
+    /** Returns the calling thread's holder, creating it on first use. */
+    private LongMutable current() {
+        LongMutable holder = current.get();
+        if (holder != null) {
+            return holder;
+        }
+        holder = new LongMutable();
+        current.set(holder);
+        return holder;
+    }
+
+    /**
+     * Decodes one vlong into the calling thread's shared holder; the returned
+     * object is overwritten by the next call on the same thread.
+     */
+    @Override
+    public LongMutable deserialize(ByteBuffer in) {
+        final LongMutable holder = current();
+        holder.set(BytesUtil.readVLong(in));
+        return holder;
+    }
+
+    /**
+     * Measures the next vlong by decoding it, then puts the buffer position
+     * back where it was.
+     */
+    @Override
+    public int peekLength(ByteBuffer in) {
+        final int start = in.position();
+        BytesUtil.readVLong(in);
+        final int consumed = in.position() - start;
+
+        in.position(start);
+        return consumed;
+    }
+
+    @Override
+    public int maxLength() {
+        return 9; // vlong: 1 + 8
+    }
+
+    @Override
+    public int getStorageBytesEstimate() {
+        return 5;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java
new file mode 100644
index 0000000..14c8909
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/StringSerializer.java
@@ -0,0 +1,48 @@
+package org.apache.kylin.metadata.datatype;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Serializer for strings, bounded by the precision of the data type.
+ */
+public class StringSerializer extends DataTypeSerializer<String> {
+
+    final DataType type;
+    final int maxLength;
+
+    public StringSerializer(DataType type) {
+        this.type = type;
+        // see serialize(): 2 byte length, rest is String.toBytes()
+        this.maxLength = type.getPrecision() + 2;
+    }
+
+    /**
+     * Writes the string, then checks how many bytes were produced.
+     *
+     * @throws IllegalArgumentException if more than maxLength bytes were
+     *         written; the check happens after the write
+     */
+    @Override
+    public void serialize(String value, ByteBuffer out) {
+        final int before = out.position();
+        BytesUtil.writeUTFString(value, out);
+        final int written = out.position() - before;
+
+        if (written > maxLength) {
+            throw new IllegalArgumentException("'" + value + "' exceeds the expected length for type " + type);
+        }
+    }
+
+    @Override
+    public String deserialize(ByteBuffer in) {
+        return BytesUtil.readUTFString(in);
+    }
+
+    @Override
+    public int peekLength(ByteBuffer in) {
+        return BytesUtil.peekByteArrayLength(in);
+    }
+
+    @Override
+    public int maxLength() {
+        return maxLength;
+    }
+
+    /** Uses the maximum length as the estimate. */
+    @Override
+    public int getStorageBytesEstimate() {
+        return maxLength;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
index 0e687f7..b894fd3 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
@@ -5,8 +5,8 @@ import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Maps;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index f03e736..2ddc75a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -21,7 +21,7 @@ package org.apache.kylin.metadata.model;
 import java.io.Serializable;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index d22d0a3..0c36873 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -21,7 +21,7 @@ package org.apache.kylin.metadata.model;
 import java.util.ArrayList;
 import java.util.Collection;
 
-import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -171,6 +171,14 @@ public class FunctionDesc {
         this.parameter = parameter;
     }
 
+    public int getParameterCount() {
+        int count = 0;
+        for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) {
+            count++;
+        }
+        return count;
+    }
+    
     public DataType getSQLType() {
         if (isCountDistinct() || isTopN())
             return DataType.ANY;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index 618d25a..1561b1f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -18,9 +18,6 @@
 
 package org.apache.kylin.metadata.model;
 
-import java.util.Collections;
-import java.util.List;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -40,15 +37,6 @@ public class MeasureDesc {
     @JsonProperty("dependent_measure_ref")
     private String dependentMeasureRef;
 
-    public List<TblColRef> getColumnsNeedDictionary() {
-        // measure could store literal values using dictionary encoding to save space, like TopN
-        if (function.isTopN()) {
-            return Collections.singletonList(function.getTopNLiteralColumn());
-        } else {
-            return Collections.emptyList();
-        }
-    }
-
     public int getId() {
         return id;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index de27145..61ba73b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -21,7 +21,7 @@ package org.apache.kylin.metadata.model;
 import java.io.Serializable;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
 
 /**
  */

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
index 4a8c5d1..e52bf3b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigestUtil.java
@@ -1,7 +1,7 @@
 package org.apache.kylin.metadata.realization;
 
-import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
new file mode 100644
index 0000000..6d1a7b8
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
@@ -0,0 +1,46 @@
+package org.apache.kylin.aggregation.topn;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.measure.topn.TopNCounterSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Serialize/deserialize test for {@link TopNCounterSerializer}, created for the "topn(10)" data type.
+ */
+public class TopNCounterSerializerTest {
+
+    private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getType("topn(10)"));
+
+    @Test
+    public void testSerialization() {
+        TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
+        Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
+        for (Integer i : stream) {
+            vs.offer(new ByteArray(Bytes.toBytes(i)));
+        }
+
+        ByteBuffer out = ByteBuffer.allocate(1024);
+        serializer.serialize(vs, out);
+        // keep only the bytes written so far (up to out.position()), not the whole 1024-byte backing array
+        byte[] copyBytes = new byte[out.position()];
+        System.arraycopy(out.array(), 0, copyBytes, 0, out.position());
+
+        ByteBuffer in = ByteBuffer.wrap(copyBytes);
+        TopNCounter<ByteArray> vsNew = serializer.deserialize(in);
+
+        Assert.assertEquals(vs.toString(), vsNew.toString()); // counters are compared through their string forms
+
+    }
+    
+    @Test
+    public void testValueOf() {
+        // FIXME: empty placeholder that always passes; a real unit test for valueOf() is still needed
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
new file mode 100644
index 0000000..f920ee7
--- /dev/null
+++ b/core-metadata/src/test/java/org/apache/kylin/metadata/datatype/BigDecimalSerializerTest.java
@@ -0,0 +1,53 @@
+package org.apache.kylin.metadata.datatype;
+
+import static org.junit.Assert.*;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.metadata.datatype.BigDecimalSerializer;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/** Serialization tests for {@link BigDecimalSerializer}, created for the "decimal" data type.
+ */
+public class BigDecimalSerializerTest {
+
+    private static BigDecimalSerializer bigDecimalSerializer;
+
+    @BeforeClass
+    public static void beforeClass() {
+        bigDecimalSerializer = new BigDecimalSerializer(DataType.getType("decimal"));
+    }
+
+    @Test
+    public void testNormal() {
+        BigDecimal input = new BigDecimal("1234.1234");
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        buffer.mark(); // remember the position where writing starts
+        bigDecimalSerializer.serialize(input, buffer);
+        buffer.reset(); // rewind to the mark so deserialize() reads what was just written
+        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+        assertEquals(input, output); // the value must come back unchanged
+    }
+
+    @Test
+    public void testScaleOutOfRange() {
+        BigDecimal input = new BigDecimal("1234.1234567890");
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        buffer.mark(); // remember the position where writing starts
+        bigDecimalSerializer.serialize(input, buffer);
+        buffer.reset(); // rewind to the mark so deserialize() reads what was just written
+        BigDecimal output = bigDecimalSerializer.deserialize(buffer);
+        assertEquals(input.setScale(bigDecimalSerializer.type.getScale(), BigDecimal.ROUND_HALF_EVEN), output); // expect the input rounded half-even to the serializer type's scale
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testOutOfPrecision() {
+        BigDecimal input = new BigDecimal("66855344214907231736.4924");
+        ByteBuffer buffer = ByteBuffer.allocate(256);
+        bigDecimalSerializer.serialize(input, buffer); // must throw IllegalArgumentException, per @Test(expected)
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
index fbd6f97..0dc1afa 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/ColumnValueRange.java
@@ -23,8 +23,8 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.kv.RowKeyColumnOrder;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 4f011cf..3bca687 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -24,11 +24,11 @@ import java.util.List;
 
 import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
 
-import org.apache.kylin.common.datatype.DoubleMutable;
-import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
+import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java b/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
index aaf07fe..d32f171 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/translate/ColumnValueRangeTest.java
@@ -6,7 +6,7 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.StringBytesConverter;
 import org.apache.kylin.dict.TrieDictionaryBuilder;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 3fa7d5c..45cc88e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -6,13 +6,11 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.collect.Maps;
-
 import org.apache.hadoop.io.Text;
-import org.apache.kylin.aggregation.MeasureCodec;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesSplitter;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -22,11 +20,16 @@ import org.apache.kylin.cube.kv.AbstractRowKeyEncoder;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.*;
+import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +52,8 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
     protected String intermediateTableRowDelimiter;
     protected byte byteRowDelimiter;
     protected int counter;
+    protected MeasureIngester<?>[] aggrIngesters;
+    protected Map<TblColRef, Dictionary<String>> dictionaryMap;
     protected Object[] measures;
     protected byte[][] keyBytesBuf;
     protected BytesSplitter bytesSplitter;
@@ -58,7 +63,6 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
     private Text outputKey = new Text();
     private Text outputValue = new Text();
     private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private Map<Integer, Dictionary<String>> topNLiteralColDictMap;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -93,25 +97,12 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
         keyBytesBuf = new byte[colCount][];
 
-        initTopNLiteralColDictionaryMap();
+        aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+        dictionaryMap = cubeSegment.buildDictionaryMap();
+        
         initNullBytes();
     }
-
-    private void initTopNLiteralColDictionaryMap() {
-        topNLiteralColDictMap = Maps.newHashMap();
-        for (int measureIdx = 0; measureIdx < measures.length; measureIdx++) {
-            MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
-            FunctionDesc func = measureDesc.getFunction();
-            if (func.isTopN()) {
-                int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-                int literalColIdx = flatTableIdx[flatTableIdx.length - 1];
-                TblColRef literalCol = func.getTopNLiteralColumn();
-                Dictionary<String> dictionary = (Dictionary<String>) cubeSegment.getDictionary(literalCol);
-                topNLiteralColDictMap.put(literalColIdx, dictionary);
-            }
-        }
-    }
-
+    
     private void initNullBytes() {
         nullBytes = Lists.newArrayList();
         nullBytes.add(HIVE_NULL);
@@ -146,72 +137,46 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
     private void buildValue(SplittedBytes[] splitBuffers) {
 
         for (int i = 0; i < measures.length; i++) {
-            byte[] valueBytes = getValueBytes(splitBuffers, i);
-            measures[i] = measureCodec.getSerializer(i).valueOf(valueBytes);
+            measures[i] = buildValueOf(i, splitBuffers);
         }
 
         valueBuf.clear();
         measureCodec.encode(measures, valueBuf);
     }
 
-    private byte[] getValueBytes(SplittedBytes[] splitBuffers, int measureIdx) {
-        MeasureDesc desc = cubeDesc.getMeasures().get(measureIdx);
-        FunctionDesc func = desc.getFunction();
-        ParameterDesc paramDesc = func.getParameter();
-        int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-
-        byte[] result = null;
-
-        // constant
-        if (flatTableIdx == null) {
-            result = Bytes.toBytes(paramDesc.getValue());
-        }
-        // count and count distinct
-        else if (func.isCount() || func.isHolisticCountDistinct()) {
-            // note for holistic count distinct, this value will be ignored
-            result = ONE;
-        }
-        // topN, need encode the key column
-        else if (func.isTopN()) {
-            // encode the key column with dict, and get the counter column;
-            int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
-            Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex);
-            int keyColEncoded = literalColDict.getIdFromValue(Bytes.toString(splitBuffers[keyColIndex].value));
-            valueBuf.clear();
-            valueBuf.putInt(literalColDict.getSizeOfId());
-            valueBuf.putInt(keyColEncoded);
-            if (flatTableIdx.length == 1) {
-                // only literalCol, use 1.0 as counter
-                valueBuf.putDouble(1.0);
+    private Object buildValueOf(int idxOfMeasure, SplittedBytes[] splitBuffers) { // builds the value of measure #idxOfMeasure from the split input cells
+        MeasureDesc measure = cubeDesc.getMeasures().get(idxOfMeasure);
+        FunctionDesc function = measure.getFunction();
+        int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+        // one input string per function parameter, in parameter order
+        int paramCount = function.getParameterCount();
+        String[] inputToMeasure = new String[paramCount];
+
+        // walk the parameter chain (getNextParameter) and pick up one value per parameter
+        ParameterDesc param = function.getParameter();
+        int colParamIdx = 0; // index among parameters of column type; advances only when a column parameter is consumed
+        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+            String value;
+            if (function.isCount() || function.isHolisticCountDistinct()) {
+                // count-like functions always get the literal "1"; for holistic count distinct this value will be ignored
+                value = "1";
+            } else if (param.isColumnType()) {
+                value = getCell(colIdxOnFlatTable[colParamIdx++], splitBuffers); // may be null, see getCell()
             } else {
-                // get the counter column value
-                valueBuf.putDouble(Double.valueOf(Bytes.toString(splitBuffers[flatTableIdx[0]].value)));
-            }
-
-            result = valueBuf.array();
-
-        }
-        // normal case, concat column values
-        else {
-            // for multiple columns, their values are joined
-            for (int i = 0; i < flatTableIdx.length; i++) {
-                SplittedBytes split = splitBuffers[flatTableIdx[i]];
-                if (result == null) {
-                    result = Arrays.copyOf(split.value, split.length);
-                } else {
-                    byte[] newResult = new byte[result.length + split.length];
-                    System.arraycopy(result, 0, newResult, 0, result.length);
-                    System.arraycopy(split.value, 0, newResult, result.length, split.length);
-                    result = newResult;
-                }
+                value = param.getValue(); // non-column parameter: use the parameter's own value
             }
+            inputToMeasure[i] = value;
         }
+        // delegate to this measure's ingester; buildValue() stores the returned object in measures[idxOfMeasure]
+        return aggrIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
+    }
 
-        if (isNull(result)) {
-            result = null;
-        }
-
-        return result;
+    private String getCell(int i, SplittedBytes[] splitBuffers) { // returns cell i of the split input as a String, or null
+        byte[] bytes = Arrays.copyOf(splitBuffers[i].value, splitBuffers[i].length); // copy only the first 'length' bytes of the value array
+        if (isNull(bytes)) // NOTE(review): isNull() is defined outside this hunk; presumably matches the nullBytes markers - confirm
+            return null;
+        else
+            return Bytes.toString(bytes); // decoding is done by the project's Bytes utility
     }
 
     protected void outputKV(Context context) throws IOException, InterruptedException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
index 0f94dca..381b07c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidReducer.java
@@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.List;
 
 import org.apache.hadoop.io.Text;
-import org.apache.kylin.aggregation.MeasureAggregators;
-import org.apache.kylin.aggregation.MeasureCodec;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.kv.RowConstants;
@@ -32,6 +30,8 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index d724c76..752c01d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -16,20 +16,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.inmemcubing.DoggedCubeBuilder;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -63,7 +61,7 @@ public class InMemCuboidMapper<KEYIN> extends KylinMapper<KEYIN, Object, ByteArr
         cubeSegment = cube.getSegment(segmentName, SegmentStatusEnum.NEW);
         flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat();
 
-        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+        Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
 
         // dictionary
         for (TblColRef col : cubeDesc.getAllColumnsNeedDictionary()) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
index ab87b21..c35e77f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidReducer.java
@@ -3,8 +3,6 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.kylin.aggregation.MeasureAggregators;
-import org.apache.kylin.aggregation.MeasureCodec;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -16,6 +14,8 @@ import org.apache.kylin.engine.mr.KylinReducer;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.MeasureAggregators;
+import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 8d00084..bc1c883 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -23,12 +23,12 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 
-import org.apache.kylin.aggregation.MeasureCodec;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
@@ -40,7 +40,6 @@ import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.engine.mr.ByteArrayWritable;
 import org.apache.kylin.engine.mr.IMROutput2.IMRStorageInputFormat;
@@ -48,6 +47,7 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index d08d2a4..c0277b5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -27,12 +27,12 @@ import java.util.regex.Pattern;
 
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
-import org.apache.kylin.aggregation.MeasureCodec;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.SplittedBytes;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -43,11 +43,11 @@ import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.kv.RowKeyEncoder;
 import org.apache.kylin.cube.kv.RowKeyEncoderProvider;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
index b73fda4..84fd46b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryStep.java
@@ -32,7 +32,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.DimensionDesc;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.job.exception.ExecuteException;