You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/23 03:26:41 UTC

[37/50] [abbrv] kylin git commit: KYLIN-976 Support Custom Aggregation Types

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index fd86138..f225fe2 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -59,7 +59,6 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.query.sqlfunc.HLLDistinctCountAggFunc;
 
 import com.google.common.base.Preconditions;
 
@@ -74,7 +73,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         AGGR_FUNC_MAP.put("$SUM0", "SUM");
         AGGR_FUNC_MAP.put("COUNT", "COUNT");
         AGGR_FUNC_MAP.put("COUNT_DISTINCT", "COUNT_DISTINCT");
-        AGGR_FUNC_MAP.put("HLL_COUNT", "COUNT_DISTINCT");
         AGGR_FUNC_MAP.put("MAX", "MAX");
         AGGR_FUNC_MAP.put("MIN", "MIN");
     }
@@ -301,10 +299,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         // rebuild function
         RelDataType fieldType = aggCall.getType();
         SqlAggFunction newAgg = aggCall.getAggregation();
-        if (func.isCountDistinct()) {
-            newAgg = createHyperLogLogAggFunction(fieldType);
-        } else if (func.isCount()) {
+        if (func.isCount()) {
             newAgg = SqlStdOperatorTable.SUM0;
+        } else if (func.getMeasureType().getRewriteCalciteAggrFunctionClass() != null) {
+            newAgg = createCustomAggFunction(func.getExpression(), fieldType, func.getMeasureType().getRewriteCalciteAggrFunctionClass());
         }
 
         // rebuild aggregate call
@@ -312,10 +310,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         return newAggCall;
     }
 
-    private SqlAggFunction createHyperLogLogAggFunction(RelDataType returnType) {
+    private SqlAggFunction createCustomAggFunction(String funcName, RelDataType returnType, Class<?> customAggFuncClz) {
         RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
-        SqlIdentifier sqlIdentifier = new SqlIdentifier("HLL_COUNT", new SqlParserPos(1, 1));
-        AggregateFunction aggFunction = AggregateFunctionImpl.create(HLLDistinctCountAggFunc.class);
+        SqlIdentifier sqlIdentifier = new SqlIdentifier(funcName, new SqlParserPos(1, 1));
+        AggregateFunction aggFunction = AggregateFunctionImpl.create(customAggFuncClz);
         List<RelDataType> argTypes = new ArrayList<RelDataType>();
         List<SqlTypeFamily> typeFamilies = new ArrayList<SqlTypeFamily>();
         for (FunctionParameter o : aggFunction.getParameters()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/Candidate.java b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
new file mode 100644
index 0000000..22608a0
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing;
+
+import java.util.Map;
+
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+
+import com.google.common.collect.Maps;
+
+public class Candidate implements Comparable<Candidate> {
+
+    static final Map<RealizationType, Integer> PRIORITIES = Maps.newHashMap();
+
+    static {
+        PRIORITIES.put(RealizationType.HYBRID, 0);
+        PRIORITIES.put(RealizationType.CUBE, 0);
+        PRIORITIES.put(RealizationType.INVERTED_INDEX, 1);
+    }
+
+    /** for test only */
+    public static void setPriorities(Map<RealizationType, Integer> priorities) {
+        PRIORITIES.clear();
+        PRIORITIES.putAll(priorities);
+    }
+
+    // ============================================================================
+
+    IRealization realization;
+    SQLDigest sqlDigest;
+    int priority;
+    CapabilityResult capability;
+
+    public Candidate(IRealization realization, SQLDigest sqlDigest) {
+        this.realization = realization;
+        this.sqlDigest = sqlDigest;
+        this.priority = PRIORITIES.get(realization.getType());
+    }
+
+    public IRealization getRealization() {
+        return realization;
+    }
+
+    public SQLDigest getSqlDigest() {
+        return sqlDigest;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public CapabilityResult getCapability() {
+        return capability;
+    }
+
+    public void setCapability(CapabilityResult capability) {
+        this.capability = capability;
+    }
+
+    @Override
+    public int compareTo(Candidate o) {
+        int comp = this.priority - o.priority;
+        if (comp != 0) {
+            return comp;
+        }
+
+        comp = this.capability.cost - o.capability.cost;
+        if (comp != 0) {
+            return comp;
+        }
+
+        if (this.realization instanceof HybridInstance)
+            return -1;
+        else if (o.realization instanceof HybridInstance)
+            return 1;
+        else
+            return 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
index 7fdc725..7493e08 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
@@ -6,23 +6,30 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package org.apache.kylin.query.routing;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
+import org.apache.kylin.metadata.realization.CapabilityResult.DimensionAsMeasure;
 import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,22 +46,54 @@ public class QueryRouter {
     public static IRealization selectRealization(OLAPContext olapContext) throws NoRealizationFoundException {
 
         ProjectManager prjMgr = ProjectManager.getInstance(olapContext.olapSchema.getConfig());
+        logger.info("The project manager's reference is " + prjMgr);
         String factTableName = olapContext.firstTableScan.getTableName();
         String projectName = olapContext.olapSchema.getProjectName();
-        List<IRealization> realizations = Lists.newArrayList(prjMgr.getRealizationsByTable(projectName, factTableName));
-        logger.info("Find candidates by table " + factTableName + " and project=" + projectName + " : " + StringUtils.join(realizations, ","));
+        Set<IRealization> realizations = prjMgr.getRealizationsByTable(projectName, factTableName);
+        SQLDigest sqlDigest = olapContext.getSQLDigest();
+
+        List<Candidate> candidates = Lists.newArrayListWithCapacity(realizations.size());
+        for (IRealization real : realizations) {
+            if (real.isReady())
+                candidates.add(new Candidate(real, sqlDigest));
+        }
 
-        //rule based realization selection, rules might reorder realizations or remove specific realization
-        RoutingRule.applyRules(realizations, olapContext);
+        logger.info("Find candidates by table " + factTableName + " and project=" + projectName + " : " + StringUtils.join(candidates, ","));
 
-        if (realizations.size() == 0) {
-            throw new NoRealizationFoundException("Can't find any realization. Please confirm with providers. SQL digest: " + olapContext.getSQLDigest().toString());
+        // rule based realization selection, rules might reorder realizations or remove specific realization
+        RoutingRule.applyRules(candidates);
+
+        if (candidates.size() == 0) {
+            throw new NoRealizationFoundException("Can't find any realization. Please confirm with providers. SQL digest: " + sqlDigest.toString());
         }
 
+        Candidate chosen = candidates.get(0);
+        adjustForDimensionAsMeasure(chosen, olapContext);
+
         logger.info("The realizations remaining: ");
-        logger.info(RoutingRule.getPrintableText(realizations));
-        logger.info("The realization being chosen: " + realizations.get(0).getName());
+        logger.info(RoutingRule.getPrintableText(candidates));
+        logger.info("The realization being chosen: " + chosen.realization.getName());
 
-        return realizations.get(0);
+        return chosen.realization;
     }
+
+    private static void adjustForDimensionAsMeasure(Candidate chosen, OLAPContext olapContext) {
+        CapabilityResult capability = chosen.getCapability();
+        for (CapabilityInfluence inf : capability.influences) {
+            // convert the metric to dimension
+            if (inf instanceof DimensionAsMeasure) {
+                FunctionDesc functionDesc = ((DimensionAsMeasure) inf).getMeasureFunction();
+                functionDesc.setDimensionAsMetric(true);
+                olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName());
+                for (TblColRef col : functionDesc.getParameter().getColRefs()) {
+                    if (col != null) {
+                        olapContext.metricsColumns.remove(col);
+                        olapContext.groupByColumns.add(col);
+                    }
+                }
+                logger.info("Adjust DimensionAsMeasure for " + functionDesc);
+            }
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
index 230950e..715f6d1 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * 
  *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * 
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
- */
+*/
 
 package org.apache.kylin.query.routing;
 
@@ -23,17 +23,14 @@ import java.util.List;
 
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRules.AdjustForWeeklyMatchedRealization;
-import org.apache.kylin.query.routing.RoutingRules.RealizationSortRule;
-import org.apache.kylin.query.routing.RoutingRules.RemoveUncapableRealizationsRule;
+import org.apache.kylin.query.routing.rules.RealizationSortRule;
+import org.apache.kylin.query.routing.rules.RemoveUncapableRealizationsRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 
 /**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
  */
 public abstract class RoutingRule {
     private static final Logger logger = LoggerFactory.getLogger(QueryRouter.class);
@@ -45,26 +42,23 @@ public abstract class RoutingRule {
     static {
         rules.add(new RemoveUncapableRealizationsRule());
         rules.add(new RealizationSortRule());
-        rules.add(new AdjustForWeeklyMatchedRealization());//this rule might modify olapcontext content, better put it at last
     }
 
-    public static void applyRules(List<IRealization> realizations, OLAPContext olapContext) {
+    public static void applyRules(List<Candidate> candidates) {
         for (RoutingRule rule : rules) {
-            logger.info("Initial realizations order:");
-            logger.info(getPrintableText(realizations));
-            logger.info("Applying rule " + rule);
-
-            rule.apply(realizations, olapContext);
-
-            logger.info(getPrintableText(realizations));
+            logger.info("Realizations order before: " + getPrintableText(candidates));
+            logger.info("Applying rule : " + rule);
+            rule.apply(candidates);
+            logger.info("Realizations order after: " + getPrintableText(candidates));
             logger.info("===================================================");
         }
     }
 
-    public static String getPrintableText(List<IRealization> realizations) {
+    public static String getPrintableText(List<Candidate> candidates) {
         StringBuffer sb = new StringBuffer();
         sb.append("[");
-        for (IRealization r : realizations) {
+        for (Candidate candidate : candidates) {
+            IRealization r = candidate.realization;
             sb.append(r.getName());
             sb.append(",");
         }
@@ -112,6 +106,6 @@ public abstract class RoutingRule {
         return this.getClass().toString();
     }
 
-    public abstract void apply(List<IRealization> realizations, OLAPContext olapContext);
+    public abstract void apply(List<Candidate> candidates);
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
deleted file mode 100644
index f457c7d..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kylin.cube.CubeCapabilityChecker;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class AdjustForWeeklyMatchedRealization extends RoutingRule {
-    private static final Logger logger = LoggerFactory.getLogger(AdjustForWeeklyMatchedRealization.class);
-
-    @Override
-    public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-        if (realizations.size() > 0) {
-            IRealization first = realizations.get(0);
-
-            if (first instanceof HybridInstance) {
-                HybridInstance hybrid = (HybridInstance) first;
-
-                if (hybrid.getRealizations()[0] instanceof CubeInstance)
-                    first = hybrid.getRealizations()[0];
-            }
-
-            if (first instanceof CubeInstance) {
-                CubeInstance cube = (CubeInstance) first;
-                adjustOLAPContextIfNecessary(cube, olapContext);
-            }
-
-            if (first instanceof IIInstance) {
-                IIInstance ii = (IIInstance) first;
-                adjustOLAPContextIfNecessary(ii, olapContext);
-            }
-        }
-    }
-
-    private static void adjustOLAPContextIfNecessary(IIInstance ii, OLAPContext olapContext) {
-        IIDesc iiDesc = ii.getDescriptor();
-        Collection<FunctionDesc> iiFuncs = iiDesc.listAllFunctions();
-        convertAggregationToDimension(olapContext, iiFuncs, iiDesc.getFactTableName());
-    }
-
-    private static void adjustOLAPContextIfNecessary(CubeInstance cube, OLAPContext olapContext) {
-        if (CubeCapabilityChecker.check(cube, olapContext.getSQLDigest(), false))
-            return;
-
-        CubeDesc cubeDesc = cube.getDescriptor();
-        Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions();
-        convertAggregationToDimension(olapContext, cubeFuncs, cubeDesc.getFactTable());
-    }
-
-    private static void convertAggregationToDimension(OLAPContext olapContext, Collection<FunctionDesc> availableAggregations, String factTableName) {
-        Iterator<FunctionDesc> it = olapContext.aggregations.iterator();
-        while (it.hasNext()) {
-            FunctionDesc functionDesc = it.next();
-            if (!availableAggregations.contains(functionDesc)) {
-                // try to convert the metric to dimension to see if it works
-                TblColRef col = functionDesc.getParameter().getColRefs().get(0);
-                functionDesc.setDimensionAsMetric(true);
-                olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName());
-                if (col != null) {
-                    olapContext.metricsColumns.remove(col);
-                    olapContext.groupByColumns.add(col);
-                }
-                logger.info("Adjust OLAPContext for " + functionDesc);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java
deleted file mode 100644
index 1271344..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/CubesSortRule.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.kylin.common.util.PartialSorter;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
- */
-public class CubesSortRule extends RoutingRule {
-    @Override
-    public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-
-        // sort cube candidates, 0) the cost indicator, 1) the lesser header
-        // columns the better, 2) the lesser body columns the better
-        List<Integer> items = super.findRealizationsOf(realizations, RealizationType.CUBE);
-        PartialSorter.partialSort(realizations, items, new Comparator<IRealization>() {
-            @Override
-            public int compare(IRealization o1, IRealization o2) {
-                CubeInstance c1 = (CubeInstance) o1;
-                CubeInstance c2 = (CubeInstance) o2;
-                int comp = 0;
-                comp = c1.getCost() - c2.getCost();
-                if (comp != 0) {
-                    return comp;
-                }
-
-                CubeDesc schema1 = c1.getDescriptor();
-                CubeDesc schema2 = c2.getDescriptor();
-
-                comp = schema1.listDimensionColumnsIncludingDerived().size() - schema2.listDimensionColumnsIncludingDerived().size();
-                if (comp != 0)
-                    return comp;
-
-                comp = schema1.getMeasures().size() - schema2.getMeasures().size();
-                return comp;
-            }
-        });
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
deleted file mode 100644
index 7513efd..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-import com.google.common.collect.Maps;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
- */
-public class RealizationPriorityRule extends RoutingRule {
-    static Map<RealizationType, Integer> priorities = Maps.newHashMap();
-
-    static {
-        priorities.put(RealizationType.CUBE, 1);
-        priorities.put(RealizationType.HYBRID, 1);
-        priorities.put(RealizationType.INVERTED_INDEX, 2);
-    }
-
-    public static void setPriorities(Map<RealizationType, Integer> priorities) {
-        RealizationPriorityRule.priorities = priorities;
-    }
-
-    public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-
-        Collections.sort(realizations, new Comparator<IRealization>() {
-            @Override
-            public int compare(IRealization o1, IRealization o2) {
-                int i1 = priorities.get(o1.getType());
-                int i2 = priorities.get(o2.getType());
-                return i1 - i2;
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
deleted file mode 100644
index 8a1a228..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-
-/**
- */
-public class RealizationSortRule extends RoutingRule {
-    @Override
-    public void apply(List<IRealization> realizations, final OLAPContext olapContext) {
-
-        // sort cube candidates, 0) the priority 1) the cost indicator, 2) the lesser header
-        // columns the better, 3) the lesser body columns the better 4) the larger date range the better
-
-        Collections.sort(realizations, new Comparator<IRealization>() {
-            @Override
-            public int compare(IRealization o1, IRealization o2) {
-                int i1 = RealizationPriorityRule.priorities.get(o1.getType());
-                int i2 = RealizationPriorityRule.priorities.get(o2.getType());
-                int comp = i1 - i2;
-                if (comp != 0) {
-                    return comp;
-                }
-
-                comp = o1.getCost(olapContext.getSQLDigest()) - o2.getCost(olapContext.getSQLDigest());
-                if (comp != 0) {
-                    return comp;
-                }
-
-                if (o1 instanceof HybridInstance)
-                    return -1;
-                else if (o2 instanceof HybridInstance)
-                    return 1;
-
-                return 0;
-            }
-        });
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
deleted file mode 100644
index 40b0491..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
- */
-public class RemoveUncapableRealizationsRule extends RoutingRule {
-    @Override
-    public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-        for (Iterator<IRealization> iterator = realizations.iterator(); iterator.hasNext();) {
-            IRealization realization = iterator.next();
-            if (!realization.isCapable(olapContext.getSQLDigest())) {
-                iterator.remove();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java
deleted file mode 100644
index b3628b3..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/SimpleQueryMoreColumnsCubeFirstRule.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.kylin.common.util.PartialSorter;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-/**
- * Created by Hongbin Ma(Binmahone) on 1/5/15.
- */
-public class SimpleQueryMoreColumnsCubeFirstRule extends RoutingRule {
-    @Override
-    public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-        List<Integer> itemIndexes = super.findRealizationsOf(realizations, RealizationType.CUBE);
-
-        if (olapContext.isSimpleQuery()) {
-            PartialSorter.partialSort(realizations, itemIndexes, new Comparator<IRealization>() {
-                @Override
-                public int compare(IRealization o1, IRealization o2) {
-                    CubeInstance c1 = (CubeInstance) o1;
-                    CubeInstance c2 = (CubeInstance) o2;
-                    return c1.getDescriptor().listDimensionColumnsIncludingDerived().size() - c2.getDescriptor().listDimensionColumnsIncludingDerived().size();
-                }
-            });
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
new file mode 100644
index 0000000..d3c67d7
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing.rules;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.kylin.query.routing.RoutingRule;
+
+/**
+ */
+public class RealizationSortRule extends RoutingRule {
+    @Override
+    public void apply(List<Candidate> candidates) {
+        Collections.sort(candidates);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
new file mode 100644
index 0000000..576b47f
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing.rules;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.kylin.query.routing.RoutingRule;
+
+/**
+ */
+public class RemoveUncapableRealizationsRule extends RoutingRule {
+    @Override
+    public void apply(List<Candidate> candidates) {
+        for (Iterator<Candidate> iterator = candidates.iterator(); iterator.hasNext();) {
+            Candidate candidate = iterator.next();
+
+            CapabilityResult capability = candidate.getRealization().isCapable(candidate.getSqlDigest());
+            if (capability.capable)
+                candidate.setCapability(capability);
+            else
+                iterator.remove();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
index bbf1024..8b1ad29 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
@@ -175,7 +175,7 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab
                 metFields.add(fieldName);
                 ColumnDesc fakeCountCol = new ColumnDesc();
                 fakeCountCol.setName(fieldName);
-                fakeCountCol.setDatatype(func.getSQLType().toString());
+                fakeCountCol.setDatatype(func.getRewriteFieldType().toString());
                 if (func.isCount())
                     fakeCountCol.setNullable(false);
                 fakeCountCol.init(sourceTable);
@@ -188,11 +188,11 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab
         HashSet<ColumnDesc> updateColumns = Sets.newHashSet();
         for (MeasureDesc m : mgr.listEffectiveMeasures(olapSchema.getProjectName(), sourceTable.getIdentity(), false)) {
             if (m.getFunction().isSum()) {
-                FunctionDesc functionDesc = m.getFunction();
-                if (functionDesc.getReturnDataType() != functionDesc.getSQLType() && //
-                        functionDesc.getReturnDataType().isBigInt() && //
-                        functionDesc.getSQLType().isIntegerFamily()) {
-                    updateColumns.add(functionDesc.getParameter().getColRefs().get(0).getColumnDesc());
+                FunctionDesc func = m.getFunction();
+                if (//func.getReturnDataType() != func.getRewriteFieldType() && //
+                func.getReturnDataType().isBigInt() && //
+                        func.getRewriteFieldType().isIntegerFamily()) {
+                    updateColumns.add(func.getParameter().getColRefs().get(0).getColumnDesc());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
deleted file mode 100644
index 9716ff6..0000000
--- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.query.sqlfunc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- */
-public class HLLDistinctCountAggFunc {
-
-    private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
-
-    public static HyperLogLogPlusCounter init() {
-        return null;
-    }
-
-    public static HyperLogLogPlusCounter initAdd(Object v) {
-        if (v instanceof Long) { // holistic case
-            long l = (Long) v;
-            return new FixedValueHLLCMockup(l);
-        } else {
-            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
-            return new HyperLogLogPlusCounter(c);
-        }
-    }
-
-    public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) {
-        if (v instanceof Long) { // holistic case
-            long l = (Long) v;
-            if (counter == null) {
-                return new FixedValueHLLCMockup(l);
-            } else {
-                if (!(counter instanceof FixedValueHLLCMockup))
-                    throw new IllegalStateException("counter is not FixedValueHLLCMockup");
-
-                ((FixedValueHLLCMockup) counter).set(l);
-                return counter;
-            }
-        } else {
-            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
-            if (counter == null) {
-                return new HyperLogLogPlusCounter(c);
-            } else {
-                counter.merge(c);
-                return counter;
-            }
-        }
-    }
-
-    public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) {
-        return add(counter0, counter1);
-    }
-
-    public static long result(HyperLogLogPlusCounter counter) {
-        return counter == null ? 0L : counter.getCountEstimate();
-    }
-
-    private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
-
-        private Long value = null;
-
-        FixedValueHLLCMockup(long value) {
-            this.value = value;
-        }
-
-        public void set(long value) {
-            if (this.value == null) {
-                this.value = value;
-            } else {
-                long oldValue = Math.abs(this.value.longValue());
-                long take = Math.max(oldValue, value);
-                logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take);
-                this.value = -take; // make it obvious that this value is wrong
-            }
-        }
-
-        @Override
-        public void clear() {
-            this.value = null;
-        }
-
-        @Override
-        protected void add(long hash) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void merge(HyperLogLogPlusCounter another) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public long getCountEstimate() {
-            return value;
-        }
-
-        @Override
-        public void writeRegisters(ByteBuffer out) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void readRegisters(ByteBuffer in) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = super.hashCode();
-            result = prime * result + (int) (value ^ (value >>> 32));
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (!super.equals(obj))
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
-            if (!value.equals(other.value))
-                return false;
-            return true;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
index 9356663..e8c03ae 100644
--- a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
@@ -21,7 +21,7 @@ package org.apache.kylin.query.test;
 import java.util.Map;
 
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.routing.RoutingRules.RealizationPriorityRule;
+import org.apache.kylin.query.routing.Candidate;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -35,10 +35,10 @@ public class IIQueryTest extends KylinQueryTest {
         KylinQueryTest.setUp();//invoke super class
 
         Map<RealizationType, Integer> priorities = Maps.newHashMap();
-        priorities.put(RealizationType.INVERTED_INDEX, 1);
-        priorities.put(RealizationType.CUBE, 2);
-        priorities.put(RealizationType.HYBRID, 2);
-        RealizationPriorityRule.setPriorities(priorities);
+        priorities.put(RealizationType.INVERTED_INDEX, 0);
+        priorities.put(RealizationType.CUBE, 1);
+        priorities.put(RealizationType.HYBRID, 1);
+        Candidate.setPriorities(priorities);
 
     }
 
@@ -47,10 +47,10 @@ public class IIQueryTest extends KylinQueryTest {
         KylinQueryTest.tearDown();//invoke super class
 
         Map<RealizationType, Integer> priorities = Maps.newHashMap();
-        priorities.put(RealizationType.INVERTED_INDEX, 2);
-        priorities.put(RealizationType.CUBE, 1);
-        priorities.put(RealizationType.HYBRID, 1);
-        RealizationPriorityRule.setPriorities(priorities);
+        priorities.put(RealizationType.INVERTED_INDEX, 1);
+        priorities.put(RealizationType.CUBE, 0);
+        priorities.put(RealizationType.HYBRID, 0);
+        Candidate.setPriorities(priorities);
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
index 84f1042..f9e575b 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinQueryTest.java
@@ -150,7 +150,7 @@ public class KylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleExecuteQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/sql/query39.sql";
+        String queryFileName = "src/test/resources/query/sql/query00.sql";
 
         File sqlFile = new File(queryFileName);
         String sql = getTextFromFile(sqlFile);

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
index 4ca9b47..95a11f8 100644
--- a/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
+++ b/query/src/test/java/org/apache/kylin/query/test/KylinTestBase.java
@@ -209,7 +209,7 @@ public class KylinTestBase {
         if (needSort) {
             queryTable = new SortedTable(queryTable, columnNames);
         }
-        // printResult(queryTable);
+         printResult(queryTable);
 
         return queryTable;
     }
@@ -265,7 +265,7 @@ public class KylinTestBase {
         if (needSort) {
             queryTable = new SortedTable(queryTable, columnNames);
         }
-        // printResult(queryTable);
+         printResult(queryTable);
         return queryTable;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/server/src/main/java/org/apache/kylin/rest/service/JobService.java
----------------------------------------------------------------------
diff --git a/server/src/main/java/org/apache/kylin/rest/service/JobService.java b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
index 25f245b..0903f83 100644
--- a/server/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeBuildTypeEnum;
@@ -143,13 +142,8 @@ public class JobService extends BasicService {
         builder.setSubmitter(submitter);
 
         if (buildType == CubeBuildTypeEnum.BUILD) {
-            if (cube.getDescriptor().hasHolisticCountDistinctMeasures() && cube.getSegments().size() > 0) {
-                Pair<CubeSegment, CubeSegment> segs = getCubeManager().appendAndMergeSegments(cube, endDate);
-                job = builder.buildAndMergeJob(segs.getFirst(), segs.getSecond());
-            } else {
-                CubeSegment newSeg = getCubeManager().appendSegments(cube, endDate);
-                job = builder.buildJob(newSeg);
-            }
+            CubeSegment newSeg = getCubeManager().appendSegments(cube, endDate);
+            job = builder.buildJob(newSeg);
         } else if (buildType == CubeBuildTypeEnum.MERGE) {
             CubeSegment newSeg = getCubeManager().mergeSegments(cube, startDate, endDate, forceMergeEmptySeg);
             job = builder.mergeJob(newSeg);

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java b/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
index 49f1c32..f529145 100644
--- a/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/filter/BitMapFilterEvaluator.java
@@ -20,7 +20,7 @@ package org.apache.kylin.storage.filter;
 
 import java.util.List;
 
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java b/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java
index 9ae4a6d..7533800 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/ColumnValueRange.java
@@ -24,7 +24,7 @@ import java.util.Iterator;
 import java.util.Set;
 
 import org.apache.kylin.cube.kv.RowKeyColumnOrder;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index fa4ccd7..9627efb 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -22,11 +22,11 @@ import java.io.IOException;
 import java.text.MessageFormat;
 import java.util.BitSet;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
@@ -42,6 +42,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.StorageException;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -51,19 +52,21 @@ import org.apache.kylin.cube.kv.RowKeyDecoder;
 import org.apache.kylin.cube.kv.RowValueDecoder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
-import org.apache.kylin.storage.tuple.Tuple;
-import org.apache.kylin.storage.tuple.Tuple.IDerivedColumnFiller;
-import org.apache.kylin.storage.tuple.TupleInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * @author xjiang
@@ -78,7 +81,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     private final Collection<TblColRef> dimensions;
     private final TupleFilter filter;
     private final Collection<TblColRef> groupBy;
-    private final Collection<RowValueDecoder> rowValueDecoders;
+    private final List<RowValueDecoder> rowValueDecoders;
     private final StorageContext context;
     private final String tableName;
     private final HTableInterface table;
@@ -88,12 +91,21 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     private Scan scan;
     private ResultScanner scanner;
     private Iterator<Result> resultIterator;
+    private CubeTupleConverter cubeTupleConverter;
     private TupleInfo tupleInfo;
-    private Tuple tuple;
+    private Tuple oneTuple;
+    private Tuple next;
     private int scanCount;
     private int scanCountDelta;
 
-    public CubeSegmentTupleIterator(CubeSegment cubeSeg, Collection<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+    final List<MeasureType<?>> measureTypes;
+    final List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
+    final List<Pair<Integer, Integer>> advMeasureIndexInRV;//first=> which rowValueDecoders,second => metric index
+
+    private int advMeasureRowsRemaining;
+    private int advMeasureRowIndex;
+
+    public CubeSegmentTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) {
         this.cube = cubeSeg.getCubeInstance();
         this.cubeSeg = cubeSeg;
         this.dimensions = dimensions;
@@ -103,14 +115,22 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         this.context = context;
         this.tableName = cubeSeg.getStorageLocationIdentifier();
         this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
+        
+        measureTypes = Lists.newArrayList();
+        advMeasureFillers = Lists.newArrayListWithCapacity(1);
+        advMeasureIndexInRV = Lists.newArrayListWithCapacity(1);
+
+        this.cubeTupleConverter = new CubeTupleConverter();
+        this.tupleInfo = buildTupleInfo(keyRanges.get(0).getCuboid());
+        this.oneTuple = new Tuple(this.tupleInfo);
+        this.rangeIterator = keyRanges.iterator();
+
 
         try {
             this.table = conn.getTable(tableName);
         } catch (Throwable t) {
             throw new StorageException("Error when open connection to table " + tableName, t);
         }
-        this.rangeIterator = keyRanges.iterator();
-        scanNextRange();
     }
 
     @Override
@@ -121,7 +141,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
 
     private void closeScanner() {
         flushScanCountDelta();
-        
+
         if (logger.isDebugEnabled() && scan != null) {
             logger.debug("Scan " + scan.toString());
             byte[] metricsBytes = scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
@@ -150,59 +170,90 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         }
     }
 
+    private void flushScanCountDelta() {
+        context.increaseTotalScanCount(scanCountDelta);
+        scanCountDelta = 0;
+    }
+
     @Override
-    public boolean hasNext() {
-        return rangeIterator.hasNext() || resultIterator.hasNext();
+    public void remove() {
+        throw new UnsupportedOperationException();
     }
 
     @Override
-    public Tuple next() {
-        // get next result from hbase
-        Result result = null;
-        while (hasNext()) {
-            if (resultIterator.hasNext()) {
-                result = this.resultIterator.next();
-                scanCount++;
-                if (++scanCountDelta >= 1000)
-                    flushScanCountDelta();
-                break;
-            } else {
-                scanNextRange();
+    public boolean hasNext() {
+
+        if (next != null)
+            return true;
+
+        // consume any left rows from advanced measure filler
+        if (advMeasureRowsRemaining > 0) {
+            for (MeasureType.IAdvMeasureFiller filler : advMeasureFillers) {
+                filler.fillTuplle(oneTuple, advMeasureRowIndex);
             }
+            advMeasureRowIndex++;
+            advMeasureRowsRemaining--;
+            next = oneTuple;
+            return true;
         }
-        if (result == null) {
-            return null;
+
+        if (resultIterator == null) {
+            if (rangeIterator.hasNext() == false)
+                return false;
+
+            resultIterator = doScan(rangeIterator.next());
+        }
+
+        if (resultIterator.hasNext() == false) {
+            closeScanner();
+            resultIterator = null;
+            return hasNext();
         }
-        // translate result to tuple
+
+        Result result = resultIterator.next();
+        scanCount++;
+        if (++scanCountDelta >= 1000)
+            flushScanCountDelta();
+
+        // translate into tuple
+        List<MeasureType.IAdvMeasureFiller> retFillers = null;
         try {
-            translateResult(result, this.tuple);
+            retFillers = translateResult(result, oneTuple);
         } catch (IOException e) {
-            throw new IllegalStateException("Can't translate result " + result, e);
+            throw new RuntimeException(e);
         }
-        return this.tuple;
-    }
 
-    private void flushScanCountDelta() {
-        context.increaseTotalScanCount(scanCountDelta);
-        scanCountDelta = 0;
-    }
+        // the simple case
+        if (retFillers == null) {
+            next = oneTuple;
+            return true;
+        }
 
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
+        // advanced measure filling, like TopN, will produce multiple tuples out of one record
+        advMeasureRowsRemaining = -1;
+        for (MeasureType.IAdvMeasureFiller filler : retFillers) {
+            if (advMeasureRowsRemaining < 0)
+                advMeasureRowsRemaining = filler.getNumOfRows();
+            if (advMeasureRowsRemaining != filler.getNumOfRows())
+                throw new IllegalStateException();
+        }
+        if (advMeasureRowsRemaining < 0)
+            throw new IllegalStateException();
 
-    private void scanNextRange() {
-        if (this.rangeIterator.hasNext()) {
-            closeScanner();
-            HBaseKeyRange keyRange = this.rangeIterator.next();
-            this.tupleInfo = buildTupleInfo(keyRange.getCuboid());
-            this.tuple = new Tuple(this.tupleInfo);
+        advMeasureRowIndex = 0;
+        return hasNext();
+    }
 
-            this.resultIterator = doScan(keyRange);
-        } else {
-            this.resultIterator = Collections.<Result> emptyList().iterator();
+    @Override
+    public Tuple next() {
+        if (next == null) {
+            hasNext();
+            if (next == null)
+                throw new NoSuchElementException();
         }
+        Tuple r = next;
+        next = null;
+        return r;
     }
 
     private final Iterator<Result> doScan(HBaseKeyRange keyRange) {
@@ -272,7 +323,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         scan.setCacheBlocks(true);
 
         // cache less when there are memory hungry measures
-        if (RowValueDecoder.hasMemHungryCountDistinct(rowValueDecoders)) {
+        if (RowValueDecoder.hasMemHungryMeasures(rowValueDecoders)) {
             scan.setCaching(scan.getCaching() / 10);
         }
     }
@@ -306,6 +357,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
     }
 
     private TupleInfo buildTupleInfo(Cuboid cuboid) {
+
         TupleInfo info = new TupleInfo();
         int index = 0;
         rowKeyDecoder.setCuboid(cuboid);
@@ -331,21 +383,48 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
                     info.setField(derivedField, derivedCol, derivedCol.getType().getName(), index++);
                 }
                 // add filler
-                info.addDerivedColumnFiller(Tuple.newDerivedColumnFiller(rowColumns, hostCols, deriveInfo, info, CubeManager.getInstance(this.cube.getConfig()), cubeSeg));
+                cubeTupleConverter.addDerivedColumnFiller(CubeTupleConverter.newDerivedColumnFiller(rowColumns, hostCols, deriveInfo, info, CubeManager.getInstance(this.cube.getConfig()), cubeSeg));
             }
         }
 
-        for (RowValueDecoder rowValueDecoder : this.rowValueDecoders) {
-            List<String> names = rowValueDecoder.getNames();
+        for (int i = 0; i < rowValueDecoders.size(); i++) {
+            RowValueDecoder rowValueDecoder = rowValueDecoders.get(i);
+            List<String> measureNames = rowValueDecoder.getNames();
             MeasureDesc[] measures = rowValueDecoder.getMeasures();
-            for (int i = 0; i < measures.length; i++) {
-                String dataType = measures[i].getFunction().getSQLType().getName();
-                info.setField(names.get(i), null, dataType, index++);
+
+            BitSet projectionIndex = rowValueDecoder.getProjectionIndex();
+            for (int mi = projectionIndex.nextSetBit(0); mi >= 0; mi = projectionIndex.nextSetBit(mi + 1)) {
+                FunctionDesc aggrFunc = measures[mi].getFunction();
+                String dataType = measures[mi].getFunction().getRewriteFieldType().getName();
+                info.setField(measureNames.get(mi), null, dataType, index++);
+
+                MeasureType<?> measureType = aggrFunc.getMeasureType();
+                if (measureType.needAdvancedTupleFilling()) {
+                    Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(aggrFunc));
+                    advMeasureFillers.add(measureType.getAdvancedTupleFiller(aggrFunc, tupleInfo, dictionaryMap));
+                    advMeasureIndexInRV.add(Pair.newPair(i, mi));
+                    measureTypes.add(null);
+                } else {
+                    measureTypes.add(measureType);
+                }
             }
+            //            for (int i = 0; i < measures.length; i++) {
+            //                String dataType = measures[i].getFunction().getRewriteFieldType().getName();
+            //                info.setField(measureNames.get(i), null, dataType, index++);
+            //            }
         }
         return info;
     }
 
+    // load only needed dictionaries
+    private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+        for (TblColRef col : columnsNeedDictionary) {
+            result.put(col, cubeSeg.getDictionary(col));
+        }
+        return result;
+    }
+
     private String getFieldName(TblColRef column, Map<TblColRef, String> aliasMap) {
         String name = null;
         if (aliasMap != null) {
@@ -357,7 +436,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         return name;
     }
 
-    private void translateResult(Result res, Tuple tuple) throws IOException {
+    private List<MeasureType.IAdvMeasureFiller> translateResult(Result res, Tuple tuple) throws IOException {
         // groups
         byte[] rowkey = res.getRow();
         rowKeyDecoder.decode(rowkey);
@@ -373,11 +452,12 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
         }
 
         // derived
-        for (IDerivedColumnFiller filler : tupleInfo.getDerivedColumnFillers()) {
+        for (CubeTupleConverter.IDerivedColumnFiller filler : cubeTupleConverter.getDerivedColumnFillers()) {
             filler.fillDerivedColumns(dimensionValues, tuple);
         }
 
         // aggregations
+        int measureIndex = 0;
         for (RowValueDecoder rowValueDecoder : this.rowValueDecoders) {
             HBaseColumnDesc hbaseColumn = rowValueDecoder.getHBaseColumn();
             String columnFamily = hbaseColumn.getColumnFamilyName();
@@ -389,8 +469,23 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
             Object[] measureValues = rowValueDecoder.getValues();
             BitSet projectionIndex = rowValueDecoder.getProjectionIndex();
             for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
-                tuple.setMeasureValue(measureNames.get(i), measureValues[i]);
+                if (measureTypes.get(measureIndex) != null) {
+                    measureTypes.get(measureIndex).fillTupleSimply(tuple, tupleInfo.getFieldIndex(measureNames.get(i)), measureValues[i]);
+                }
+            }
+            measureIndex++;
+        }
+
+        // advanced measure filling, due to possible row split, will complete at caller side
+        if (advMeasureFillers.isEmpty()) {
+            return null;
+        } else {
+            for (int i = 0; i < advMeasureFillers.size(); i++) {
+                Pair<Integer, Integer> metricLocation = advMeasureIndexInRV.get(i);
+                Object measureValue = rowValueDecoders.get(metricLocation.getFirst()).getValues()[metricLocation.getSecond()];
+                advMeasureFillers.get(i).reload(measureValue);
             }
+            return advMeasureFillers;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
index bbf088e..e612eb1 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
@@ -35,6 +35,7 @@ import java.util.TreeSet;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
@@ -45,8 +46,8 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseMappingDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
@@ -87,6 +88,8 @@ public class CubeStorageEngine implements IStorageEngine {
 
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest) {
+        // allow custom measures hack
+        notifyBeforeStorageQuery(sqlDigest);
 
         Collection<TblColRef> groups = sqlDigest.groupbyColumns;
         TupleFilter filter = sqlDigest.filter;
@@ -354,12 +357,6 @@ public class CubeStorageEngine implements IStorageEngine {
             for (HBaseColumnDesc hbCol : hbCols) {
                 bestHBCol = hbCol;
                 bestIndex = hbCol.findMeasureIndex(aggrFunc);
-                MeasureDesc measure = hbCol.getMeasures()[bestIndex];
-                // criteria for holistic measure: Exact Aggregation && Exact Cuboid
-                if (measure.isHolisticCountDistinct() && context.isExactAggregation()) {
-                    logger.info("Holistic count distinct chosen for " + aggrFunc);
-                    break;
-                }
             }
 
             RowValueDecoder codec = codecMap.get(bestHBCol);
@@ -628,7 +625,7 @@ public class CubeStorageEngine implements IStorageEngine {
     }
 
     private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
-        if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
+        if (RowValueDecoder.hasMemHungryMeasures(valueDecoders) == false) {
             return;
         }
 
@@ -638,7 +635,7 @@ public class CubeStorageEngine implements IStorageEngine {
             BitSet projectionIndex = decoder.getProjectionIndex();
             for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
                 FunctionDesc func = measures[i].getFunction();
-                rowSizeEst += func.getReturnDataType().getSpaceEstimate();
+                rowSizeEst += func.getReturnDataType().getStorageBytesEstimate();
             }
         }
 
@@ -665,4 +662,13 @@ public class CubeStorageEngine implements IStorageEngine {
         ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
     }
 
+
+    private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            MeasureType<?> measureType = measure.getFunction().getMeasureType();
+            measureType.adjustSqlDigest(measure, sqlDigest);
+        }
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java
new file mode 100644
index 0000000..11987a4
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+
+public class CubeTupleConverter {
+
+    private final List<IDerivedColumnFiller> derivedColumnFillers;
+
+    public CubeTupleConverter() {
+        derivedColumnFillers = new ArrayList<IDerivedColumnFiller>();
+    }
+
+    public void addDerivedColumnFiller(IDerivedColumnFiller derivedColumnFiller) {
+        derivedColumnFillers.add(derivedColumnFiller);
+    }
+
+    public List<IDerivedColumnFiller> getDerivedColumnFillers() {
+        return derivedColumnFillers;
+    }
+
+    // ============================================================================
+
+    public static IDerivedColumnFiller newDerivedColumnFiller(List<TblColRef> rowColumns, TblColRef[] hostCols, CubeDesc.DeriveInfo deriveInfo, TupleInfo tupleInfo, CubeManager cubeMgr, CubeSegment cubeSegment) {
+
+        int[] hostIndex = new int[hostCols.length];
+        for (int i = 0; i < hostCols.length; i++) {
+            hostIndex[i] = rowColumns.indexOf(hostCols[i]);
+        }
+        String[] derivedFieldNames = new String[deriveInfo.columns.length];
+        for (int i = 0; i < deriveInfo.columns.length; i++) {
+            derivedFieldNames[i] = tupleInfo.getFieldName(deriveInfo.columns[i]);
+        }
+
+        switch (deriveInfo.type) {
+        case LOOKUP:
+            LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSegment, deriveInfo.dimension);
+            return new LookupFiller(hostIndex, lookupTable, deriveInfo, derivedFieldNames);
+        case PK_FK:
+            // composite key are split, see CubeDesc.initDimensionColumns()
+            return new PKFKFiller(hostIndex[0], derivedFieldNames[0]);
+        default:
+            throw new IllegalArgumentException();
+        }
+    }
+
+    public interface IDerivedColumnFiller {
+        public void fillDerivedColumns(List<String> rowValues, Tuple tuple);
+    }
+
+    static class PKFKFiller implements IDerivedColumnFiller {
+        final int hostIndex;
+        final String derivedFieldName;
+
+        public PKFKFiller(int hostIndex, String derivedFieldName) {
+            this.hostIndex = hostIndex;
+            this.derivedFieldName = derivedFieldName;
+        }
+
+        @Override
+        public void fillDerivedColumns(List<String> rowValues, Tuple tuple) {
+            String value = rowValues.get(hostIndex);
+            tuple.setDimensionValue(derivedFieldName, value);
+        }
+    }
+
+    static class LookupFiller implements IDerivedColumnFiller {
+
+        final int[] hostIndex;
+        final int hostLen;
+        final Array<String> lookupKey;
+        final LookupStringTable lookupTable;
+        final int[] derivedIndex;
+        final int derivedLen;
+        final String[] derivedFieldNames;
+
+        public LookupFiller(int[] hostIndex, LookupStringTable lookupTable, CubeDesc.DeriveInfo deriveInfo, String[] derivedFieldNames) {
+            this.hostIndex = hostIndex;
+            this.hostLen = hostIndex.length;
+            this.lookupKey = new Array<String>(new String[hostLen]);
+            this.lookupTable = lookupTable;
+            this.derivedIndex = new int[deriveInfo.columns.length];
+            this.derivedLen = derivedIndex.length;
+            this.derivedFieldNames = derivedFieldNames;
+
+            for (int i = 0; i < derivedLen; i++) {
+                derivedIndex[i] = deriveInfo.columns[i].getColumn().getZeroBasedIndex();
+            }
+        }
+
+        @Override
+        public void fillDerivedColumns(List<String> rowValues, Tuple tuple) {
+            for (int i = 0; i < hostLen; i++) {
+                lookupKey.data[i] = rowValues.get(hostIndex[i]);
+            }
+
+            String[] lookupRow = lookupTable.getRow(lookupKey);
+
+            if (lookupRow != null) {
+                for (int i = 0; i < derivedLen; i++) {
+                    String value = lookupRow[derivedIndex[i]];
+                    tuple.setDimensionValue(derivedFieldNames[i], value);
+                }
+            } else {
+                for (int i = 0; i < derivedLen; i++) {
+                    tuple.setDimensionValue(derivedFieldNames[i], null);
+                }
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index a115753..d188a44 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -52,7 +52,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
     private ITupleIterator segmentIterator;
     private int scanCount;
 
-    public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, Collection<RowValueDecoder> rowValueDecoders, StorageContext context) {
+    public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, Collection<TblColRef> dimensions, TupleFilter filter, Collection<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context) {
 
         this.context = context;
         int limit = context.getLimit();

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
index 47a011a..26f95b7 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/AggregationCache.java
@@ -18,11 +18,12 @@
 
 package org.apache.kylin.storage.hbase.coprocessor;
 
-import java.util.SortedMap;
 
-import org.apache.kylin.metadata.measure.MeasureAggregator;
 
 import com.google.common.collect.Maps;
+import org.apache.kylin.measure.MeasureAggregator;
+
+import java.util.SortedMap;
 
 /**
  * Created by Hongbin Ma(Binmahone) on 11/27/14.
@@ -60,7 +61,7 @@ public abstract class AggregationCache {
                 rowMemBytes = 0;
                 MeasureAggregator[] measureAggregators = aggBufMap.get(aggBufMap.firstKey());
                 for (MeasureAggregator agg : measureAggregators) {
-                    rowMemBytes += agg.getMemBytes();
+                    rowMemBytes += agg.getMemBytesEstimate();
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java
index b3e2d31..2f6bf67 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorFilter.java
@@ -24,7 +24,7 @@ import java.util.Set;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.cube.kv.RowKeyColumnIO;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.ISegment;
 import org.apache.kylin.metadata.filter.*;
 import org.apache.kylin.metadata.filter.TupleFilterSerializer.Decorator;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java
index 6affc18..bd2b912 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorTupleFilterTranslator.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.storage.hbase.coprocessor;
 
 import org.apache.kylin.cube.kv.RowKeyColumnIO;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
index 83e84a9..c33d192 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationCache.java
@@ -21,7 +21,7 @@ package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.storage.hbase.coprocessor.AggrKey;
 import org.apache.kylin.storage.hbase.coprocessor.AggregationCache;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/c721d679/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index 028da1e..3b4a9b2 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -29,9 +29,10 @@ import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.invertedindex.index.RawTableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
-import org.apache.kylin.metadata.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
-import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants;
 
@@ -91,7 +92,7 @@ public class EndpointAggregators {
                     throw new IllegalStateException("Column " + functionDesc.getParameter().getValue() + " is not found in II");
                 }
 
-                if (functionDesc.isCountDistinct()) {
+                if (HLLCMeasureType.isCountDistinct(functionDesc)) {
                     metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
                 } else {
                     metricInfos[i] = new MetricInfo(MetricType.Normal, index);
@@ -143,10 +144,10 @@ public class EndpointAggregators {
         MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length];
         for (int i = 0; i < aggrs.length; i++) {
             if (metricInfos[i].type == MetricType.DistinctCount) {
-                aggrs[i] = MeasureAggregator.create(funcNames[i], dataTypes[i]);
+                aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getInstance(dataTypes[i]));
             } else {
                 //all other fixed length measures can be aggregated as long
-                aggrs[i] = MeasureAggregator.create(funcNames[i], "long");
+                aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getInstance("long"));
             }
         }
         return aggrs;