You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 06:13:10 UTC

[20/47] incubator-kylin git commit: KYLIN-875 rename modules: core-common, core-cube, core-dictionary, core-cube

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
new file mode 100644
index 0000000..a7705d3
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/ExtractTupleFilter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.filter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
+
+
+/**
+ * 
+ * @author xjiang
+ * 
+ */
+public class ExtractTupleFilter extends TupleFilter {
+
+    private int date;
+    private List<Object> values;
+
+    public ExtractTupleFilter(FilterOperatorEnum op) {
+        super(new ArrayList<TupleFilter>(3), op);
+        assert (op == FilterOperatorEnum.EXTRACT);
+        this.values = new ArrayList<Object>(1);
+        this.values.add(null);
+        this.date = 0;
+    }
+
+    @Override
+    public String toString() {
+        return "ExtractTupleFilter=[children=" + this.children + "]";
+    }
+
+    @Override
+    public boolean isEvaluable() {
+        return false;
+    }
+
+    @Override
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
+        // extract tuple value
+        String extractType = null;
+        String tupleValue = null;
+        for (TupleFilter filter : this.children) {
+            filter.evaluate(tuple, cs);
+            if (filter instanceof ConstantTupleFilter) {
+                tupleValue = filter.getValues().iterator().next().toString();
+            } else if (filter instanceof CompareTupleFilter) {
+                extractType = filter.getValues().iterator().next().toString();
+            }
+        }
+
+        // extract date
+        this.date = extractDate(extractType, Integer.valueOf(tupleValue));
+        return true;
+    }
+
+    private int extractDate(String type, int inDate) {
+        // this shifts the epoch back to astronomical year -4800 instead of the
+        // start of the Christian era in year AD 1 of the proleptic Gregorian
+        // calendar.
+        int j = inDate + 32044;
+        int g = j / 146097;
+        int dg = j % 146097;
+        int c = (dg / 36524 + 1) * 3 / 4;
+        int dc = dg - c * 36524;
+        int b = dc / 1461;
+        int db = dc % 1461;
+        int a = (db / 365 + 1) * 3 / 4;
+        int da = db - a * 365;
+
+        // integer number of full years elapsed since March 1, 4801 BC
+        int y = g * 400 + c * 100 + b * 4 + a;
+        // integer number of full months elapsed since the last March 1
+        int m = (da * 5 + 308) / 153 - 2;
+        // number of days elapsed since day 1 of the month
+        int d = da - (m + 4) * 153 / 5 + 122;
+        int year = y - 4800 + (m + 2) / 12;
+        int month = (m + 2) % 12 + 1;
+        int day = d + 1;
+        if ("YEAR".equalsIgnoreCase(type)) {
+            return year;
+        }
+        if ("MONTH".equalsIgnoreCase(type)) {
+            return month;
+        }
+        if ("DAY".equalsIgnoreCase(type)) {
+            return day;
+        }
+        return -1;
+    }
+
+    @Override
+    public Collection<?> getValues() {
+        this.values.set(0, String.valueOf(this.date));
+        return this.values;
+    }
+
+    @Override
+    public byte[] serialize(IFilterCodeSystem<?> cs) {
+        return new byte[0];
+    }
+
+    @Override
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
new file mode 100644
index 0000000..1f94bb8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/IFilterCodeSystem.java
@@ -0,0 +1,31 @@
+package org.apache.kylin.metadata.filter;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+/**
+ * Decides how constant values are coded and compared.
+ * 
+ * TupleFilters are involved in both the query engine and the coprocessor. In the query engine,
+ * the values are strings. In the coprocessor, the values are dictionary IDs.
+ * 
+ * The type parameter is the Java type of a code, which should be bytes. However, some legacy
+ * implementations store the code as a String.
+ * 
+ * Ordering of two codes comes from the inherited {@link Comparator#compare(Object, Object)}.
+ * 
+ * @author yangli9
+ */
+public interface IFilterCodeSystem<T> extends Comparator<T> {
+    
+    /**
+     * Tells whether the given code represents the NULL value.
+     * 
+     * @param code the code to test
+     * @return true if the code stands for NULL
+     */
+    boolean isNull(T code);
+
+    // Comparing two values by their codes needs no declaration here: compare(T, T) is
+    // inherited from Comparator<T>.
+    // int compare(T code1, T code2);
+
+    /**
+     * Writes a code to the buffer.
+     * 
+     * @param code the code to write
+     * @param buf the buffer to write into
+     */
+    void serialize(T code, ByteBuffer buf);
+
+    /**
+     * Reads a code from the buffer.
+     * 
+     * @param buf the buffer to read from
+     * @return the code that was read
+     */
+    T deserialize(ByteBuffer buf);
+    
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
new file mode 100644
index 0000000..4d38565
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/LogicalTupleFilter.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.filter;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
+
+
+/**
+ * Tuple filter that combines its children with a boolean operator: AND, OR or NOT.
+ */
+public class LogicalTupleFilter extends TupleFilter {
+
+    /**
+     * @param op the logical operator; must be AND, OR or NOT
+     * @throws IllegalArgumentException for any other operator
+     */
+    public LogicalTupleFilter(FilterOperatorEnum op) {
+        super(new ArrayList<TupleFilter>(2), op);
+        if (op != FilterOperatorEnum.AND && op != FilterOperatorEnum.OR && op != FilterOperatorEnum.NOT) {
+            throw new IllegalArgumentException("Unsupported operator " + op);
+        }
+    }
+
+    // used by copy(); takes over the given child list without validating the operator
+    private LogicalTupleFilter(List<TupleFilter> filters, FilterOperatorEnum op) {
+        super(filters, op);
+    }
+
+    /**
+     * @return a new filter with the same operator and its own child list; the children
+     *         themselves are shared with this filter, not copied
+     */
+    @Override
+    public TupleFilter copy() {
+        return new LogicalTupleFilter(new LinkedList<TupleFilter>(children), operator);
+    }
+
+    /**
+     * Builds the logical negation. AND/OR swap operators (via REVERSE_OP_MAP) and reverse
+     * every child; NOT simply yields its single child.
+     */
+    @Override
+    public TupleFilter reverse() {
+        switch (operator) {
+        case AND:
+        case OR:
+            LogicalTupleFilter negated = new LogicalTupleFilter(REVERSE_OP_MAP.get(operator));
+            for (TupleFilter operand : children) {
+                negated.addChild(operand.reverse());
+            }
+            return negated;
+        case NOT:
+            assert (children.size() == 1);
+            return children.get(0);
+        default:
+            throw new IllegalStateException();
+        }
+    }
+
+    @Override
+    public String toString() {
+        return operator + " " + children;
+    }
+
+    /**
+     * AND is true when no child evaluates to false, OR is true when some child evaluates
+     * to true, NOT negates the first child. Evaluation stops at the first deciding child.
+     * Any other operator yields false.
+     */
+    @Override
+    public boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs) {
+        switch (this.operator) {
+        case AND:
+            for (TupleFilter operand : this.children) {
+                if (!operand.evaluate(tuple, cs)) {
+                    return false;
+                }
+            }
+            return true;
+        case OR:
+            for (TupleFilter operand : this.children) {
+                if (operand.evaluate(tuple, cs)) {
+                    return true;
+                }
+            }
+            return false;
+        case NOT:
+            return !this.children.get(0).evaluate(tuple, cs);
+        default:
+            return false;
+        }
+    }
+
+    /** A logical filter carries no values of its own. */
+    @Override
+    public Collection<?> getValues() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public boolean isEvaluable() {
+        return true;
+    }
+
+    /** Nothing is written for this filter; an empty array is returned. */
+    @Override
+    public byte[] serialize(IFilterCodeSystem<?> cs) {
+        return new byte[0];
+    }
+
+    /** No state is restored from the bytes. */
+    @Override
+    public void deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
new file mode 100644
index 0000000..5b0040d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/StringCodeSystem.java
@@ -0,0 +1,40 @@
+package org.apache.kylin.metadata.filter;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * A simple code system where all values are strings and conform to string comparison system.
+ * 
+ * @author yangli9
+ */
+public class StringCodeSystem implements IFilterCodeSystem<String> {
+    
+    public static final StringCodeSystem INSTANCE = new StringCodeSystem();
+    
+    protected StringCodeSystem() {
+        // singleton
+    }
+
+    @Override
+    public boolean isNull(String value) {
+        return value == null;
+    }
+
+    @Override
+    public int compare(String tupleValue, String constValue) {
+        return tupleValue.compareTo(constValue);
+    }
+
+    @Override
+    public void serialize(String value, ByteBuffer buffer) {
+        BytesUtil.writeUTFString( value, buffer);
+    }
+
+    @Override
+    public String deserialize(ByteBuffer buffer) {
+        return BytesUtil.readUTFString(buffer);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
new file mode 100644
index 0000000..391768a
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TimeConditionLiteralsReplacer.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.metadata.filter;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.util.Collection;
+import java.util.IdentityHashMap;
+import java.util.List;
+import java.util.Set;
+
+
+public class TimeConditionLiteralsReplacer implements TupleFilterSerializer.Decorator {
+
+    private IdentityHashMap<TupleFilter, DataType> dateCompareTupleChildren;
+
+    public TimeConditionLiteralsReplacer(TupleFilter root) {
+        this.dateCompareTupleChildren = Maps.newIdentityHashMap();
+    }
+
+    @Override
+    public TupleFilter onSerialize(TupleFilter filter) {
+
+        if (filter instanceof CompareTupleFilter) {
+            CompareTupleFilter cfilter = (CompareTupleFilter) filter;
+            List<? extends TupleFilter> children = cfilter.getChildren();
+
+            if (children == null || children.size() < 1) {
+                throw new IllegalArgumentException("Illegal compare filter: " + cfilter);
+            }
+
+            TblColRef col = cfilter.getColumn();
+            if (col == null || !col.getType().isDateTimeFamily()) {
+                return cfilter;
+            }
+
+            for (TupleFilter child : filter.getChildren()) {
+                dateCompareTupleChildren.put(child, col.getType());
+            }
+        }
+
+        if (filter instanceof ConstantTupleFilter && dateCompareTupleChildren.containsKey(filter)) {
+            ConstantTupleFilter constantTupleFilter = (ConstantTupleFilter) filter;
+            Set<String> newValues = Sets.newHashSet();
+            DataType columnType = dateCompareTupleChildren.get(filter);
+
+            for (String value : (Collection<String>) constantTupleFilter.getValues()) {
+                newValues.add(formatTime(Long.valueOf(value), columnType));
+            }
+            return new ConstantTupleFilter(newValues);
+        }
+        return filter;
+    }
+
+    private String formatTime(long millis, DataType dataType) {
+        if (dataType.isDatetime() || dataType.isTime()) {
+            throw new RuntimeException("Datetime and time type are not supported yet");
+        }
+
+        if (dataType.isTimestamp()) {
+            return DateFormat.formatToTimeStr(millis);
+        } else if (dataType.isDate()) {
+            return DateFormat.formatToDateStr(millis);
+        } else {
+            throw new RuntimeException("Unknown type " + dataType + " to formatTime");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TsConditionEraser.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TsConditionEraser.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TsConditionEraser.java
new file mode 100644
index 0000000..96ff81c
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TsConditionEraser.java
@@ -0,0 +1,63 @@
+package org.apache.kylin.metadata.filter;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import java.util.IdentityHashMap;
+
+/**
+ */
+public class TsConditionEraser implements TupleFilterSerializer.Decorator {
+
+    private final TblColRef tsColumn;
+    private final TupleFilter root;
+
+    private IdentityHashMap<TupleFilter, Boolean> isInTopLevelANDs;
+
+    public TsConditionEraser(TblColRef tsColumn, TupleFilter root) {
+        this.tsColumn = tsColumn;
+        this.root = root;
+        this.isInTopLevelANDs = Maps.newIdentityHashMap();
+    }
+
+    /**
+     * replace filter on timestamp column to null, so that two tuple filter trees can
+     * be compared regardless of the filter condition on timestamp column (In top level where conditions concatenated by ANDs)
+     * @param filter
+     * @return
+     */
+    @Override
+    public TupleFilter onSerialize(TupleFilter filter) {
+
+        if (filter == null)
+            return null;
+
+        //we just need reference equal
+        if (root == filter) {
+            isInTopLevelANDs.put(filter, true);
+        }
+
+        if (isInTopLevelANDs.containsKey(filter)) {
+            classifyChildrenByMarking(filter);
+
+            if (filter instanceof CompareTupleFilter) {
+                TblColRef c = ((CompareTupleFilter) filter).getColumn();
+                if (c != null && c.equals(tsColumn)) {
+                    return null;
+                }
+            }
+        }
+
+        return filter;
+    }
+
+    private void classifyChildrenByMarking(TupleFilter filter) {
+        if (filter instanceof LogicalTupleFilter) {
+            if (filter.getOperator() == TupleFilter.FilterOperatorEnum.AND) {
+                for (TupleFilter child : filter.getChildren()) {
+                    isInTopLevelANDs.put(child, true);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
new file mode 100644
index 0000000..c5bd3e0
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilter.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.filter;
+
+import java.util.Collection;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.IEvaluatableTuple;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Base class for the nodes of a filter expression tree. Every node has an operator and a
+ * list of child filters; subclasses define how the node is evaluated and serialized.
+ * 
+ * @author xjiang
+ * 
+ */
+public abstract class TupleFilter {
+
+    /** Operators a filter node can carry; each has a fixed integer id, see getValue(). */
+    public enum FilterOperatorEnum {
+        EQ(1), NEQ(2), GT(3), LT(4), GTE(5), LTE(6), ISNULL(7), ISNOTNULL(8), IN(9), NOTIN(10), AND(20), OR(21), NOT(22), COLUMN(30), CONSTANT(31), DYNAMIC(32), EXTRACT(33), CASE(34);
+
+        private final int value;
+
+        private FilterOperatorEnum(int v) {
+            this.value = v;
+        }
+
+        /** @return the integer id given to this operator in its declaration */
+        public int getValue() {
+            return this.value;
+        }
+    }
+
+    // not referenced anywhere inside this class
+    public static final int BUFFER_SIZE = 10240;
+
+    // operator -> its logical negation (e.g. EQ <-> NEQ, GT <-> LTE, AND <-> OR)
+    protected static final Map<FilterOperatorEnum, FilterOperatorEnum> REVERSE_OP_MAP = Maps.newHashMap();
+    // operator -> its mirror image (e.g. GT <-> LT); presumably used when the two operands
+    // of a comparison are exchanged - not used inside this class
+    protected static final Map<FilterOperatorEnum, FilterOperatorEnum> SWAP_OP_MAP = Maps.newHashMap();
+
+    static {
+        REVERSE_OP_MAP.put(FilterOperatorEnum.EQ, FilterOperatorEnum.NEQ);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.NEQ, FilterOperatorEnum.EQ);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.GT, FilterOperatorEnum.LTE);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.LTE, FilterOperatorEnum.GT);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.LT, FilterOperatorEnum.GTE);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.GTE, FilterOperatorEnum.LT);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.IN, FilterOperatorEnum.NOTIN);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.NOTIN, FilterOperatorEnum.IN);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.ISNULL, FilterOperatorEnum.ISNOTNULL);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.ISNOTNULL, FilterOperatorEnum.ISNULL);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.AND, FilterOperatorEnum.OR);
+        REVERSE_OP_MAP.put(FilterOperatorEnum.OR, FilterOperatorEnum.AND);
+
+        SWAP_OP_MAP.put(FilterOperatorEnum.EQ, FilterOperatorEnum.EQ);
+        SWAP_OP_MAP.put(FilterOperatorEnum.NEQ, FilterOperatorEnum.NEQ);
+        SWAP_OP_MAP.put(FilterOperatorEnum.GT, FilterOperatorEnum.LT);
+        SWAP_OP_MAP.put(FilterOperatorEnum.LTE, FilterOperatorEnum.GTE);
+        SWAP_OP_MAP.put(FilterOperatorEnum.LT, FilterOperatorEnum.GT);
+        SWAP_OP_MAP.put(FilterOperatorEnum.GTE, FilterOperatorEnum.LTE);
+    }
+
+    // child list supplied by the subclass constructor; used as-is, not copied
+    protected final List<TupleFilter> children;
+    protected FilterOperatorEnum operator;
+    // never read or written in this class; hasChildren() inspects the list instead
+    protected boolean hasChildren;
+
+    /**
+     * @param filters the list that will hold this node's children
+     * @param op the operator of this node
+     */
+    protected TupleFilter(List<TupleFilter> filters, FilterOperatorEnum op) {
+        this.children = filters;
+        this.operator = op;
+    }
+
+    /** Appends a child to this node. */
+    public void addChild(TupleFilter child) {
+        children.add(child);
+    }
+
+    /** Adds each given filter through addChild(), so subclass overrides are honoured. */
+    final public void addChildren(List<? extends TupleFilter> children) {
+        for (TupleFilter c : children)
+            addChild(c); // subclass overrides addChild()
+    }
+
+    /** @return the live child list of this node, not a copy */
+    public List<? extends TupleFilter> getChildren() {
+        return children;
+    }
+
+    /** @return true if the child list is non-null and non-empty */
+    public boolean hasChildren() {
+        return children != null && !children.isEmpty();
+    }
+
+    public FilterOperatorEnum getOperator() {
+        return operator;
+    }
+
+    /**
+     * Copies this node. Not supported unless a subclass overrides it.
+     * 
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    public TupleFilter copy() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Builds the logical negation of this node. Not supported unless a subclass overrides it.
+     * 
+     * @throws UnsupportedOperationException always, in this base implementation
+     */
+    public TupleFilter reverse() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Flattens this filter tree: the result is either one AND of non-logical filters or an
+     * OR whose children are such ANDs.
+     */
+    public TupleFilter flatFilter() {
+        return flattenInternal(this);
+    }
+
+    /**
+     * Recursive worker of flatFilter().
+     * 
+     * @throws IllegalStateException if a logical filter carries an operator other than
+     *         AND, OR or NOT, or a flattened child is neither an AND nor an OR
+     */
+    private TupleFilter flattenInternal(TupleFilter filter) {
+        TupleFilter flatFilter = null;
+        // a non-logical filter becomes an AND with that filter as its only child
+        if (!(filter instanceof LogicalTupleFilter)) {
+            flatFilter = new LogicalTupleFilter(FilterOperatorEnum.AND);
+            flatFilter.addChild(filter);
+            return flatFilter;
+        }
+
+        // post-order recursive travel
+        // every child is flattened first and sorted by the operator of its flat form
+        FilterOperatorEnum op = filter.getOperator();
+        List<TupleFilter> andChildren = new LinkedList<TupleFilter>();
+        List<TupleFilter> orChildren = new LinkedList<TupleFilter>();
+        for (TupleFilter child : filter.getChildren()) {
+            TupleFilter flatChild = flattenInternal(child);
+            FilterOperatorEnum childOp = flatChild.getOperator();
+            if (childOp == FilterOperatorEnum.AND) {
+                andChildren.add(flatChild);
+            } else if (childOp == FilterOperatorEnum.OR) {
+                orChildren.add(flatChild);
+            } else {
+                throw new IllegalStateException("Filter is " + filter + " and child is " + flatChild);
+            }
+        }
+
+        // boolean algebra flatten
+        if (op == FilterOperatorEnum.AND) {
+            // merge the members of all AND children into a single AND ...
+            flatFilter = new LogicalTupleFilter(FilterOperatorEnum.AND);
+            for (TupleFilter andChild : andChildren) {
+                flatFilter.addChildren(andChild.getChildren());
+            }
+            // ... and distribute it over the OR children, giving an OR of ANDs
+            if (!orChildren.isEmpty()) {
+                List<TupleFilter> fullAndFilters = cartesianProduct(orChildren, flatFilter);
+                flatFilter = new LogicalTupleFilter(FilterOperatorEnum.OR);
+                flatFilter.addChildren(fullAndFilters);
+            }
+        } else if (op == FilterOperatorEnum.OR) {
+            // pull the members of nested ORs up one level, then append the AND children whole
+            flatFilter = new LogicalTupleFilter(FilterOperatorEnum.OR);
+            for (TupleFilter orChild : orChildren) {
+                flatFilter.addChildren(orChild.getChildren());
+            }
+            flatFilter.addChildren(andChildren);
+        } else if (op == FilterOperatorEnum.NOT) {
+            // the flattened children collected above are not used here: the original
+            // child is negated via reverse() and the negation is flattened instead
+            assert (filter.children.size() == 1);
+            TupleFilter reverse = filter.children.get(0).reverse();
+            flatFilter = flattenInternal(reverse);
+        } else {
+            throw new IllegalStateException("Filter is " + filter);
+        }
+        return flatFilter;
+    }
+
+    /**
+     * Distributes an AND over a list of ORs. Starting from the partial AND, every OR
+     * multiplies the current products by its members: each product is copied once per
+     * member and receives that member's children.
+     * 
+     * @param leftOrFilters the ORs to distribute over; the children of each member are
+     *        what gets appended to a product
+     * @param partialAndFilter the AND shared by every product; it must support copy()
+     * @return the resulting products
+     */
+    private List<TupleFilter> cartesianProduct(List<TupleFilter> leftOrFilters, TupleFilter partialAndFilter) {
+        List<TupleFilter> oldProductFilters = new LinkedList<TupleFilter>();
+        oldProductFilters.add(partialAndFilter);
+        for (TupleFilter orFilter : leftOrFilters) {
+            List<TupleFilter> newProductFilters = new LinkedList<TupleFilter>();
+            for (TupleFilter orChildFilter : orFilter.getChildren()) {
+                for (TupleFilter productFilter : oldProductFilters) {
+                    // copy first so the product built so far is not modified
+                    TupleFilter fullAndFilter = productFilter.copy();
+                    fullAndFilter.addChildren(orChildFilter.getChildren());
+                    newProductFilters.add(fullAndFilter);
+                }
+            }
+            oldProductFilters = newProductFilters;
+        }
+        return oldProductFilters;
+    }
+
+    /** @return whether this node, on its own, can be evaluated */
+    public abstract boolean isEvaluable();
+
+    /** Evaluates this node against the given tuple using the given code system. */
+    public abstract boolean evaluate(IEvaluatableTuple tuple, IFilterCodeSystem<?> cs);
+
+    /** @return the values this node carries */
+    public abstract Collection<?> getValues();
+
+    // package-private: serializes the state of this node only, children are not included
+    // by the implementations visible alongside this class
+    abstract byte[] serialize(IFilterCodeSystem<?> cs);
+
+    // package-private: restores the state written by serialize()
+    abstract void deserialize(byte[] bytes, IFilterCodeSystem<?> cs);
+
+    /**
+     * @return true if the filter is null, or if it and all of its descendants report
+     *         isEvaluable()
+     */
+    public static boolean isEvaluableRecursively(TupleFilter filter) {
+        if (filter == null)
+            return true;
+
+        if (!filter.isEvaluable())
+            return false;
+
+        for (TupleFilter child : filter.getChildren()) {
+            if (!isEvaluableRecursively(child))
+                return false;
+        }
+        return true;
+    }
+
+    /**
+     * Adds the column of every ColumnTupleFilter found in the tree to the collector.
+     * Does nothing when either argument is null.
+     */
+    public static void collectColumns(TupleFilter filter, Set<TblColRef> collector) {
+        if (filter == null || collector == null)
+            return;
+
+        if (filter instanceof ColumnTupleFilter) {
+            ColumnTupleFilter columnTupleFilter = (ColumnTupleFilter) filter;
+            collector.add(columnTupleFilter.getColumn());
+        }
+
+        for (TupleFilter child : filter.getChildren()) {
+            collectColumns(child, collector);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
new file mode 100644
index 0000000..a9d785b
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/filter/TupleFilterSerializer.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.filter;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Stack;
+
+import org.apache.kylin.common.util.BytesUtil;
+
+/**
+ * Serializes a TupleFilter tree into a flat byte array and rebuilds the tree from it.
+ * 
+ * Layout, in pre-order:
+ * <pre>
+ *   node with children    : operator id, node bytes, flag 1, children..., -1
+ *   node without children : operator id, node bytes, flag 0
+ * </pre>
+ * 
+ * http://eli.thegreenplace.net/2011/09/29/an-interesting-tree-serialization-algorithm-from-dwarf
+ * 
+ * @author xjiang
+ * 
+ */
+public class TupleFilterSerializer {
+
+    /** Hook that may replace (or, by returning null, drop) a filter before it is written. */
+    public static interface Decorator {
+        TupleFilter onSerialize(TupleFilter filter);
+    }
+
+    // initial size of the output buffer; serialize() grows it when it is too small
+    private static final int BUFFER_SIZE = 65536;
+    // operator id -> operator, for deserialization
+    private static final Map<Integer, TupleFilter.FilterOperatorEnum> ID_OP_MAP = new HashMap<Integer, TupleFilter.FilterOperatorEnum>();
+
+    static {
+        for (TupleFilter.FilterOperatorEnum op : TupleFilter.FilterOperatorEnum.values()) {
+            ID_OP_MAP.put(op.getValue(), op);
+        }
+    }
+
+    /** Serializes the tree without a decorator. */
+    public static byte[] serialize(TupleFilter rootFilter, IFilterCodeSystem<?> cs) {
+        return serialize(rootFilter, null, cs);
+    }
+
+    /**
+     * Serializes the tree rooted at rootFilter.
+     * 
+     * The output buffer starts at BUFFER_SIZE bytes. When the tree does not fit, the whole
+     * tree is serialized again into a buffer of twice the size, so a decorator can see
+     * the same filters more than once and must tolerate that.
+     * 
+     * @param rootFilter root of the tree, may be null (yields an empty array)
+     * @param decorator optional hook applied to every filter before it is written
+     * @param cs code system handed to each filter's serialize()
+     * @return the serialized bytes
+     */
+    public static byte[] serialize(TupleFilter rootFilter, Decorator decorator, IFilterCodeSystem<?> cs) {
+        int bufferSize = BUFFER_SIZE;
+        while (true) {
+            ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+            try {
+                internalSerialize(rootFilter, decorator, buffer, cs);
+            } catch (java.nio.BufferOverflowException overflow) {
+                // the filter is larger than the buffer: retry with twice the room,
+                // unless doubling would overflow int
+                if (bufferSize > Integer.MAX_VALUE / 2) {
+                    throw overflow;
+                }
+                bufferSize *= 2;
+                continue;
+            }
+            byte[] result = new byte[buffer.position()];
+            System.arraycopy(buffer.array(), 0, result, 0, buffer.position());
+            return result;
+        }
+    }
+
+    // writes one filter and, recursively, its children; a filter turned into null by the
+    // decorator is skipped together with its subtree
+    private static void internalSerialize(TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
+        if (decorator != null) { // give decorator a chance to manipulate the output filter
+            filter = decorator.onSerialize(filter);
+        }
+
+        if (filter == null) {
+            return;
+        }
+
+        if (filter.hasChildren()) {
+            // serialize filter+true
+            serializeFilter(1, filter, decorator, buffer, cs);
+            // serialize children
+            for (TupleFilter child : filter.getChildren()) {
+                internalSerialize(child, decorator, buffer, cs);
+            }
+            // serialize none
+            serializeFilter(-1, filter, decorator, buffer, cs);
+        } else {
+            // serialize filter+false
+            serializeFilter(0, filter, decorator, buffer, cs);
+        }
+    }
+
+    // flag < 0 writes only the end-of-children marker (-1); otherwise writes the operator
+    // id, the filter's own bytes and the has-children flag
+    private static void serializeFilter(int flag, TupleFilter filter, Decorator decorator, ByteBuffer buffer, IFilterCodeSystem<?> cs) {
+        if (flag < 0) {
+            BytesUtil.writeVInt(-1, buffer);
+        } else {
+            byte[] bytes = filter.serialize(cs);
+            int opVal = filter.getOperator().getValue();
+            BytesUtil.writeVInt(opVal, buffer);
+            BytesUtil.writeByteArray(bytes, buffer);
+            BytesUtil.writeVInt(flag, buffer);
+        }
+    }
+
+    /**
+     * Rebuilds a filter tree from bytes produced by serialize().
+     * 
+     * @param bytes the serialized tree
+     * @param cs code system handed to each filter's deserialize()
+     * @return the root filter, or null when bytes is empty
+     * @throws java.util.EmptyStackException if the bytes contain more end markers or
+     *         nodes than the tree structure allows
+     * @throws IllegalStateException if an unknown operator id is found
+     */
+    public static TupleFilter deserialize(byte[] bytes, IFilterCodeSystem<?> cs) {
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        TupleFilter rootFilter = null;
+        Stack<TupleFilter> parentStack = new Stack<TupleFilter>();
+        while (buffer.hasRemaining()) {
+            int opVal = BytesUtil.readVInt(buffer);
+            if (opVal < 0) {
+                // end-of-children marker: the current parent is complete
+                parentStack.pop();
+                continue;
+            }
+
+            // deserialize filter
+            TupleFilter filter = createTupleFilter(opVal);
+            byte[] filterBytes = BytesUtil.readByteArray(buffer);
+            filter.deserialize(filterBytes, cs);
+
+            if (rootFilter == null) {
+                // push root to stack; its has-children flag is read and discarded
+                rootFilter = filter;
+                parentStack.push(filter);
+                BytesUtil.readVInt(buffer);
+                continue;
+            }
+
+            // add filter to parent; peek() throws on an empty stack and only non-null
+            // filters are ever pushed, so no null check is needed
+            parentStack.peek().addChild(filter);
+
+            // push filter to stack or not based on having children or not
+            int hasChild = BytesUtil.readVInt(buffer);
+            if (hasChild == 1) {
+                parentStack.push(filter);
+            }
+        }
+        return rootFilter;
+    }
+
+    // creates an empty filter of the class that belongs to the operator id
+    private static TupleFilter createTupleFilter(int opVal) {
+        TupleFilter.FilterOperatorEnum op = ID_OP_MAP.get(opVal);
+        if (op == null) {
+            throw new IllegalStateException("operator value is " + opVal);
+        }
+        TupleFilter filter = null;
+        switch (op) {
+        case AND:
+        case OR:
+        case NOT:
+            filter = new LogicalTupleFilter(op);
+            break;
+        case EQ:
+        case NEQ:
+        case LT:
+        case LTE:
+        case GT:
+        case GTE:
+        case IN:
+        // NOTIN is the reverse of IN in TupleFilter.REVERSE_OP_MAP, so a reversed IN
+        // comparison must be restorable as well.
+        // NOTE(review): assumes CompareTupleFilter accepts NOTIN - confirm.
+        case NOTIN:
+        case ISNULL:
+        case ISNOTNULL:
+            filter = new CompareTupleFilter(op);
+            break;
+        case EXTRACT:
+            filter = new ExtractTupleFilter(op);
+            break;
+        case CASE:
+            filter = new CaseTupleFilter();
+            break;
+        case COLUMN:
+            filter = new ColumnTupleFilter(null);
+            break;
+        case CONSTANT:
+            filter = new ConstantTupleFilter();
+            break;
+        case DYNAMIC:
+            filter = new DynamicTupleFilter(null);
+            break;
+        default:
+            throw new IllegalStateException("Error FilterOperatorEnum: " + op.getValue());
+        }
+
+        return filter;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
new file mode 100644
index 0000000..562fade
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMaxAggregator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.math.BigDecimal;
+
+/**
+ * Aggregator that keeps the largest {@link BigDecimal} it has been given.
+ * The state is {@code null} until the first value is aggregated and after
+ * every {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class BigDecimalMaxAggregator extends MeasureAggregator<BigDecimal> {
+
+    // largest value seen so far, or null when nothing has been aggregated
+    BigDecimal max = null;
+
+    /** Forgets the current maximum. */
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    /**
+     * Replaces the current maximum with {@code value} when there is no
+     * maximum yet or when {@code value} compares strictly greater.
+     */
+    @Override
+    public void aggregate(BigDecimal value) {
+        if (max == null || max.compareTo(value) < 0) {
+            max = value;
+        }
+    }
+
+    /** @return the largest value aggregated so far, or {@code null} if none */
+    @Override
+    public BigDecimal getState() {
+        return max;
+    }
+
+    /** @return the shared BigDecimal memory estimate of the base class */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
new file mode 100644
index 0000000..8e11fd9
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.math.BigDecimal;
+
+/**
+ * Aggregator that keeps the smallest {@link BigDecimal} it has been given.
+ * The state is {@code null} until the first value is aggregated and after
+ * every {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class BigDecimalMinAggregator extends MeasureAggregator<BigDecimal> {
+
+    // smallest value seen so far, or null when nothing has been aggregated
+    // (named "min" to agree with DoubleMinAggregator and LongMinAggregator)
+    BigDecimal min = null;
+
+    /** Forgets the current minimum. */
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    /**
+     * Replaces the current minimum with {@code value} when there is no
+     * minimum yet or when the current minimum compares strictly greater
+     * than {@code value}.
+     */
+    @Override
+    public void aggregate(BigDecimal value) {
+        if (min == null)
+            min = value;
+        else if (min.compareTo(value) > 0)
+            min = value;
+    }
+
+    /** @return the smallest value aggregated so far, or {@code null} if none */
+    @Override
+    public BigDecimal getState() {
+        return min;
+    }
+
+    /** @return the shared BigDecimal memory estimate of the base class */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
new file mode 100644
index 0000000..1b2aae1
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/BigDecimalSumAggregator.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.math.BigDecimal;
+
+/**
+ * Aggregator that accumulates the sum of all {@link BigDecimal} values it
+ * has been given. The sum starts at zero and returns to zero on
+ * {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class BigDecimalSumAggregator extends MeasureAggregator<BigDecimal> {
+
+    // running total; BigDecimal is immutable, so every add yields a new object
+    BigDecimal sum = BigDecimal.ZERO;
+
+    /** Sets the running total back to zero. */
+    @Override
+    public void reset() {
+        sum = BigDecimal.ZERO;
+    }
+
+    /** Adds {@code value} to the running total. */
+    @Override
+    public void aggregate(BigDecimal value) {
+        final BigDecimal updated = sum.add(value);
+        sum = updated;
+    }
+
+    /** @return the running total */
+    @Override
+    public BigDecimal getState() {
+        return sum;
+    }
+
+    /** @return the shared BigDecimal memory estimate of the base class */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessBigDecimalMemBytes();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
new file mode 100644
index 0000000..1e5bf82
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * Aggregator that keeps the largest double it has been given, held in its
+ * own {@link DoubleWritable}. The state is {@code null} until the first
+ * value is aggregated and after every {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class DoubleMaxAggregator extends MeasureAggregator<DoubleWritable> {
+
+    // private holder of the maximum; the incoming writables are never retained
+    DoubleWritable max = null;
+
+    /** Forgets the current maximum. */
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    /**
+     * Copies the primitive out of {@code value} and stores it when there is
+     * no maximum yet or when it is strictly greater than the current one.
+     */
+    @Override
+    public void aggregate(DoubleWritable value) {
+        final double candidate = value.get();
+        if (max == null) {
+            max = new DoubleWritable(candidate);
+        } else if (candidate > max.get()) {
+            max.set(candidate);
+        }
+    }
+
+    /** @return the holder of the current maximum, or {@code null} if none */
+    @Override
+    public DoubleWritable getState() {
+        return max;
+    }
+
+    /** @return the shared double memory estimate of the base class */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
new file mode 100644
index 0000000..d9112f7
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * Aggregator that keeps the smallest double it has been given, held in its
+ * own {@link DoubleWritable}. The state is {@code null} until the first
+ * value is aggregated and after every {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class DoubleMinAggregator extends MeasureAggregator<DoubleWritable> {
+
+    // private holder of the minimum; the incoming writables are never retained
+    DoubleWritable min = null;
+
+    /** Forgets the current minimum. */
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    /**
+     * Copies the primitive out of {@code value} and stores it when there is
+     * no minimum yet or when it is strictly smaller than the current one.
+     */
+    @Override
+    public void aggregate(DoubleWritable value) {
+        final double candidate = value.get();
+        if (min == null) {
+            min = new DoubleWritable(candidate);
+        } else if (candidate < min.get()) {
+            min.set(candidate);
+        }
+    }
+
+    /** @return the holder of the current minimum, or {@code null} if none */
+    @Override
+    public DoubleWritable getState() {
+        return min;
+    }
+
+    /** @return the shared double memory estimate of the base class */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
new file mode 100644
index 0000000..923bd0b
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/DoubleSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.DoubleWritable;
+
+/**
+ * Aggregator that accumulates the sum of all doubles it has been given in a
+ * single {@link DoubleWritable} that is reused for the lifetime of the
+ * aggregator.
+ *
+ * @author yangli9
+ */
+public class DoubleSumAggregator extends MeasureAggregator<DoubleWritable> {
+
+    // running total, updated in place
+    DoubleWritable sum = new DoubleWritable();
+
+    /** Sets the running total back to zero without replacing the holder. */
+    @Override
+    public void reset() {
+        sum.set(0d);
+    }
+
+    /** Adds the primitive carried by {@code value} to the running total. */
+    @Override
+    public void aggregate(DoubleWritable value) {
+        final double total = sum.get() + value.get();
+        sum.set(total);
+    }
+
+    /** @return the holder of the running total (the same object on every call) */
+    @Override
+    public DoubleWritable getState() {
+        return sum;
+    }
+
+    /** @return the shared double memory estimate of the base class */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessDoubleMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
new file mode 100644
index 0000000..a168743
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+
+/**
+ * Aggregator that merges {@link HyperLogLogPlusCounter} values into one
+ * combined counter. The state is {@code null} until the first counter is
+ * aggregated and after every {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
+
+    // only used by the memory estimate below
+    final int precision;
+    // merged counter, or null when nothing has been aggregated
+    HyperLogLogPlusCounter sum = null;
+
+    /**
+     * @param precision HLLC precision; contributes {@code 1 << precision}
+     *                  bytes to {@link #getMemBytesEstimate()}
+     */
+    public HLLCAggregator(int precision) {
+        this.precision = precision;
+    }
+
+    /** Drops the merged counter. */
+    @Override
+    public void reset() {
+        sum = null;
+    }
+
+    /**
+     * Starts the merged counter from a copy of the first {@code value}
+     * (via the counter's copy constructor) and merges every later one into it.
+     */
+    @Override
+    public void aggregate(HyperLogLogPlusCounter value) {
+        if (sum != null) {
+            sum.merge(value);
+            return;
+        }
+        sum = new HyperLogLogPlusCounter(value);
+    }
+
+    /** @return the merged counter, or {@code null} if nothing was aggregated */
+    @Override
+    public HyperLogLogPlusCounter getState() {
+        return sum;
+    }
+
+    /** @return estimated bytes used by this aggregator plus its counter */
+    @Override
+    public int getMemBytesEstimate() {
+        // 1024 + 60 returned by AggregationCacheMemSizeTest
+        final int aggregatorShell = 8; // aggregator obj shell
+        final int precisionField = 4; // precision
+        final int counterRef = 8; // ref to HLLC
+        final int counterShell = 8; // HLLC obj shell
+        final int counterInternal = 32 + (1 << precision); // HLLC internal
+        return aggregatorShell + precisionField + counterRef + counterShell + counterInternal;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
new file mode 100644
index 0000000..767a6d8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LDCAggregator.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Long Distinct Count
+ *
+ * Holds no data of its own: the reported count is read from a dependent
+ * {@link HLLCAggregator} that must be attached through
+ * {@link #setDependentAggregator(MeasureAggregator)}. Until one is attached,
+ * or while the attached aggregator has no counter yet, the count is zero.
+ *
+ * @author xjiang
+ *
+ */
+public class LDCAggregator extends MeasureAggregator<LongWritable> {
+
+    // Shared "no count available" answer. It is handed out to callers as-is,
+    // so callers must treat the returned object as read-only.
+    private static final LongWritable ZERO = new LongWritable(0);
+
+    private HLLCAggregator hllAgg = null;
+    // reused holder for the count read from the dependent aggregator
+    private final LongWritable state = new LongWritable(0);
+
+    /**
+     * Attaches the aggregator whose counter supplies the distinct count.
+     *
+     * @param agg must be an {@link HLLCAggregator} (or {@code null} to detach)
+     * @throws ClassCastException if {@code agg} is of any other type
+     */
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+        this.hllAgg = (HLLCAggregator) agg;
+    }
+
+    /** No-op: there is no own state to clear. */
+    @Override
+    public void reset() {
+    }
+
+    /** No-op: incoming values are ignored, the count comes from the dependent aggregator. */
+    @Override
+    public void aggregate(LongWritable value) {
+    }
+
+    /**
+     * @return the count estimate of the dependent aggregator's counter, or a
+     *         zero value when no dependent aggregator is attached or it has
+     *         not produced a counter yet
+     */
+    @Override
+    public LongWritable getState() {
+        // HLLCAggregator.getState() is null before its first aggregate() and
+        // after reset(); report zero instead of failing with an NPE.
+        if (hllAgg == null || hllAgg.getState() == null) {
+            return ZERO;
+        }
+        state.set(hllAgg.getState().getCountEstimate());
+        return state;
+    }
+
+    /** @return the shared long memory estimate of the base class */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
new file mode 100644
index 0000000..0fac3c7
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMaxAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Aggregator that keeps the largest long it has been given, held in its own
+ * {@link LongWritable}. The state is {@code null} until the first value is
+ * aggregated and after every {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class LongMaxAggregator extends MeasureAggregator<LongWritable> {
+
+    // private holder of the maximum; the incoming writables are never retained
+    LongWritable max = null;
+
+    /** Forgets the current maximum. */
+    @Override
+    public void reset() {
+        max = null;
+    }
+
+    /**
+     * Copies the primitive out of {@code value} and stores it when there is
+     * no maximum yet or when it is strictly greater than the current one.
+     */
+    @Override
+    public void aggregate(LongWritable value) {
+        final long candidate = value.get();
+        if (max == null) {
+            max = new LongWritable(candidate);
+        } else if (candidate > max.get()) {
+            max.set(candidate);
+        }
+    }
+
+    /** @return the holder of the current maximum, or {@code null} if none */
+    @Override
+    public LongWritable getState() {
+        return max;
+    }
+
+    /** @return the shared long memory estimate of the base class */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
new file mode 100644
index 0000000..4c058d8
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongMinAggregator.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Aggregator that keeps the smallest long it has been given, held in its own
+ * {@link LongWritable}. The state is {@code null} until the first value is
+ * aggregated and after every {@link #reset()}.
+ *
+ * @author yangli9
+ */
+public class LongMinAggregator extends MeasureAggregator<LongWritable> {
+
+    // private holder of the minimum; the incoming writables are never retained
+    LongWritable min = null;
+
+    /** Forgets the current minimum. */
+    @Override
+    public void reset() {
+        min = null;
+    }
+
+    /**
+     * Copies the primitive out of {@code value} and stores it when there is
+     * no minimum yet or when it is strictly smaller than the current one.
+     */
+    @Override
+    public void aggregate(LongWritable value) {
+        final long candidate = value.get();
+        if (min == null) {
+            min = new LongWritable(candidate);
+        } else if (candidate < min.get()) {
+            min.set(candidate);
+        }
+    }
+
+    /** @return the holder of the current minimum, or {@code null} if none */
+    @Override
+    public LongWritable getState() {
+        return min;
+    }
+
+    /** @return the shared long memory estimate of the base class */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
new file mode 100644
index 0000000..e40870d
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/LongSumAggregator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.hadoop.io.LongWritable;
+
+/**
+ * Aggregator that accumulates the sum of all longs it has been given in a
+ * single {@link LongWritable} that is reused for the lifetime of the
+ * aggregator.
+ *
+ * @author yangli9
+ */
+public class LongSumAggregator extends MeasureAggregator<LongWritable> {
+
+    // running total, updated in place
+    LongWritable sum = new LongWritable();
+
+    /** Sets the running total back to zero without replacing the holder. */
+    @Override
+    public void reset() {
+        sum.set(0L);
+    }
+
+    /** Adds the primitive carried by {@code value} to the running total. */
+    @Override
+    public void aggregate(LongWritable value) {
+        final long total = sum.get() + value.get();
+        sum.set(total);
+    }
+
+    /** @return the holder of the running total (the same object on every call) */
+    @Override
+    public LongWritable getState() {
+        return sum;
+    }
+
+    /** @return the shared long memory estimate of the base class */
+    @Override
+    public int getMemBytesEstimate() {
+        return guessLongMemBytes();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
new file mode 100644
index 0000000..4153cbd
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregator.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import org.apache.kylin.metadata.model.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+/**
+ * @author yangli9
+ * 
+ */
+abstract public class MeasureAggregator<V> {
+
+    public static MeasureAggregator<?> create(String funcName, String returnType) {
+        if (FunctionDesc.FUNC_SUM.equalsIgnoreCase(funcName) || FunctionDesc.FUNC_COUNT.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongSumAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalSumAggregator();
+            else if (isDouble(returnType))
+                return new DoubleSumAggregator();
+        } else if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName)) {
+            DataType hllcType = DataType.getInstance(returnType);
+            if (hllcType.isHLLC())
+                return new HLLCAggregator(hllcType.getPrecision());
+            else
+                return new LDCAggregator();
+        } else if (FunctionDesc.FUNC_MAX.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongMaxAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalMaxAggregator();
+            else if (isDouble(returnType))
+                return new DoubleMaxAggregator();
+        } else if (FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName)) {
+            if (isInteger(returnType))
+                return new LongMinAggregator();
+            else if (isBigDecimal(returnType))
+                return new BigDecimalMinAggregator();
+            else if (isDouble(returnType))
+                return new DoubleMinAggregator();
+        }
+        throw new IllegalArgumentException("No aggregator for func '" + funcName + "' and return type '" + returnType + "'");
+    }
+
+    public static boolean isBigDecimal(String type) {
+        return type.startsWith("decimal");
+    }
+
+    public static boolean isDouble(String type) {
+        return "double".equalsIgnoreCase(type) || "float".equalsIgnoreCase(type) || "real".equalsIgnoreCase(type);
+    }
+
+    public static boolean isInteger(String type) {
+        return "long".equalsIgnoreCase(type) || "bigint".equalsIgnoreCase(type) || "int".equalsIgnoreCase(type) || "integer".equalsIgnoreCase(type);
+    }
+
+    public static int guessBigDecimalMemBytes() {
+        // 116 returned by AggregationCacheMemSizeTest
+        return 8 // aggregator obj shell
+        + 8 // ref to BigDecimal
+        + 8 // BigDecimal obj shell
+        + 100; // guess of BigDecimal internal
+    }
+
+    public static int guessDoubleMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest
+        return 44;
+        /*
+        return 8 // aggregator obj shell
+        + 8 // ref to DoubleWritable
+        + 8 // DoubleWritable obj shell
+        + 8; // size of double
+        */
+    }
+
+    public static int guessLongMemBytes() {
+        // 29 to 44 returned by AggregationCacheMemSizeTest
+        return 44;
+        /*
+        return 8 // aggregator obj shell
+        + 8 // ref to LongWritable
+        + 8 // LongWritable obj shell
+        + 8; // size of long
+        */
+    }
+
+    // ============================================================================
+
+    @SuppressWarnings("rawtypes")
+    public void setDependentAggregator(MeasureAggregator agg) {
+    }
+
+    abstract public void reset();
+
+    abstract public void aggregate(V value);
+
+    abstract public V getState();
+
+    // get an estimate of memory consumption UPPER BOUND
+    abstract public int getMemBytesEstimate();
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
new file mode 100644
index 0000000..aa58664
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureAggregators.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+/**
+ * Holds one {@link MeasureAggregator} per measure, in the order of the given
+ * measure descriptors, and applies reset / aggregate / state collection to
+ * all of them at once.
+ *
+ * @author yangli9
+ *
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MeasureAggregators {
+
+    private MeasureDesc[] descs;
+    private MeasureAggregator[] aggs;
+
+    /** Same as {@link #MeasureAggregators(MeasureDesc...)}, taking the collection's iteration order. */
+    public MeasureAggregators(Collection<MeasureDesc> measureDescs) {
+        this((MeasureDesc[]) measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    /**
+     * Creates an aggregator for each measure from its function expression and
+     * return type, then links every measure that declares a dependent measure
+     * to that measure's aggregator.
+     *
+     * @param measureDescs the measures; position i maps to value/state slot i
+     * @throws IllegalArgumentException if a measure has no matching aggregator,
+     *         or refers to a dependent measure whose name is not among
+     *         {@code measureDescs}
+     */
+    public MeasureAggregators(MeasureDesc... measureDescs) {
+        descs = measureDescs;
+        aggs = new MeasureAggregator[descs.length];
+
+        Map<String, Integer> measureIndexMap = new HashMap<String, Integer>();
+        for (int i = 0; i < descs.length; i++) {
+            FunctionDesc func = descs[i].getFunction();
+            aggs[i] = MeasureAggregator.create(func.getExpression(), func.getReturnType());
+            measureIndexMap.put(descs[i].getName(), i);
+        }
+        // fill back dependent aggregator
+        for (int i = 0; i < descs.length; i++) {
+            String depMsrRef = descs[i].getDependentMeasureRef();
+            if (depMsrRef != null) {
+                // look up as Integer: unboxing a missing entry straight into an int
+                // would fail with an NPE that names neither measure
+                Integer index = measureIndexMap.get(depMsrRef);
+                if (index == null) {
+                    throw new IllegalArgumentException("Measure '" + descs[i].getName() + "' depends on unknown measure '" + depMsrRef + "'");
+                }
+                aggs[i].setDependentAggregator(aggs[index]);
+            }
+        }
+    }
+
+    /** Resets every aggregator. */
+    public void reset() {
+        for (MeasureAggregator agg : aggs) {
+            agg.reset();
+        }
+    }
+
+    /**
+     * Feeds {@code values[i]} to the aggregator of measure i.
+     *
+     * @param values one value per measure, in measure order; the length is
+     *               checked only by an {@code assert}
+     */
+    public void aggregate(Object[] values) {
+        assert values.length == descs.length;
+
+        for (int i = 0; i < descs.length; i++) {
+            aggs[i].aggregate(values[i]);
+        }
+    }
+
+    /**
+     * Writes the current state of the aggregator of measure i into {@code states[i]}.
+     *
+     * @param states output array with at least one slot per measure
+     */
+    public void collectStates(Object[] states) {
+        for (int i = 0; i < descs.length; i++) {
+            states[i] = aggs[i].getState();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
new file mode 100644
index 0000000..f620ace
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/MeasureCodec.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.serializer.DataTypeSerializer;
+
+/**
+ * Encodes and decodes a row of measure values, using one
+ * {@link DataTypeSerializer} per measure in measure order.
+ *
+ * @author yangli9
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class MeasureCodec {
+
+    int nMeasures;
+    DataTypeSerializer[] serializers;
+
+    /**
+     * @param measureDescs measures whose function return types select the serializers
+     */
+    public MeasureCodec(Collection<MeasureDesc> measureDescs) {
+        this(measureDescs.toArray(new MeasureDesc[measureDescs.size()]));
+    }
+
+    /**
+     * @param measureDescs measures whose function return types select the serializers
+     */
+    public MeasureCodec(MeasureDesc... measureDescs) {
+        init(returnTypesOf(measureDescs));
+    }
+
+    /**
+     * @param dataTypes data type names, one per measure
+     */
+    public MeasureCodec(String... dataTypes) {
+        init(dataTypes);
+    }
+
+    // collects the return type of each measure's function, keeping measure order
+    private static String[] returnTypesOf(MeasureDesc[] measureDescs) {
+        String[] returnTypes = new String[measureDescs.length];
+        int pos = 0;
+        for (MeasureDesc measure : measureDescs) {
+            returnTypes[pos++] = measure.getFunction().getReturnType();
+        }
+        return returnTypes;
+    }
+
+    // creates one serializer per data type name
+    private void init(String[] dataTypes) {
+        final int count = dataTypes.length;
+        nMeasures = count;
+        serializers = new DataTypeSerializer[count];
+
+        int idx = 0;
+        while (idx < nMeasures) {
+            serializers[idx] = DataTypeSerializer.create(dataTypes[idx]);
+            idx++;
+        }
+    }
+
+    /**
+     * @param idx zero-based measure index
+     * @return the serializer used for that measure
+     */
+    public DataTypeSerializer getSerializer(int idx) {
+        return serializers[idx];
+    }
+
+    /**
+     * Reads one value per measure from the buffer, in measure order.
+     *
+     * @param buf buffer to read from
+     * @param result receives the decoded values; the length is only verified
+     *            when assertions are enabled
+     */
+    public void decode(ByteBuffer buf, Object[] result) {
+        assert result.length == nMeasures;
+        for (int m = 0; m < nMeasures; m++) {
+            Object decoded = serializers[m].deserialize(buf);
+            result[m] = decoded;
+        }
+    }
+
+    /**
+     * Writes one value per measure to the buffer, in measure order.
+     *
+     * @param values values to encode; the length is only verified when
+     *            assertions are enabled
+     * @param out buffer to write to
+     */
+    public void encode(Object[] values, ByteBuffer out) {
+        assert values.length == nMeasures;
+        for (int m = 0; m < nMeasures; m++) {
+            Object value = values[m];
+            serializers[m].serialize(value, out);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
new file mode 100644
index 0000000..85dfe14
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.metadata.measure.fixedlen;
+
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ */
+public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter> {
+
+    private DataType type;
+    private int presision;
+    private HyperLogLogPlusCounter current;
+
+    public FixedHLLCodec(DataType type) {
+        this.type = type;
+        this.presision = type.getPrecision();
+        this.current = new HyperLogLogPlusCounter(this.presision);
+    }
+
+    @Override
+    public int getLength() {
+        return 1 << presision;
+    }
+
+    @Override
+    public DataType getDataType() {
+        return type;
+    }
+
+    @Override
+    public HyperLogLogPlusCounter valueOf(String value) {
+        current.clear();
+        if (value == null)
+            current.add("__nUlL__");
+        else
+            current.add(value.getBytes());
+        return current;
+    }
+
+    @Override
+    public Object getValue() {
+        return current;
+    }
+
+    @Override
+    public HyperLogLogPlusCounter read(byte[] buf, int offset) {
+        current.readRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset));
+        return current;
+    }
+
+    @Override
+    public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) {
+        v.writeRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
new file mode 100644
index 0000000..ad8c483
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.fixedlen;
+
+import org.apache.kylin.metadata.model.DataType;
+
+/**
+ * Base class for codecs that convert a measure value of type {@code T} to and
+ * from a fixed number of bytes.
+ *
+ * @param <T> the value type handled by the codec
+ */
+public abstract class FixedLenMeasureCodec<T> {
+
+    /**
+     * Picks the codec for a data type: {@link FixedHLLCodec} when
+     * {@code type.isHLLC()} is true, {@link FixedPointLongCodec} otherwise.
+     */
+    public static FixedLenMeasureCodec<?> get(DataType type) {
+        if (!type.isHLLC()) {
+            return new FixedPointLongCodec(type);
+        }
+        return new FixedHLLCodec(type);
+    }
+
+    /** Returns the fixed length, in bytes, of one encoded value. */
+    public abstract int getLength();
+
+    /** Returns the data type this codec was created for. */
+    public abstract DataType getDataType();
+
+    /** Converts the textual form of a value into a value of type {@code T}. */
+    public abstract T valueOf(String value);
+
+    /** Returns the value currently held by the codec. */
+    public abstract Object getValue();
+
+    /** Reads a value from {@code buf} starting at {@code offset}. */
+    public abstract T read(byte[] buf, int offset);
+
+    /** Writes {@code v} into {@code buf} starting at {@code offset}. */
+    public abstract void write(T v, byte[] buf, int offset);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
new file mode 100644
index 0000000..5218101
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedPointLongCodec.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.measure.fixedlen;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.metadata.model.DataType;
+
+public class FixedPointLongCodec extends FixedLenMeasureCodec<LongWritable> {
+
+    private static final int SIZE = 8;
+    // number of digits after decimal point
+    int scale;
+    DataType type;
+    // avoid massive object creation
+    LongWritable current = new LongWritable();
+
+    public FixedPointLongCodec(DataType type) {
+        this.type = type;
+        this.scale = Math.max(0, type.getScale());
+    }
+
+    @Override
+    public int getLength() {
+        return SIZE;
+    }
+
+    @Override
+    public DataType getDataType() {
+        return type;
+    }
+
+    long getValueIgnoringDecimalPoint(String value) {
+        int index = value.indexOf('.');
+
+        if (index == 0 || index == value.length() - 1) {
+            throw new RuntimeException("Bad decimal format: " + value);
+        } else if (index < 0) {
+            return Long.valueOf(value) * (int) Math.pow(10, scale);
+        } else {
+            StringBuilder sb = new StringBuilder();
+            sb.append(value.substring(0, index));
+
+            //if there are more than scale digits after the decimal point, the tail will be discarded
+            int end = Math.min(value.length(), index + scale + 1);
+            sb.append(value.substring(index + 1, end));
+            int diff = index + scale + 1 - value.length();
+            //if there are less than scale digits after the decimal point, the tail will be compensated
+            for (int i = 0; i < diff; i++) {
+                sb.append('0');
+            }
+            return Long.valueOf(sb.toString());
+        }
+    }
+
+    String restoreDecimalPoint(long value) {
+        if (scale < 0) {
+            throw new RuntimeException("Bad scale: " + scale + " with value: " + value);
+        } else if (scale == 0) {
+            return Long.toString(value);
+        } else {
+            return String.format("%." + scale + "f", value / (Math.pow(10, scale)));
+        }
+    }
+
+    @Override
+    public LongWritable valueOf(String value) {
+        if (value == null)
+            current.set(0L);
+        else
+            current.set(getValueIgnoringDecimalPoint(value));
+        return current;
+    }
+
+    @Override
+    public String getValue() {
+        if (scale == 0)
+            return current.toString();
+        else
+            return restoreDecimalPoint(current.get());
+    }
+
+    @Override
+    public LongWritable read(byte[] buf, int offset) {
+        current.set(BytesUtil.readLong(buf, offset, SIZE));
+        return current;
+    }
+
+    @Override
+    public void write(LongWritable v, byte[] buf, int offset) {
+        BytesUtil.writeLong(v == null ? 0 : v.get(), buf, offset, SIZE);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7e8896ac/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
new file mode 100644
index 0000000..6392eca
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.model;
+
+import org.apache.commons.lang.StringUtils;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Column Metadata from Source. All name should be uppercase.
+ * <p>
+ * Only {@code id}, {@code name} and {@code datatype} are mapped by Jackson;
+ * the owning table, the zero-based index and the normalized data type are
+ * filled in by {@link #init(TableDesc)}.
+ */
+@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
+public class ColumnDesc {
+
+    @JsonProperty("id")
+    private String id;
+    @JsonProperty("name")
+    private String name;
+    @JsonProperty("datatype")
+    private String datatype;
+
+    // parsed from data type
+    private DataType type;
+
+    private TableDesc table;
+    private int zeroBasedIndex = -1;
+    private boolean isNullable = true;
+
+    public ColumnDesc() { // default constructor for Jackson
+    }
+
+    /**
+     * Builds an initialized column without going through JSON.
+     *
+     * @param table owning table
+     * @param oneBasedColumnIndex one-based position, stored as the column id
+     * @param name column name
+     * @param datatype data type name
+     * @return the initialized column
+     */
+    public static ColumnDesc mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype) {
+        ColumnDesc desc = new ColumnDesc();
+        String id = "" + oneBasedColumnIndex;
+        desc.setId(id);
+        desc.setName(name);
+        desc.setDatatype(datatype);
+        desc.init(table);
+        return desc;
+    }
+
+    /** Returns the id minus one, or -1 if {@link #init(TableDesc)} has not derived it from an id. */
+    public int getZeroBasedIndex() {
+        return zeroBasedIndex;
+    }
+
+    public String getDatatype() {
+        return datatype;
+    }
+
+    /** Sets the data type name and re-parses {@link #getType()} from it. */
+    public void setDatatype(String datatype) {
+        this.datatype = datatype;
+        type = DataType.getInstance(datatype);
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    public TableDesc getTable() {
+        return table;
+    }
+
+    public void setTable(TableDesc table) {
+        this.table = table;
+    }
+
+    public DataType getType() {
+        return type;
+    }
+
+    public String getTypeName() {
+        return type.getName();
+    }
+
+    public int getTypePrecision() {
+        return type.getPrecision();
+    }
+
+    public int getTypeScale() {
+        return type.getScale();
+    }
+
+    public boolean isNullable() {
+        return this.isNullable;
+    }
+
+    public void setNullable(boolean nullable) {
+        this.isNullable = nullable;
+    }
+
+    /**
+     * Attaches the column to its table, upper-cases the name, derives the
+     * zero-based index from the id and normalizes the data type name.
+     *
+     * @param table owning table
+     * @throws NumberFormatException if the id is not an integer
+     */
+    public void init(TableDesc table) {
+        this.table = table;
+
+        // Locale.ROOT keeps the result independent of the JVM default locale
+        // (e.g. a Turkish locale would map 'i' to a dotted capital I)
+        if (name != null)
+            name = name.toUpperCase(java.util.Locale.ROOT);
+
+        if (id != null)
+            zeroBasedIndex = Integer.parseInt(id) - 1;
+
+        DataType normalized = DataType.getInstance(datatype);
+        if (normalized == null) {
+            this.setDatatype(null);
+        } else {
+            this.setDatatype(normalized.toString());
+        }
+    }
+
+    /**
+     * Compares, ignoring case, the owning table's identity and this column's
+     * name with the given names. The table must have been set.
+     */
+    public boolean isSameAs(String tableName, String columnName) {
+        return StringUtils.equalsIgnoreCase(table.getIdentity(), tableName) && //
+                StringUtils.equalsIgnoreCase(name, columnName);
+    }
+
+    @Override
+    public String toString() {
+        // the table is unset until init()/setTable(); printing must not fail then
+        String tableIdentity = (table == null) ? null : table.getIdentity();
+        return "ColumnDesc [name=" + name + ",table=" + tableIdentity + "]";
+    }
+}