You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/02 04:15:33 UTC

[1/2] kylin git commit: KYLIN-976 Extract influenceCapabilityCheck()

Repository: kylin
Updated Branches:
  refs/heads/KYLIN-976 dac248804 -> 63f376a5a


KYLIN-976 Extract influenceCapabilityCheck()


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/6c85ead9
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/6c85ead9
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/6c85ead9

Branch: refs/heads/KYLIN-976
Commit: 6c85ead9595df519f60279d92e6d2b009894df00
Parents: dac2488
Author: Li, Yang <ya...@ebay.com>
Authored: Mon Nov 30 17:55:08 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Nov 30 17:55:08 2015 +0800

----------------------------------------------------------------------
 .../kylin/cube/CubeCapabilityChecker.java       | 215 ++++++++-----------
 .../apache/kylin/cube/CubeDimensionDeriver.java |  46 ----
 .../org/apache/kylin/cube/CubeInstance.java     |  23 +-
 .../org/apache/kylin/measure/MeasureType.java   |  13 +-
 .../kylin/measure/basic/BasicMeasureType.java   |   9 -
 .../kylin/measure/hllc/HLLCMeasureType.java     |   7 -
 .../kylin/measure/topn/TopNMeasureType.java     |  46 +++-
 .../metadata/realization/CapabilityResult.java  |  63 ++++++
 .../metadata/realization/IRealization.java      |  18 +-
 .../kylin/storage/hybrid/HybridInstance.java    |  32 +--
 .../storage/hybrid/HybridStorageQuery.java      |   2 +-
 .../apache/kylin/invertedindex/IIInstance.java  |  29 ++-
 .../apache/kylin/query/routing/Candidate.java   | 100 +++++++++
 .../apache/kylin/query/routing/QueryRouter.java |  56 ++++-
 .../apache/kylin/query/routing/RoutingRule.java |  22 +-
 .../AdjustForWeaklyMatchedRealization.java      |  99 ---------
 .../RoutingRules/RealizationPriorityRule.java   |  59 -----
 .../RoutingRules/RealizationSortRule.java       |  65 ------
 .../RemoveUncapableRealizationsRule.java        |  41 ----
 .../routing/rules/RealizationSortRule.java      |  34 +++
 .../rules/RemoveUncapableRealizationsRule.java  |  44 ++++
 .../apache/kylin/query/test/ITIIQueryTest.java  |   7 +-
 22 files changed, 499 insertions(+), 531 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 343bf11..0c1b3c9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -18,158 +18,104 @@
 
 package org.apache.kylin.cube;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.basic.BasicMeasureType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
 import org.apache.kylin.metadata.realization.SQLDigest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import com.google.common.collect.Sets;
 
 /**
  */
 public class CubeCapabilityChecker {
     private static final Logger logger = LoggerFactory.getLogger(CubeCapabilityChecker.class);
 
-    public static boolean check(CubeInstance cube, SQLDigest digest, boolean allowWeakMatch) {
+    public static CapabilityResult check(CubeInstance cube, SQLDigest digest) {
+        CapabilityResult result = new CapabilityResult();
+        result.capable = false;
 
         // retrieve members from olapContext
-        Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest);
-        Collection<FunctionDesc> functions = digest.aggregations;
-        Collection<TblColRef> metricsColumns = digest.metricColumns;
+        Collection<TblColRef> dimensionColumns = getDimensionColumns(digest);
+        Collection<FunctionDesc> aggrFunctions = digest.aggregations;
         Collection<JoinDesc> joins = digest.joinDescs;
-        boolean hasTopN = hasTopNMeasure(cube.getDescriptor());
 
         // match dimensions & aggregations & joins
 
-        boolean isOnline = cube.isReady();
-
-        boolean matchDimensions = isMatchedWithDimensions(dimensionColumns, cube);
-        boolean matchAggregation = isMatchedWithAggregations(functions, cube);
-        boolean matchJoin = isMatchedWithJoins(joins, cube);
-
-        // Some cubes are not "perfectly" match, but still save them in case of usage
-        if (allowWeakMatch && isOnline && matchDimensions && !matchAggregation && matchJoin) {
-            // sometimes metrics are indeed dimensions
-            // e.g. select min(cal_dt) from ..., where cal_dt is actually a dimension
-            if (isWeaklyMatchedWithAggregations(functions, metricsColumns, cube)) {
-                logger.info("Weakly matched cube found " + cube.getName());
-                return true;
-            }
-        }
+        Collection<TblColRef> unmatchedDimensions = unmatchedDimensions(dimensionColumns, cube);
+        Collection<FunctionDesc> unmatchedAggregations = unmatchedAggregations(aggrFunctions, cube);
+        boolean isJoinMatch = isJoinMatch(joins, cube);
 
-        // for topn, the group column can come from measure
-        if (hasTopN & matchJoin && !matchDimensions && functions.size() == 1) {
-            boolean matchedTopN = isMatchedWithTopN(dimensionColumns, cube, digest);
-            matchDimensions = matchedTopN;
-            matchAggregation = matchedTopN;
+        if (!isJoinMatch) {
+            logger.info("Exclude cube " + cube.getName() + " because unmatched joins");
+            return result;
         }
 
-        if (!isOnline || !matchDimensions || !matchAggregation || !matchJoin) {
-            logger.info("Exclude cube " + cube.getName() + " because " + " isOnlne=" + isOnline + ",matchDimensions=" + matchDimensions + ",matchAggregation=" + matchAggregation + ",matchJoin=" + matchJoin);
-            return false;
+        // try custom measure types
+        if (!unmatchedDimensions.isEmpty() || !unmatchedAggregations.isEmpty()) {
+            tryCustomMeasureTypes(unmatchedDimensions, unmatchedAggregations, digest, cube, result);
         }
-
-        return true;
-    }
-
-    /**
-     * Check whether the cube can match the sql digest with TopN measure
-     * @param cube
-     * @param digest
-     * @return
-     */
-    public static boolean isMatchedWithTopN(CubeInstance cube, SQLDigest digest) {
-
-        boolean hasTopN = hasTopNMeasure(cube.getDescriptor());
         
-        if (hasTopN == false) {
-            return false;
+        // try dimension-as-measure
+        if (!unmatchedAggregations.isEmpty()) {
+            tryDimensionAsMeasures(unmatchedAggregations, digest, cube, result);
         }
         
-        Collection<TblColRef> dimensionColumns = CubeDimensionDeriver.getDimensionColumns(digest);
-        
-        boolean matchDimensions = isMatchedWithDimensions(dimensionColumns, cube);
-
-        if (matchDimensions == true) {
-            return false;
-        }
-
-        Collection<FunctionDesc> functions = digest.aggregations;
-        if (functions == null || functions.size() != 1) {
-            // topN only allow one measure
-            return false;
+        if (!unmatchedDimensions.isEmpty()) {
+            logger.info("Exclude cube " + cube.getName() + " because unmatched dimensions");
+            return result;
         }
-
-        return isMatchedWithTopN(dimensionColumns, cube, digest);
-    }
-
-    /**
-     * This is the method for internal, consumer need do the check as the public method did
-     * @param dimensionColumns
-     * @param cube
-     * @param digest
-     * @return
-     */
-    private static boolean isMatchedWithTopN(Collection<TblColRef> dimensionColumns, CubeInstance cube, SQLDigest digest) {
-        
-        CubeDesc cubeDesc = cube.getDescriptor();
-        Collection<FunctionDesc> functions = digest.aggregations;
         
-        FunctionDesc onlyFunction = functions.iterator().next();
-        if (onlyFunction.isSum() == false) {
-            // topN only support SUM expression
-            return false;
+        if (!unmatchedAggregations.isEmpty()) {
+            logger.info("Exclude cube " + cube.getName() + " because unmatched aggregations");
+            return result;
         }
 
-        Collection<TblColRef> dimensionColumnsCopy = new ArrayList<TblColRef>(dimensionColumns);
-        for (MeasureDesc measure : cubeDesc.getMeasures()) {
-            if (measure.getFunction().isTopN()) {
-                List<TblColRef> cols = measure.getFunction().getParameter().getColRefs();
-                TblColRef literalCol = cols.get(cols.size() - 1);
-                if (digest.groupbyColumns.contains(literalCol)) {
-                    dimensionColumnsCopy.remove(literalCol);
-                    if (isMatchedWithDimensions(dimensionColumnsCopy, cube)) {
-                        if (measure.getFunction().isTopNCompatibleSum(onlyFunction)) {
-                            return true;
-                        }
-                    }
-                    dimensionColumnsCopy.add(literalCol);
-                }
-            }
-        }
-
-        return false;
+        // cost is computed by the caller
+        result.capable = true;
+        return result;
     }
 
-    private static boolean hasTopNMeasure(CubeDesc cubeDesc) {
-        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-            if (measureDesc.getFunction().isTopN())
-                return true;
-        }
+    private static Collection<TblColRef> getDimensionColumns(SQLDigest sqlDigest) {
+        Collection<TblColRef> groupByColumns = sqlDigest.groupbyColumns;
+        Collection<TblColRef> filterColumns = sqlDigest.filterColumns;
 
-        return false;
+        Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>();
+        dimensionColumns.addAll(groupByColumns);
+        dimensionColumns.addAll(filterColumns);
+        return dimensionColumns;
     }
 
-    private static boolean isMatchedWithDimensions(Collection<TblColRef> dimensionColumns, CubeInstance cube) {
+    private static Set<TblColRef> unmatchedDimensions(Collection<TblColRef> dimensionColumns, CubeInstance cube) {
+        HashSet<TblColRef> result = Sets.newHashSet(dimensionColumns);
         CubeDesc cubeDesc = cube.getDescriptor();
-        boolean matchAgg = cubeDesc.listDimensionColumnsIncludingDerived().containsAll(dimensionColumns);
-        return matchAgg;
+        result.removeAll(cubeDesc.listDimensionColumnsIncludingDerived());
+        return result;
     }
 
-    private static boolean isMatchedWithAggregations(Collection<FunctionDesc> aggregations, CubeInstance cube) {
+    private static Set<FunctionDesc> unmatchedAggregations(Collection<FunctionDesc> aggregations, CubeInstance cube) {
+        HashSet<FunctionDesc> result = Sets.newHashSet(aggregations);
         CubeDesc cubeDesc = cube.getDescriptor();
-        boolean matchAgg = cubeDesc.listAllFunctions().containsAll(aggregations);
-        return matchAgg;
+        result.removeAll(cubeDesc.listAllFunctions());
+        return result;
     }
 
-    private static boolean isMatchedWithJoins(Collection<JoinDesc> joins, CubeInstance cube) {
+    private static boolean isJoinMatch(Collection<JoinDesc> joins, CubeInstance cube) {
         CubeDesc cubeDesc = cube.getDescriptor();
 
         List<JoinDesc> cubeJoins = new ArrayList<JoinDesc>(cubeDesc.getDimensions().size());
@@ -204,30 +150,55 @@ public class CubeCapabilityChecker {
         return true;
     }
 
-    private static boolean isWeaklyMatchedWithAggregations(Collection<FunctionDesc> aggregations, Collection<TblColRef> metricColumns, CubeInstance cube) {
+    private static void tryDimensionAsMeasures(Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, CubeInstance cube, CapabilityResult result) {
         CubeDesc cubeDesc = cube.getDescriptor();
         Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions();
 
-        boolean matched = true;
-        for (FunctionDesc functionDesc : aggregations) {
-            if (cubeFuncs.contains(functionDesc))
+        Iterator<FunctionDesc> it = unmatchedAggregations.iterator();
+        while (it.hasNext()) {
+            FunctionDesc functionDesc = it.next();
+            
+            if (cubeFuncs.contains(functionDesc)) {
+                it.remove();
                 continue;
+            }
 
             // only inverted-index cube does not have count, and let calcite handle in this case
-            if (functionDesc.isCount())
+            if (functionDesc.isCount()) {
+                it.remove();
                 continue;
+            }
 
-            if (functionDesc.isCountDistinct()) // calcite can not handle distinct count
-                matched = false;
-
-            TblColRef col = null;
-            if (functionDesc.getParameter().getColRefs().size() > 0)
-                col = functionDesc.getParameter().getColRefs().get(0);
+            // calcite can not handle distinct count
+            if (functionDesc.isCountDistinct()) {
+                continue;
+            }
 
-            if (col == null || !cubeDesc.listDimensionColumnsIncludingDerived().contains(col)) {
-                matched = false;
+            // calcite can do aggregation from columns on-the-fly
+            List<TblColRef> neededCols = functionDesc.getParameter().getColRefs();
+            if (neededCols.size() > 0 && cubeDesc.listDimensionColumnsIncludingDerived().containsAll(neededCols)) {
+                result.influences.add(new CapabilityResult.DimensionAsMeasure(functionDesc));
+                it.remove();
+                continue;
             }
         }
-        return matched;
     }
+
+    // custom measure types can cover unmatched dimensions or measures
+    private static void tryCustomMeasureTypes(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, CubeInstance cube, CapabilityResult result) {
+        CubeDesc cubeDesc = cube.getDescriptor();
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            if (unmatchedDimensions.isEmpty() && unmatchedAggregations.isEmpty())
+                break;
+            
+            MeasureType measureType = MeasureType.create(measure.getFunction());
+            if (measureType instanceof BasicMeasureType)
+                continue;
+            
+            CapabilityInfluence inf = measureType.influenceCapabilityCheck(unmatchedDimensions, unmatchedAggregations, digest, measure);
+            if (inf != null)
+                result.influences.add(inf);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
deleted file mode 100644
index 138d01e..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDimensionDeriver.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube;
-
-import java.util.Collection;
-import java.util.HashSet;
-
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-
-/**
- *
- * the unified logic for defining a sql's dimension
- */
-public class CubeDimensionDeriver {
-
-    public static Collection<TblColRef> getDimensionColumns(SQLDigest sqlDigest) {
-        Collection<TblColRef> groupByColumns = sqlDigest.groupbyColumns;
-        Collection<TblColRef> filterColumns = sqlDigest.filterColumns;
-
-        Collection<MeasureDesc> sortMeasures = sqlDigest.sortMeasures;
-        Collection<SQLDigest.OrderEnum> sortOrders = sqlDigest.sortOrders;
-                
-        Collection<TblColRef> dimensionColumns = new HashSet<TblColRef>();
-        dimensionColumns.addAll(groupByColumns);
-        dimensionColumns.addAll(filterColumns);
-        return dimensionColumns;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 2208136..81c7909 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -32,6 +32,8 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
@@ -339,12 +341,20 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
     }
 
     @Override
-    public boolean isCapable(SQLDigest digest) {
-        return CubeCapabilityChecker.check(this, digest, true);
+    public CapabilityResult isCapable(SQLDigest digest) {
+        CapabilityResult result = CubeCapabilityChecker.check(this, digest);
+        if (result.capable) {
+            result.cost = getCost(digest);
+            for (CapabilityInfluence i : result.influences) {
+                result.cost *= (i.suggestCostMultiplier() == 0) ? 1.0 : i.suggestCostMultiplier();
+            }
+        } else {
+            result.cost = -1;
+        }
+        return result;
     }
 
-    @Override
-    public int getCost(SQLDigest digest) {
+    private int getCost(SQLDigest digest) {
         int calculatedCost = cost;
 
         calculatedCost += getAllDimensions().size() * COST_WEIGHT_DIMENSION + getMeasures().size() * COST_WEIGHT_MEASURE;
@@ -357,11 +367,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
             }
         }
         
-        if (CubeCapabilityChecker.isMatchedWithTopN(this, digest)) {
-            // this is topN query
-            calculatedCost = calculatedCost / 3;
-        }
-
         return calculatedCost;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 4fe59c0..8e7de6f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.measure;
 
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -29,6 +30,8 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
+import org.apache.kylin.metadata.realization.SQLDigest;
 
 import com.google.common.collect.Maps;
 
@@ -83,12 +86,18 @@ abstract public class MeasureType {
     
     abstract public MeasureAggregator<?> newAggregator();
  
-    abstract public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc);
-    
+    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
+        return null;
+    }
+
     /* ============================================================================
      * Cube Selection
      * ---------------------------------------------------------------------------- */
     
+    public CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc measureDesc) {
+        return null;
+    }
+    
     /* ============================================================================
      * Query
      * ---------------------------------------------------------------------------- */

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index fe53bab..f314870 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -18,8 +18,6 @@
 
 package org.apache.kylin.measure.basic;
 
-import java.util.List;
-
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
@@ -30,7 +28,6 @@ import org.apache.kylin.metadata.datatype.DoubleSerializer;
 import org.apache.kylin.metadata.datatype.LongSerializer;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
 
 public class BasicMeasureType extends MeasureType {
     
@@ -119,10 +116,4 @@ public class BasicMeasureType extends MeasureType {
         return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
     }
     
-    @Override
-    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index 2ad7630..4a73478 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.measure.hllc;
 
-import java.util.List;
 import java.util.Map;
 
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
@@ -90,10 +89,4 @@ public class HLLCMeasureType extends MeasureType {
             return new LDCAggregator();
     }
 
-    @Override
-    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 561f9f1..d6c5a6f 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.measure.topn;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -33,8 +34,11 @@ import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.hllc.HLLCSerializer;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
+import org.apache.kylin.metadata.realization.SQLDigest;
 
 public class TopNMeasureType extends MeasureType {
 
@@ -43,9 +47,9 @@ public class TopNMeasureType extends MeasureType {
     public TopNMeasureType(DataType dataType) {
         if ("topn".equals(dataType.getName()) == false)
             throw new IllegalArgumentException();
-        
+
         this.dataType = dataType;
-        
+
         if (this.dataType.getPrecision() < 1 || this.dataType.getPrecision() > 1000)
             throw new IllegalArgumentException("TopN precision must be between 1 and 1000");
     }
@@ -59,11 +63,11 @@ public class TopNMeasureType extends MeasureType {
     public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
         return HLLCSerializer.class;
     }
-    
+
     @Override
     public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
         // TODO Auto-generated method stub
-        
+
     }
 
     @SuppressWarnings("rawtypes")
@@ -74,10 +78,10 @@ public class TopNMeasureType extends MeasureType {
             public TopNCounter valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
                 if (values.length != 2)
                     throw new IllegalArgumentException();
-                
+
                 double counter = values[0] == null ? 0 : Double.parseDouble(values[0]);
                 String literal = values[1];
-                
+
                 // encode literal using dictionary
                 TblColRef literalCol = measureDesc.getFunction().getTopNLiteralColumn();
                 Dictionary<String> dictionary = dictionaryMap.get(literalCol);
@@ -90,7 +94,7 @@ public class TopNMeasureType extends MeasureType {
                 topNCounter.offer(key, counter);
                 return topNCounter;
             }
-            
+
             @SuppressWarnings("unchecked")
             @Override
             public TopNCounter reEncodeDictionary(TopNCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
@@ -135,4 +139,32 @@ public class TopNMeasureType extends MeasureType {
         return Collections.singletonList(literalCol);
     }
 
+    @Override
+    public CapabilityInfluence influenceCapabilityCheck(Collection<TblColRef> unmatchedDimensions, Collection<FunctionDesc> unmatchedAggregations, SQLDigest digest, MeasureDesc topN) {
+        // A TopN measure can provide exactly one numeric measure and one literal dimension
+        // e.g. select seller, sum(gmv) from ... group by seller order by 2 desc limit 100
+
+        // check that the digest requires exactly one measure
+        if (digest.aggregations.size() != 1)
+            return null;
+
+        // the measure function must be SUM
+        FunctionDesc onlyFunction = digest.aggregations.iterator().next();
+        if (onlyFunction.isSum() == false)
+            return null;
+
+        TblColRef literalCol = topN.getFunction().getTopNLiteralColumn();
+        if (unmatchedDimensions.contains(literalCol) && topN.getFunction().isTopNCompatibleSum(onlyFunction)) {
+            unmatchedDimensions.remove(literalCol);
+            unmatchedAggregations.remove(onlyFunction);
+            return new CapabilityInfluence() {
+                @Override
+                public double suggestCostMultiplier() {
+                    return 0.3; // make sure TopN get ahead of other matched realizations
+                }
+            };
+        } else
+            return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
new file mode 100644
index 0000000..cc5bae7
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/CapabilityResult.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.metadata.realization;
+
+import java.util.List;
+
+import org.apache.kylin.metadata.model.FunctionDesc;
+
+import com.google.common.collect.Lists;
+
+public class CapabilityResult {
+    
+    /** Is capable or not */
+    public boolean capable;
+
+    /** The smaller the cost, the more capable the realization */
+    public int cost;
+    
+    /**
+     * For info purpose, marker objects to indicate all special features
+     * (dimension-as-measure, topN etc.) that have decided the capability.
+     */
+    public List<CapabilityInfluence> influences = Lists.newArrayListWithCapacity(1);
+
+    public static interface CapabilityInfluence {
+        /** Suggest a multiplier to influence query cost */
+        double suggestCostMultiplier();
+    }
+    
+    public static class DimensionAsMeasure implements CapabilityInfluence {
+        
+        final FunctionDesc function;
+        
+        public DimensionAsMeasure(FunctionDesc function) {
+            this.function = function;
+        }
+
+        @Override
+        public double suggestCostMultiplier() {
+            return 1.0;
+        }
+        
+        public FunctionDesc getMeasureFunction() {
+            return function;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
index ccfa956..1857c85 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/IRealization.java
@@ -26,26 +26,14 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
 public interface IRealization extends IStorageAware {
-
-    public boolean isCapable(SQLDigest digest);
-
+    
     /**
-     * Given the features of a query, return an integer indicating how capable the realization
-     * is to answer the query.
-     *
-     * @return -1 if the realization cannot fulfill the query;
-     * or a number between 0-100 if the realization can answer the query, the smaller
-     * the number, the more efficient the realization.
-     * Especially,
-     * 0 - means the realization has the exact result pre-calculated, no less no more;
-     * 100 - means the realization will scan the full table with little or no indexing.
+     * Given the features of a query, check how capable the realization is to answer the query.
      */
-    public int getCost(SQLDigest digest);
+    public CapabilityResult isCapable(SQLDigest digest);
 
     /**
      * Get whether this specific realization is a cube or InvertedIndex
-     *
-     * @return
      */
     public RealizationType getType();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
index 2a6e5d3..98330ec 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridInstance.java
@@ -29,6 +29,7 @@ import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.RealizationEntry;
+import org.apache.kylin.metadata.realization.CapabilityResult;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationRegistry;
 import org.apache.kylin.metadata.realization.RealizationType;
@@ -152,24 +153,23 @@ public class HybridInstance extends RootPersistentEntity implements IRealization
     }
 
     @Override
-    public boolean isCapable(SQLDigest digest) {
+    public CapabilityResult isCapable(SQLDigest digest) {
+        CapabilityResult result = new CapabilityResult();
+        result.cost = Integer.MAX_VALUE;
+        
         for (IRealization realization : getRealizations()) {
-            if (realization.isCapable(digest))
-                return true;
-        }
-        return false;
-    }
-
-    @Override
-    public int getCost(SQLDigest digest) {
-        cost = Integer.MAX_VALUE;
-        for (IRealization realization : this.getRealizations()) {
-            if (realization.isCapable(digest))
-                cost = Math.min(cost, realization.getCost(digest));
+            CapabilityResult child = realization.isCapable(digest);
+            if (child.capable) {
+                result.capable = true;
+                result.cost = Math.min(result.cost, child.cost);
+                result.influences.addAll(child.influences);
+            }
         }
-
-        // Make hybrid always win its children
-        return cost - 1;
+        
+        if (result.cost > 0)
+            result.cost--; // let hybrid win its children
+        
+        return result;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageQuery.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageQuery.java b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageQuery.java
index 49f9fd9..8911c0e 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageQuery.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/hybrid/HybridStorageQuery.java
@@ -49,7 +49,7 @@ public class HybridStorageQuery implements IStorageQuery {
     public ITupleIterator search(final StorageContext context, final SQLDigest sqlDigest, final TupleInfo returnTupleInfo) {
         List<ITupleIterator> tupleIterators = Lists.newArrayList();
         for (int i = 0; i < realizations.length; i++) {
-            if (realizations[i].isReady() && realizations[i].isCapable(sqlDigest)) {
+            if (realizations[i].isReady() && realizations[i].isCapable(sqlDigest).capable) {
                 ITupleIterator dataIterator = storageEngines[i].search(context, sqlDigest, returnTupleInfo);
                 tupleIterators.add(dataIterator);
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
index 0ba07ee..114904a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/IIInstance.java
@@ -26,7 +26,14 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.*;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.IBuildable;
+import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.metadata.model.LookupDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.CapabilityResult;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
 import org.apache.kylin.metadata.realization.RealizationType;
@@ -270,15 +277,21 @@ public class IIInstance extends RootPersistentEntity implements IRealization, IB
     }
 
     @Override
-    public boolean isCapable(SQLDigest digest) {
-        if (!digest.factTable.equalsIgnoreCase(this.getFactTable()))
-            return false;
-
-        return IICapabilityChecker.check(this, digest);
+    public CapabilityResult isCapable(SQLDigest digest) {
+        CapabilityResult result = new CapabilityResult();
+        
+        if (!digest.factTable.equalsIgnoreCase(this.getFactTable())) {
+            result.capable = false;
+        } else {
+            result.capable = IICapabilityChecker.check(this, digest);
+            if (result.capable)
+                result.cost = getCost(digest);
+        }
+        
+        return result;
     }
 
-    @Override
-    public int getCost(SQLDigest digest) {
+    private int getCost(SQLDigest digest) {
 
         int calculatedCost = cost;
         for (LookupDesc lookupDesc : this.getDescriptor().getModel().getLookups()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/Candidate.java b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
new file mode 100644
index 0000000..5e4dcf6
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/Candidate.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing;
+
+import java.util.Map;
+
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.storage.hybrid.HybridInstance;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A realization paired with the SQL digest it is evaluated against, plus the data
+ * used to rank it: a per-type priority and the result of the capability check.
+ * The natural ordering puts the preferred candidate first.
+ */
+public class Candidate implements Comparable<Candidate> {
+
+    /** Routing priority per realization type; a smaller number is preferred. */
+    static final Map<RealizationType, Integer> PRIORITIES = Maps.newHashMap();
+
+    static {
+        PRIORITIES.put(RealizationType.HYBRID, 0);
+        PRIORITIES.put(RealizationType.CUBE, 0);
+        PRIORITIES.put(RealizationType.INVERTED_INDEX, 1);
+    }
+
+    /** for test only */
+    public static void setPriorities(Map<RealizationType, Integer> priorities) {
+        PRIORITIES.clear();
+        PRIORITIES.putAll(priorities);
+    }
+
+    // ============================================================================
+
+    IRealization realization;
+    SQLDigest sqlDigest;
+    int priority;
+    CapabilityResult capability;
+
+    /**
+     * Creates a candidate and looks up its priority by realization type.
+     * The capability stays unset until {@link #setCapability(CapabilityResult)} is called.
+     *
+     * @throws NullPointerException if the realization's type has no entry in the priority table
+     */
+    public Candidate(IRealization realization, SQLDigest sqlDigest) {
+        this.realization = realization;
+        this.sqlDigest = sqlDigest;
+        this.priority = PRIORITIES.get(realization.getType());
+    }
+
+    public IRealization getRealization() {
+        return realization;
+    }
+
+    public SQLDigest getSqlDigest() {
+        return sqlDigest;
+    }
+
+    public int getPriority() {
+        return priority;
+    }
+
+    public CapabilityResult getCapability() {
+        return capability;
+    }
+
+    public void setCapability(CapabilityResult capability) {
+        this.capability = capability;
+    }
+
+    /**
+     * Orders by priority, then by capability cost (both ascending). On a full tie a
+     * hybrid sorts before a non-hybrid; two hybrids or two non-hybrids compare equal.
+     * Both candidates must have their capability set before they are compared.
+     */
+    @Override
+    public int compareTo(Candidate o) {
+        // Integer.compare rather than subtraction, so large or negative values cannot overflow
+        int comp = Integer.compare(this.priority, o.priority);
+        if (comp != 0) {
+            return comp;
+        }
+
+        comp = Integer.compare(this.capability.cost, o.capability.cost);
+        if (comp != 0) {
+            return comp;
+        }
+
+        // symmetric tie-break: returning -1 whenever "this" is a hybrid would make two hybrids each smaller than the other
+        boolean thisHybrid = this.realization instanceof HybridInstance;
+        boolean thatHybrid = o.realization instanceof HybridInstance;
+        if (thisHybrid == thatHybrid) {
+            return 0;
+        }
+        return thisHybrid ? -1 : 1;
+    }
+
+    /** Shows the underlying realization, so that joined candidate lists are readable in logs. */
+    @Override
+    public String toString() {
+        return String.valueOf(realization);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
index 7f2bb49..7493e08 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/QueryRouter.java
@@ -19,10 +19,17 @@
 package org.apache.kylin.query.routing;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.project.ProjectManager;
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.metadata.realization.CapabilityResult.CapabilityInfluence;
+import org.apache.kylin.metadata.realization.CapabilityResult.DimensionAsMeasure;
 import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.SQLDigest;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,20 +49,51 @@ public class QueryRouter {
         logger.info("The project manager's reference is " + prjMgr);
         String factTableName = olapContext.firstTableScan.getTableName();
         String projectName = olapContext.olapSchema.getProjectName();
-        List<IRealization> realizations = Lists.newArrayList(prjMgr.getRealizationsByTable(projectName, factTableName));
-        logger.info("Find candidates by table " + factTableName + " and project=" + projectName + " : " + StringUtils.join(realizations, ","));
+        Set<IRealization> realizations = prjMgr.getRealizationsByTable(projectName, factTableName);
+        SQLDigest sqlDigest = olapContext.getSQLDigest();
 
-        //rule based realization selection, rules might reorder realizations or remove specific realization
-        RoutingRule.applyRules(realizations, olapContext);
+        List<Candidate> candidates = Lists.newArrayListWithCapacity(realizations.size());
+        for (IRealization real : realizations) {
+            if (real.isReady())
+                candidates.add(new Candidate(real, sqlDigest));
+        }
+
+        logger.info("Find candidates by table " + factTableName + " and project=" + projectName + " : " + StringUtils.join(candidates, ","));
 
-        if (realizations.size() == 0) {
-            throw new NoRealizationFoundException("Can't find any realization. Please confirm with providers. SQL digest: " + olapContext.getSQLDigest().toString());
+        // rule based realization selection, rules might reorder realizations or remove specific realization
+        RoutingRule.applyRules(candidates);
+
+        if (candidates.size() == 0) {
+            throw new NoRealizationFoundException("Can't find any realization. Please confirm with providers. SQL digest: " + sqlDigest.toString());
         }
 
+        Candidate chosen = candidates.get(0);
+        adjustForDimensionAsMeasure(chosen, olapContext);
+
         logger.info("The realizations remaining: ");
-        logger.info(RoutingRule.getPrintableText(realizations));
-        logger.info("The realization being chosen: " + realizations.get(0).getName());
+        logger.info(RoutingRule.getPrintableText(candidates));
+        logger.info("The realization being chosen: " + chosen.realization.getName());
 
-        return realizations.get(0);
+        return chosen.realization;
     }
+
+    /** Turns every DimensionAsMeasure influence of the chosen candidate into a group-by on the measure's columns. */
+    private static void adjustForDimensionAsMeasure(Candidate chosen, OLAPContext olapContext) {
+        for (CapabilityInfluence influence : chosen.getCapability().influences) {
+            if (!(influence instanceof DimensionAsMeasure))
+                continue; // only dimension-as-measure influences are handled here
+            // convert the metric to dimension
+            FunctionDesc func = ((DimensionAsMeasure) influence).getMeasureFunction();
+            func.setDimensionAsMetric(true);
+            olapContext.rewriteFields.remove(func.getRewriteFieldName());
+            for (TblColRef col : func.getParameter().getColRefs()) {
+                if (col == null)
+                    continue;
+                olapContext.metricsColumns.remove(col);
+                olapContext.groupByColumns.add(col);
+            }
+            logger.info("Adjust DimensionAsMeasure for " + func);
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
index 1cd55d4..715f6d1 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRule.java
@@ -23,10 +23,8 @@ import java.util.List;
 
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRules.AdjustForWeaklyMatchedRealization;
-import org.apache.kylin.query.routing.RoutingRules.RealizationSortRule;
-import org.apache.kylin.query.routing.RoutingRules.RemoveUncapableRealizationsRule;
+import org.apache.kylin.query.routing.rules.RealizationSortRule;
+import org.apache.kylin.query.routing.rules.RemoveUncapableRealizationsRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,23 +42,23 @@ public abstract class RoutingRule {
     static {
         rules.add(new RemoveUncapableRealizationsRule());
         rules.add(new RealizationSortRule());
-        rules.add(new AdjustForWeaklyMatchedRealization());//this rule might modify olapcontext content, better put it at last
     }
 
-    public static void applyRules(List<IRealization> realizations, OLAPContext olapContext) {
+    public static void applyRules(List<Candidate> candidates) {
         for (RoutingRule rule : rules) {
-            logger.info("Realizations order before: " + getPrintableText(realizations));
+            logger.info("Realizations order before: " + getPrintableText(candidates));
             logger.info("Applying rule : " + rule);
-            rule.apply(realizations, olapContext);
-            logger.info("Realizations order after: " + getPrintableText(realizations));
+            rule.apply(candidates);
+            logger.info("Realizations order after: " + getPrintableText(candidates));
             logger.info("===================================================");
         }
     }
 
-    public static String getPrintableText(List<IRealization> realizations) {
+    public static String getPrintableText(List<Candidate> candidates) {
         StringBuffer sb = new StringBuffer();
         sb.append("[");
-        for (IRealization r : realizations) {
+        for (Candidate candidate : candidates) {
+            IRealization r = candidate.realization;
             sb.append(r.getName());
             sb.append(",");
         }
@@ -108,6 +106,6 @@ public abstract class RoutingRule {
         return this.getClass().toString();
     }
 
-    public abstract void apply(List<IRealization> realizations, OLAPContext olapContext);
+    public abstract void apply(List<Candidate> candidates);
 
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeaklyMatchedRealization.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeaklyMatchedRealization.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeaklyMatchedRealization.java
deleted file mode 100644
index 250f2a6..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeaklyMatchedRealization.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kylin.cube.CubeCapabilityChecker;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.invertedindex.IIInstance;
-import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- */
-public class AdjustForWeaklyMatchedRealization extends RoutingRule {
-    private static final Logger logger = LoggerFactory.getLogger(AdjustForWeaklyMatchedRealization.class);
-
-    @Override
-    public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-        if (realizations.size() > 0) {
-            IRealization first = realizations.get(0);
-
-            if (first instanceof HybridInstance) {
-                HybridInstance hybrid = (HybridInstance) first;
-                first = hybrid.getLatestRealization();
-            }
-
-            if (first instanceof CubeInstance) {
-                CubeInstance cube = (CubeInstance) first;
-                adjustOLAPContextIfNecessary(cube, olapContext);
-            }
-
-            if (first instanceof IIInstance) {
-                IIInstance ii = (IIInstance) first;
-                adjustOLAPContextIfNecessary(ii, olapContext);
-            }
-        }
-    }
-
-    private static void adjustOLAPContextIfNecessary(IIInstance ii, OLAPContext olapContext) {
-        IIDesc iiDesc = ii.getDescriptor();
-        Collection<FunctionDesc> iiFuncs = iiDesc.listAllFunctions();
-        convertAggregationToDimension(olapContext, iiFuncs);
-    }
-
-    private static void adjustOLAPContextIfNecessary(CubeInstance cube, OLAPContext olapContext) {
-        if (CubeCapabilityChecker.check(cube, olapContext.getSQLDigest(), false))
-            return;
-
-        CubeDesc cubeDesc = cube.getDescriptor();
-        Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions();
-        convertAggregationToDimension(olapContext, cubeFuncs);
-    }
-
-    private static void convertAggregationToDimension(OLAPContext olapContext, Collection<FunctionDesc> availableAggregations) {
-        Iterator<FunctionDesc> it = olapContext.aggregations.iterator();
-        while (it.hasNext()) {
-            FunctionDesc functionDesc = it.next();
-            if (!availableAggregations.contains(functionDesc)) {
-                // try to convert the metric to dimension to see if it works
-                TblColRef col = functionDesc.getParameter().getColRefs().get(0);
-                functionDesc.setDimensionAsMetric(true);
-                olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName());
-                if (col != null) {
-                    olapContext.metricsColumns.remove(col);
-                    olapContext.groupByColumns.add(col);
-                }
-                logger.info("Adjust OLAPContext for " + functionDesc);
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
deleted file mode 100644
index 6e196d9..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationPriorityRule.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-import com.google.common.collect.Maps;
-
-/**
- */
-public class RealizationPriorityRule extends RoutingRule {
-    static Map<RealizationType, Integer> priorities = Maps.newHashMap();
-
-    static {
-        priorities.put(RealizationType.HYBRID, 0);
-        priorities.put(RealizationType.CUBE, 0);
-        priorities.put(RealizationType.INVERTED_INDEX, 1);
-    }
-
-    public static void setPriorities(Map<RealizationType, Integer> priorities) {
-        RealizationPriorityRule.priorities = priorities;
-    }
-
-    public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-
-        Collections.sort(realizations, new Comparator<IRealization>() {
-            @Override
-            public int compare(IRealization o1, IRealization o2) {
-                int i1 = priorities.get(o1.getType());
-                int i2 = priorities.get(o2.getType());
-                return i1 - i2;
-            }
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
deleted file mode 100644
index d101288..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RealizationSortRule.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-import org.apache.kylin.storage.hybrid.HybridInstance;
-
-/**
- */
-public class RealizationSortRule extends RoutingRule {
-    @Override
-    public void apply(List<IRealization> realizations, final OLAPContext olapContext) {
-
-        // sort cube candidates, 0) the priority 1) the cost indicator, 2) the lesser header
-        // columns the better, 3) the lesser body columns the better 4) the larger date range the better
-
-        Collections.sort(realizations, new Comparator<IRealization>() {
-            @Override
-            public int compare(IRealization o1, IRealization o2) {
-                int i1 = RealizationPriorityRule.priorities.get(o1.getType());
-                int i2 = RealizationPriorityRule.priorities.get(o2.getType());
-                int comp = i1 - i2;
-                if (comp != 0) {
-                    return comp;
-                }
-
-                comp = o1.getCost(olapContext.getSQLDigest()) - o2.getCost(olapContext.getSQLDigest());
-                if (comp != 0) {
-                    return comp;
-                }
-
-                if (o1 instanceof HybridInstance)
-                    return -1;
-                else if (o2 instanceof HybridInstance)
-                    return 1;
-
-                return 0;
-            }
-        });
-
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
deleted file mode 100644
index c710ff1..0000000
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/RemoveUncapableRealizationsRule.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.routing.RoutingRules;
-
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.query.relnode.OLAPContext;
-import org.apache.kylin.query.routing.RoutingRule;
-
-/**
- */
-public class RemoveUncapableRealizationsRule extends RoutingRule {
-    @Override
-    public void apply(List<IRealization> realizations, OLAPContext olapContext) {
-        for (Iterator<IRealization> iterator = realizations.iterator(); iterator.hasNext();) {
-            IRealization realization = iterator.next();
-            if (!realization.isCapable(olapContext.getSQLDigest())) {
-                iterator.remove();
-            }
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
new file mode 100644
index 0000000..d3c67d7
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RealizationSortRule.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing.rules;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.kylin.query.routing.RoutingRule;
+
+/**
+ */
+public class RealizationSortRule extends RoutingRule {
+    @Override
+    public void apply(List<Candidate> candidates) {
+        Collections.sort(candidates);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
new file mode 100644
index 0000000..576b47f
--- /dev/null
+++ b/query/src/main/java/org/apache/kylin/query/routing/rules/RemoveUncapableRealizationsRule.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.routing.rules;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.kylin.metadata.realization.CapabilityResult;
+import org.apache.kylin.query.routing.Candidate;
+import org.apache.kylin.query.routing.RoutingRule;
+
+/**
+ */
+public class RemoveUncapableRealizationsRule extends RoutingRule {
+    @Override
+    public void apply(List<Candidate> candidates) {
+        for (Iterator<Candidate> iterator = candidates.iterator(); iterator.hasNext();) {
+            Candidate candidate = iterator.next();
+
+            CapabilityResult capability = candidate.getRealization().isCapable(candidate.getSqlDigest());
+            if (capability.capable)
+                candidate.setCapability(capability);
+            else
+                iterator.remove();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/6c85ead9/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
index 78b3d1c..344433a 100644
--- a/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/ITIIQueryTest.java
@@ -23,10 +23,9 @@ import java.util.Collection;
 import java.util.Map;
 
 import org.apache.kylin.metadata.realization.RealizationType;
-import org.apache.kylin.query.routing.RoutingRules.RealizationPriorityRule;
+import org.apache.kylin.query.routing.Candidate;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -45,7 +44,7 @@ public class ITIIQueryTest extends ITKylinQueryTest {
         priorities.put(RealizationType.INVERTED_INDEX, 0);
         priorities.put(RealizationType.CUBE, 1);
         priorities.put(RealizationType.HYBRID, 1);
-        RealizationPriorityRule.setPriorities(priorities);
+        Candidate.setPriorities(priorities);
 
     }
 
@@ -57,7 +56,7 @@ public class ITIIQueryTest extends ITKylinQueryTest {
         priorities.put(RealizationType.INVERTED_INDEX, 1);
         priorities.put(RealizationType.CUBE, 0);
         priorities.put(RealizationType.HYBRID, 0);
-        RealizationPriorityRule.setPriorities(priorities);
+        Candidate.setPriorities(priorities);
     }
 
     @Parameterized.Parameters


[2/2] kylin git commit: KYLIN-976 Extract Rewrite related methods

Posted by li...@apache.org.
KYLIN-976 Extract Rewrite related methods


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/63f376a5
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/63f376a5
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/63f376a5

Branch: refs/heads/KYLIN-976
Commit: 63f376a5aaea84fa66e02ef27b50f11c008790b8
Parents: 6c85ead
Author: Li, Yang <ya...@ebay.com>
Authored: Tue Dec 1 11:39:31 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Tue Dec 1 11:39:31 2015 +0800

----------------------------------------------------------------------
 .../kylin/cube/CubeCapabilityChecker.java       |   2 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |   2 +-
 .../apache/kylin/measure/MeasureIngester.java   |   6 +-
 .../org/apache/kylin/measure/MeasureType.java   |  10 +-
 .../kylin/measure/basic/BasicMeasureType.java   |  11 ++
 .../kylin/measure/basic/BigDecimalIngester.java |   5 -
 .../kylin/measure/basic/DoubleIngester.java     |   5 -
 .../kylin/measure/basic/LongIngester.java       |   5 -
 .../kylin/measure/hllc/HLLCMeasureType.java     |  16 +-
 .../measure/hllc/HLLDistinctCountAggFunc.java   | 153 +++++++++++++++++++
 .../kylin/measure/topn/TopNMeasureType.java     |  10 ++
 .../kylin/metadata/model/FunctionDesc.java      |  39 +++--
 .../mr/steps/MergeCuboidFromStorageMapper.java  |   2 +-
 .../engine/mr/steps/MergeCuboidMapper.java      |   2 +-
 .../kylin/query/relnode/OLAPAggregateRel.java   |  13 +-
 .../apache/kylin/query/schema/OLAPTable.java    |  10 +-
 .../query/sqlfunc/HLLDistinctCountAggFunc.java  | 152 ------------------
 17 files changed, 237 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 0c1b3c9..624bb0b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -191,7 +191,7 @@ public class CubeCapabilityChecker {
             if (unmatchedDimensions.isEmpty() && unmatchedAggregations.isEmpty())
                 break;
             
-            MeasureType measureType = MeasureType.create(measure.getFunction());
+            MeasureType measureType = measure.getFunction().getMeasureType();
             if (measureType instanceof BasicMeasureType)
                 continue;
             

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 3e8ee13..050aef2 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -819,7 +819,7 @@ public class CubeDesc extends RootPersistentEntity {
         }
 
         for (MeasureDesc measure : measures) {
-            MeasureType aggrType = MeasureType.create(measure.getFunction());
+            MeasureType aggrType = measure.getFunction().getMeasureType();
             result.addAll(aggrType.getColumnsNeedDictionary(measure));
         }
         return result;

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
index 9c7b406..bc387fe 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureIngester.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
 abstract public class MeasureIngester<V> {
     
     public static MeasureIngester<?> create(MeasureDesc measure) {
-        return MeasureType.create(measure.getFunction()).newIngester();
+        return measure.getFunction().getMeasureType().newIngester();
     }
     
     public static MeasureIngester<?>[] create(Collection<MeasureDesc> measures) {
@@ -42,5 +42,7 @@ abstract public class MeasureIngester<V> {
 
     abstract public V valueOf(String[] values, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> dictionaryMap);
     
-    abstract public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts);
+    public V reEncodeDictionary(V value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDicts, Map<TblColRef, Dictionary<String>> newDicts) {
+        throw new UnsupportedOperationException();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 8e7de6f..0891d1e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -45,10 +45,6 @@ abstract public class MeasureType {
         factoryRegistry.put(FunctionDesc.FUNC_TOP_N, new TopNMeasureFactory());
     }
     
-    public static MeasureType create(FunctionDesc function) {
-        return create(function.getExpression(), function.getReturnType());
-    }
-
     public static MeasureType create(String funcName, String dataType) {
         funcName = funcName.toUpperCase();
         dataType = dataType.toLowerCase();
@@ -102,6 +98,12 @@ abstract public class MeasureType {
      * Query
      * ---------------------------------------------------------------------------- */
     
+    // TODO support user defined calcite aggr function
+    
+    abstract public boolean needRewrite();
+    
+    abstract public Class<?> getRewriteAggregationFunctionClass();
+    
     /* ============================================================================
      * Storage
      * ---------------------------------------------------------------------------- */

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index f314870..a6b36bb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -115,5 +115,16 @@ public class BasicMeasureType extends MeasureType {
     private boolean isMin() {
         return FunctionDesc.FUNC_MIN.equalsIgnoreCase(funcName);
     }
+
+    @Override
+    public boolean needRewrite() {
+        return !isSum();
+    }
+
+    @Override
+    public Class<?> getRewriteAggregationFunctionClass() {
+        // TODO Auto-generated method stub
+        return null;
+    }
     
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
index ea1495c..721ba00 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BigDecimalIngester.java
@@ -38,9 +38,4 @@ public class BigDecimalIngester extends MeasureIngester<BigDecimal> {
         else
             return new BigDecimal(values[0]);
     }
-
-    @Override
-    public BigDecimal reEncodeDictionary(BigDecimal value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
-        throw new UnsupportedOperationException();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
index aaa754a..70ca727 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/DoubleIngester.java
@@ -43,9 +43,4 @@ public class DoubleIngester extends MeasureIngester<DoubleMutable> {
             l.set(Double.parseDouble(values[0]));
         return l;
     }
-
-    @Override
-    public DoubleMutable reEncodeDictionary(DoubleMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
-        throw new UnsupportedOperationException();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
index bdc1704..2547162 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/LongIngester.java
@@ -43,9 +43,4 @@ public class LongIngester extends MeasureIngester<LongMutable> {
             l.set(Long.parseLong(values[0]));
         return l;
     }
-
-    @Override
-    public LongMutable reEncodeDictionary(LongMutable value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
-        throw new UnsupportedOperationException();
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index 4a73478..fd71c00 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -73,11 +73,6 @@ public class HLLCMeasureType extends MeasureType {
                     hllc.add(v == null ? "__nUlL__" : v);
                 return hllc;
             }
-
-            @Override
-            public HyperLogLogPlusCounter reEncodeDictionary(HyperLogLogPlusCounter value, MeasureDesc measureDesc, Map<TblColRef, Dictionary<String>> oldDict, Map<TblColRef, Dictionary<String>> newDict) {
-                throw new UnsupportedOperationException();
-            }
         };
     }
 
@@ -89,4 +84,15 @@ public class HLLCMeasureType extends MeasureType {
             return new LDCAggregator();
     }
 
+    @Override
+    public boolean needRewrite() {
+        return true;
+    }
+
+    @Override
+    public Class<?> getRewriteAggregationFunctionClass() {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
new file mode 100644
index 0000000..b00da5f
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLDistinctCountAggFunc.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.hllc;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * @author xjiang
+ */
+public class HLLDistinctCountAggFunc {
+
+    private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
+
+    public static HyperLogLogPlusCounter init() {
+        return null;
+    }
+
+    public static HyperLogLogPlusCounter initAdd(Object v) {
+        if (v instanceof Long) { // holistic case
+            long l = (Long) v;
+            return new FixedValueHLLCMockup(l);
+        } else {
+            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
+            return new HyperLogLogPlusCounter(c);
+        }
+    }
+
+    public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) {
+        if (v instanceof Long) { // holistic case
+            long l = (Long) v;
+            if (counter == null) {
+                return new FixedValueHLLCMockup(l);
+            } else {
+                if (!(counter instanceof FixedValueHLLCMockup))
+                    throw new IllegalStateException("counter is not FixedValueHLLCMockup");
+
+                ((FixedValueHLLCMockup) counter).set(l);
+                return counter;
+            }
+        } else {
+            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
+            if (counter == null) {
+                return new HyperLogLogPlusCounter(c);
+            } else {
+                counter.merge(c);
+                return counter;
+            }
+        }
+    }
+
+    public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) {
+        return add(counter0, counter1);
+    }
+
+    public static long result(HyperLogLogPlusCounter counter) {
+        return counter == null ? 0L : counter.getCountEstimate();
+    }
+
+    @SuppressWarnings("serial")
+    private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
+
+        private Long value = null;
+
+        FixedValueHLLCMockup(long value) {
+            this.value = value;
+        }
+
+        public void set(long value) {
+            if (this.value == null) {
+                this.value = value;
+            } else {
+                long oldValue = Math.abs(this.value.longValue());
+                long take = Math.max(oldValue, value);
+                logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take);
+                this.value = -take; // make it obvious that this value is wrong
+            }
+        }
+
+        @Override
+        public void clear() {
+            this.value = null;
+        }
+
+        @Override
+        protected void add(long hash) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void merge(HyperLogLogPlusCounter another) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public long getCountEstimate() {
+            return value;
+        }
+
+        @Override
+        public void writeRegisters(ByteBuffer out) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public void readRegisters(ByteBuffer in) throws IOException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public int hashCode() {
+            final int prime = 31;
+            int result = super.hashCode();
+            result = prime * result + (int) (value ^ (value >>> 32));
+            return result;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (!super.equals(obj))
+                return false;
+            if (getClass() != obj.getClass())
+                return false;
+            FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
+            if (!value.equals(other.value))
+                return false;
+            return true;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index d6c5a6f..77319be 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -167,4 +167,14 @@ public class TopNMeasureType extends MeasureType {
             return null;
     }
 
+    @Override
+    public boolean needRewrite() {
+        return false;
+    }
+
+    @Override
+    public Class<?> getRewriteAggregationFunctionClass() {
+        return null;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 0c36873..17debda 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -21,6 +21,7 @@ package org.apache.kylin.metadata.model;
 import java.util.ArrayList;
 import java.util.Collection;
 
+import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -51,6 +52,7 @@ public class FunctionDesc {
     private String returnType;
 
     private DataType returnDataType;
+    private MeasureType measureType;
     private boolean isDimensionAsMetric = false;
 
     public void init(TableDesc factTable) {
@@ -77,6 +79,23 @@ public class FunctionDesc {
             setReturnType(colRefs.get(0).getDatatype());
         }
     }
+    
+    public MeasureType getMeasureType() {
+        if (isDimensionAsMetric)
+            return null;
+        
+        if (measureType == null) {
+            measureType = MeasureType.create(getExpression(), getReturnType());
+        }
+        return measureType;
+    }
+
+    public boolean needRewrite() {
+        if (isDimensionAsMetric)
+            return false;
+        
+        return getMeasureType().needRewrite();
+    }
 
     public String getRewriteFieldName() {
         if (isSum()) {
@@ -88,14 +107,19 @@ public class FunctionDesc {
         }
     }
 
-    public boolean needRewrite() {
-        return !isSum() && !isDimensionAsMetric() && !isTopN();
+    public DataType getRewriteFieldType() {
+        if (isCountDistinct() || isTopN())
+            return DataType.ANY;
+        else if (isSum() || isMax() || isMin())
+            return parameter.getColRefs().get(0).getType();
+        else
+            return returnDataType;
     }
 
     public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) {
         ColumnDesc fakeCol = new ColumnDesc();
         fakeCol.setName(getRewriteFieldName());
-        fakeCol.setDatatype(getSQLType().toString());
+        fakeCol.setDatatype(getRewriteFieldType().toString());
         if (isCount())
             fakeCol.setNullable(false);
         fakeCol.init(sourceTable);
@@ -179,15 +203,6 @@ public class FunctionDesc {
         return count;
     }
     
-    public DataType getSQLType() {
-        if (isCountDistinct() || isTopN())
-            return DataType.ANY;
-        else if (isSum() || isMax() || isMin())
-            return parameter.getColRefs().get(0).getType();
-        else
-            return returnDataType;
-    }
-
     public String getReturnType() {
         return returnType;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index b4682dd..bb0c073 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -122,7 +122,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         dictMeasures = Lists.newArrayList();
         for (int i = 0; i < measureDescs.size(); i++) {
             MeasureDesc measureDesc = measureDescs.get(i);
-            MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+            MeasureType measureType = measureDesc.getFunction().getMeasureType();
             if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
                 dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 4fc7236..401237b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -124,7 +124,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         dictMeasures = Lists.newArrayList();
         for (int i = 0; i < measureDescs.size(); i++) {
             MeasureDesc measureDesc = measureDescs.get(i);
-            MeasureType measureType = MeasureType.create(measureDesc.getFunction());
+            MeasureType measureType = measureDesc.getFunction().getMeasureType();
             if (measureType.getColumnsNeedDictionary(measureDesc).isEmpty() == false) {
                 dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index cbc0c56..9aa70ca 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -59,7 +59,6 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.query.sqlfunc.HLLDistinctCountAggFunc;
 
 import com.google.common.base.Preconditions;
 
@@ -301,10 +300,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         // rebuild function
         RelDataType fieldType = aggCall.getType();
         SqlAggFunction newAgg = aggCall.getAggregation();
-        if (func.isCountDistinct()) {
-            newAgg = createHyperLogLogAggFunction(fieldType);
-        } else if (func.isCount()) {
+        if (func.isCount()) {
             newAgg = SqlStdOperatorTable.SUM0;
+        } else if (func.getMeasureType().getRewriteAggregationFunctionClass() != null) {
+            newAgg = createCustomAggFunction(fieldType, func.getMeasureType().getRewriteAggregationFunctionClass());
         }
 
         // rebuild aggregate call
@@ -312,10 +311,10 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         return newAggCall;
     }
 
-    private SqlAggFunction createHyperLogLogAggFunction(RelDataType returnType) {
+    private SqlAggFunction createCustomAggFunction(RelDataType returnType, Class<?> customAggFuncClz) {
         RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
-        SqlIdentifier sqlIdentifier = new SqlIdentifier("HLL_COUNT", new SqlParserPos(1, 1));
-        AggregateFunction aggFunction = AggregateFunctionImpl.create(HLLDistinctCountAggFunc.class);
+        SqlIdentifier sqlIdentifier = new SqlIdentifier(customAggFuncClz.getSimpleName(), new SqlParserPos(1, 1));
+        AggregateFunction aggFunction = AggregateFunctionImpl.create(customAggFuncClz);
         List<RelDataType> argTypes = new ArrayList<RelDataType>();
         List<SqlTypeFamily> typeFamilies = new ArrayList<SqlTypeFamily>();
         for (FunctionParameter o : aggFunction.getParameters()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
index 94a91bf..a8789ea 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPTable.java
@@ -184,11 +184,11 @@ public class OLAPTable extends AbstractQueryableTable implements TranslatableTab
         HashSet<ColumnDesc> updateColumns = Sets.newHashSet();
         for (MeasureDesc m : mgr.listEffectiveMeasures(olapSchema.getProjectName(), sourceTable.getIdentity())) {
             if (m.getFunction().isSum()) {
-                FunctionDesc functionDesc = m.getFunction();
-                if (functionDesc.getReturnDataType() != functionDesc.getSQLType() && //
-                        functionDesc.getReturnDataType().isBigInt() && //
-                        functionDesc.getSQLType().isIntegerFamily()) {
-                    updateColumns.add(functionDesc.getParameter().getColRefs().get(0).getColumnDesc());
+                FunctionDesc func = m.getFunction();
+                if (func.getReturnDataType() != func.getRewriteFieldType() && //
+                        func.getReturnDataType().isBigInt() && //
+                        func.getRewriteFieldType().isIntegerFamily()) {
+                    updateColumns.add(func.getParameter().getColRefs().get(0).getColumnDesc());
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/63f376a5/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
deleted file mode 100644
index 7881c42..0000000
--- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.query.sqlfunc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- */
-public class HLLDistinctCountAggFunc {
-
-    private static final Logger logger = LoggerFactory.getLogger(HLLDistinctCountAggFunc.class);
-
-    public static HyperLogLogPlusCounter init() {
-        return null;
-    }
-
-    public static HyperLogLogPlusCounter initAdd(Object v) {
-        if (v instanceof Long) { // holistic case
-            long l = (Long) v;
-            return new FixedValueHLLCMockup(l);
-        } else {
-            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
-            return new HyperLogLogPlusCounter(c);
-        }
-    }
-
-    public static HyperLogLogPlusCounter add(HyperLogLogPlusCounter counter, Object v) {
-        if (v instanceof Long) { // holistic case
-            long l = (Long) v;
-            if (counter == null) {
-                return new FixedValueHLLCMockup(l);
-            } else {
-                if (!(counter instanceof FixedValueHLLCMockup))
-                    throw new IllegalStateException("counter is not FixedValueHLLCMockup");
-
-                ((FixedValueHLLCMockup) counter).set(l);
-                return counter;
-            }
-        } else {
-            HyperLogLogPlusCounter c = (HyperLogLogPlusCounter) v;
-            if (counter == null) {
-                return new HyperLogLogPlusCounter(c);
-            } else {
-                counter.merge(c);
-                return counter;
-            }
-        }
-    }
-
-    public static HyperLogLogPlusCounter merge(HyperLogLogPlusCounter counter0, Object counter1) {
-        return add(counter0, counter1);
-    }
-
-    public static long result(HyperLogLogPlusCounter counter) {
-        return counter == null ? 0L : counter.getCountEstimate();
-    }
-
-    private static class FixedValueHLLCMockup extends HyperLogLogPlusCounter {
-
-        private Long value = null;
-
-        FixedValueHLLCMockup(long value) {
-            this.value = value;
-        }
-
-        public void set(long value) {
-            if (this.value == null) {
-                this.value = value;
-            } else {
-                long oldValue = Math.abs(this.value.longValue());
-                long take = Math.max(oldValue, value);
-                logger.warn("Error to aggregate holistic count distinct, old value " + oldValue + ", new value " + value + ", taking " + take);
-                this.value = -take; // make it obvious that this value is wrong
-            }
-        }
-
-        @Override
-        public void clear() {
-            this.value = null;
-        }
-
-        @Override
-        protected void add(long hash) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void merge(HyperLogLogPlusCounter another) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public long getCountEstimate() {
-            return value;
-        }
-
-        @Override
-        public void writeRegisters(ByteBuffer out) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public void readRegisters(ByteBuffer in) throws IOException {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
-        public int hashCode() {
-            final int prime = 31;
-            int result = super.hashCode();
-            result = prime * result + (int) (value ^ (value >>> 32));
-            return result;
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            if (this == obj)
-                return true;
-            if (!super.equals(obj))
-                return false;
-            if (getClass() != obj.getClass())
-                return false;
-            FixedValueHLLCMockup other = (FixedValueHLLCMockup) obj;
-            if (!value.equals(other.value))
-                return false;
-            return true;
-        }
-    }
-
-}