You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/10/16 15:48:20 UTC

[2/2] kylin git commit: KYLIN-2088 Refactor MeasureType to allow multiple UDAF defined on a measure type

KYLIN-2088 Refactor MeasureType to allow multiple UDAF defined on a measure type


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0018a212
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0018a212
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0018a212

Branch: refs/heads/master
Commit: 0018a212470bfe1e8edd3603d383da80f2ffd322
Parents: b4c970a
Author: Yang Li <li...@apache.org>
Authored: Sun Oct 16 23:44:49 2016 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Oct 16 23:44:49 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  1 -
 .../org/apache/kylin/measure/MeasureType.java   | 16 +++--
 .../kylin/measure/MeasureTypeFactory.java       | 46 +++++++++++---
 .../kylin/measure/basic/BasicMeasureType.java   |  6 --
 .../kylin/measure/bitmap/BitmapMeasureType.java | 43 ++++++++-----
 .../dim/DimCountDistinctMeasureType.java        | 10 ++-
 .../ExtendedColumnMeasureType.java              |  6 +-
 .../kylin/measure/hllc/HLLCMeasureType.java     | 10 ++-
 .../kylin/measure/raw/RawMeasureType.java       |  6 +-
 .../kylin/measure/topn/TopNMeasureType.java     |  5 --
 .../kylin/metadata/realization/SQLDigest.java   | 43 ++++++++-----
 .../kylin/engine/mr/steps/CubeReducerTest.java  |  5 +-
 .../kylin/storage/hbase/ITStorageTest.java      |  3 +-
 .../kylin/query/relnode/ColumnRowType.java      |  7 +++
 .../kylin/query/relnode/OLAPAggregateRel.java   | 66 +++++++++++---------
 .../apache/kylin/query/relnode/OLAPContext.java | 15 +++--
 .../kylin/query/schema/OLAPSchemaFactory.java   | 13 +++-
 .../cube/MeasureTypeOnlyAggrInBaseTest.java     |  5 --
 18 files changed, 187 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7dacd06..4942081 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -766,7 +766,6 @@ abstract public class KylinConfigBase implements Serializable {
 
     public Map<String, String> getUDFs() {
         Map<String, String> udfMap = getPropertiesByPrefix("kylin.query.udf.");
-        udfMap.put("intersect_count", "org.apache.kylin.measure.bitmap.BitmapIntersectDistinctCountAggFunc");
         return udfMap;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index e7312f2..de1b442 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -102,22 +102,20 @@ abstract public class MeasureType<T> {
      * Query Rewrite
      * ---------------------------------------------------------------------------- */
 
-    // TODO support user defined Calcite aggr function
-
     /** Whether or not Calcite rel-tree needs rewrite to do last around of aggregation */
     abstract public boolean needRewrite();
 
-    /** Does the rewrite involves an extra field for the pre-calculated */
+    /** Does the rewrite involves an extra field to hold the pre-calculated */
     public boolean needRewriteField() {
         return true;
     }
 
-    /** Returns a Calcite aggregation function implementation class */
-    abstract public Class<?> getRewriteCalciteAggrFunctionClass();
-
-    /** Some measure may return different class depends on call name, eg. BitmapMeasureType */
-    public Class<?> getRewriteCalciteAggrFunctionClass(String callName) {
-        return getRewriteCalciteAggrFunctionClass();
+    /**
+     * Returns a map from UDAF to Calcite aggregation function implementation class.
+     * There can be zero or more UDAF defined on a measure type.
+     */
+    public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+        return null;
     }
 
     /* ============================================================================

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
index 17d841a..c5bd482 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -32,6 +32,7 @@ import org.apache.kylin.measure.raw.RawMeasureType;
 import org.apache.kylin.measure.topn.TopNMeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.DataTypeSerializer;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,8 +87,10 @@ abstract public class MeasureTypeFactory<T> {
 
     // ============================================================================
 
-    private static Map<String, List<MeasureTypeFactory<?>>> factories = Maps.newHashMap();
-    private static List<MeasureTypeFactory<?>> defaultFactory = Lists.newArrayListWithCapacity(2);
+    final private static Map<String, List<MeasureTypeFactory<?>>> factories = Maps.newHashMap();
+    final private static Map<String, Class<?>> udafMap = Maps.newHashMap(); // a map from UDAF to Calcite aggregation function implementation class
+    final private static Map<String, MeasureTypeFactory> udafFactories = Maps.newHashMap(); // a map from UDAF to its owner factory
+    final private static List<MeasureTypeFactory<?>> defaultFactory = Lists.newArrayListWithCapacity(2);
 
     static {
         init();
@@ -110,7 +113,8 @@ abstract public class MeasureTypeFactory<T> {
         logger.info("Checking custom measure types from kylin config");
 
         try {
-            for (String customFactory : KylinConfig.getInstanceFromEnv().getCubeCustomMeasureTypes().values()) {
+            Map<String, String> customMeasureTypes = KylinConfig.getInstanceFromEnv().getCubeCustomMeasureTypes();
+            for (String customFactory : customMeasureTypes.values()) {
                 try {
                     logger.info("Checking custom measure types from kylin config: " + customFactory);
                     factoryInsts.add((MeasureTypeFactory<?>) Class.forName(customFactory).newInstance());
@@ -132,9 +136,10 @@ abstract public class MeasureTypeFactory<T> {
                 throw new IllegalArgumentException("Aggregation data type name '" + dataTypeName + "' must be in lower case");
             Class<? extends DataTypeSerializer<?>> serializer = factory.getAggrDataTypeSerializer();
 
-            logger.info("registering " + dataTypeName);
+            logger.info("registering " + funcName + "(" + dataTypeName + "), " + factory.getClass());
             DataType.register(dataTypeName);
             DataTypeSerializer.register(dataTypeName, serializer);
+            registerUDAF(factory);
             List<MeasureTypeFactory<?>> list = factories.get(funcName);
             if (list == null)
                 factories.put(funcName, list = Lists.newArrayListWithCapacity(2));
@@ -144,13 +149,40 @@ abstract public class MeasureTypeFactory<T> {
         defaultFactory.add(new BasicMeasureType.Factory());
     }
 
+    private static void registerUDAF(MeasureTypeFactory<?> factory) {
+        MeasureType<?> type = factory.createMeasureType(factory.getAggrFunctionName(), DataType.getType(factory.getAggrDataTypeName()));
+        Map<String, Class<?>> udafs = type.getRewriteCalciteAggrFunctions();
+        if (type.needRewrite() == false || udafs == null)
+            return;
+
+        for (String udaf : udafs.keySet()) {
+            udaf = udaf.toUpperCase();
+            if (udaf.equals(FunctionDesc.FUNC_COUNT_DISTINCT))
+                continue; // skip built-in function
+
+            if (udafFactories.containsKey(udaf))
+                throw new IllegalStateException("UDAF '" + udaf + "' was dup declared by " + udafFactories.get(udaf) + " and " + factory);
+
+            udafFactories.put(udaf, factory);
+            udafMap.put(udaf, udafs.get(udaf));
+        }
+    }
+
+    public static Map<String, Class<?>> getUDAFs() {
+        return udafMap;
+    }
+
+    public static Map<String, MeasureTypeFactory> getUDAFFactories() {
+        return udafFactories;
+    }
+
     public static MeasureType<?> create(String funcName, String dataType) {
         return create(funcName, DataType.getType(dataType));
     }
 
     public static MeasureType<?> createNoRewriteFieldsMeasureType(String funcName, DataType dataType) {
         // currently only has DimCountDistinctAgg
-        if (funcName.equalsIgnoreCase("COUNT_DISTINCT")) {
+        if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_COUNT_DISTINCT)) {
             return new DimCountDistinctMeasureType.DimCountDistinctMeasureTypeFactory().createMeasureType(funcName, dataType);
         }
 
@@ -213,8 +245,8 @@ abstract public class MeasureTypeFactory<T> {
         }
 
         @Override
-        public Class getRewriteCalciteAggrFunctionClass() {
-            throw new UnsupportedOperationException();
+        public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+            return null;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
index 4ab4584..ed493a1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/basic/BasicMeasureType.java
@@ -143,10 +143,4 @@ public class BasicMeasureType extends MeasureType {
     public boolean needRewrite() {
         return !isSum();
     }
-
-    @Override
-    public Class<?> getRewriteCalciteAggrFunctionClass() {
-        return null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index 2b88e21..8e2b2f7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -33,12 +33,16 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigest.SQLCall;
+
+import com.google.common.collect.ImmutableMap;
 
 /**
  * Created by sunyerui on 15/12/10.
  */
 public class BitmapMeasureType extends MeasureType<BitmapCounter> {
-    public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+    public static final String FUNC_COUNT_DISTINCT = FunctionDesc.FUNC_COUNT_DISTINCT;
     public static final String FUNC_INTERSECT_COUNT_DISTINCT = "INTERSECT_COUNT";
     public static final String DATATYPE_BITMAP = "bitmap";
 
@@ -151,30 +155,37 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
         }
     }
 
-    @Override
-    public boolean needRewrite() {
+    // In order to keep compatibility with old version, tinyint/smallint/int column use value directly, without dictionary
+    private boolean needDictionaryColumn(FunctionDesc functionDesc) {
+        DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType();
+        if (dataType.isIntegerFamily() && !dataType.isBigInt()) {
+            return false;
+        }
         return true;
     }
 
     @Override
-    public Class<?> getRewriteCalciteAggrFunctionClass() {
-        return BitmapDistinctCountAggFunc.class;
+    public boolean needRewrite() {
+        return true;
     }
 
+    static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(//
+            FUNC_COUNT_DISTINCT, BitmapDistinctCountAggFunc.class, //
+            FUNC_INTERSECT_COUNT_DISTINCT, BitmapIntersectDistinctCountAggFunc.class);
+
     @Override
-    public Class<?> getRewriteCalciteAggrFunctionClass(String callName) {
-        if (callName != null && callName.equalsIgnoreCase(FUNC_INTERSECT_COUNT_DISTINCT)) {
-            return BitmapIntersectDistinctCountAggFunc.class;
-        }
-        return BitmapDistinctCountAggFunc.class;
+    public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+        return UDAF_MAP;
     }
 
-    // In order to keep compatibility with old version, tinyint/smallint/int column use value directly, without dictionary
-    private boolean needDictionaryColumn(FunctionDesc functionDesc) {
-        DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType();
-        if (dataType.isIntegerFamily() && !dataType.isBigInt()) {
-            return false;
+    @Override
+    public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) {
+        for (SQLCall call : sqlDigest.aggrSqlCalls) {
+            if (FUNC_INTERSECT_COUNT_DISTINCT.equals(call.function)) {
+                TblColRef col = (TblColRef) call.args.get(1);
+                if (!sqlDigest.groupbyColumns.contains(col))
+                    sqlDigest.groupbyColumns.add(col);
+            }
         }
-        return true;
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
index 9fe1075..0b3fd94 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
@@ -19,15 +19,19 @@
 package org.apache.kylin.measure.dim;
 
 import java.util.List;
+import java.util.Map;
 
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.realization.SQLDigest;
 
+import com.google.common.collect.ImmutableMap;
+
 /**
  * Created by dongli on 4/20/16.
  */
@@ -76,9 +80,11 @@ public class DimCountDistinctMeasureType extends MeasureType<Object> {
         return false;
     }
 
+    static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(FunctionDesc.FUNC_COUNT_DISTINCT, DimCountDistinctAggFunc.class);
+
     @Override
-    public Class<?> getRewriteCalciteAggrFunctionClass() {
-        return DimCountDistinctAggFunc.class;
+    public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+        return UDAF_MAP;
     }
 
     public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
index 796f1f7..c8f01ad 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/extendedcolumn/ExtendedColumnMeasureType.java
@@ -227,6 +227,7 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> {
         };
     }
 
+    @SuppressWarnings("serial")
     @Override
     public MeasureAggregator<ByteArray> newAggregator() {
         return new MeasureAggregator<ByteArray>() {
@@ -268,9 +269,4 @@ public class ExtendedColumnMeasureType extends MeasureType<ByteArray> {
     public boolean needRewrite() {
         return false;
     }
-
-    @Override
-    public Class<?> getRewriteCalciteAggrFunctionClass() {
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
index bd5013e..0e58dca 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/hllc/HLLCMeasureType.java
@@ -31,9 +31,11 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import com.google.common.collect.ImmutableMap;
+
 public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> {
 
-    public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
+    public static final String FUNC_COUNT_DISTINCT = FunctionDesc.FUNC_COUNT_DISTINCT;
     public static final String DATATYPE_HLLC = "hllc";
 
     public static class Factory extends MeasureTypeFactory<HyperLogLogPlusCounter> {
@@ -116,9 +118,11 @@ public class HLLCMeasureType extends MeasureType<HyperLogLogPlusCounter> {
         return true;
     }
 
+    static final Map<String, Class<?>> UDAF_MAP = ImmutableMap.<String, Class<?>> of(FUNC_COUNT_DISTINCT, HLLDistinctCountAggFunc.class);
+    
     @Override
-    public Class<?> getRewriteCalciteAggrFunctionClass() {
-        return HLLDistinctCountAggFunc.class;
+    public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+        return UDAF_MAP;
     }
 
     public static boolean isCountDistinct(FunctionDesc func) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
index 50715ec..3a49d31 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/raw/RawMeasureType.java
@@ -75,6 +75,7 @@ public class RawMeasureType extends MeasureType<List<ByteArray>> {
         }
     }
 
+    @SuppressWarnings("unused")
     private final DataType dataType;
 
     public RawMeasureType(String funcName, DataType dataType) {
@@ -191,11 +192,6 @@ public class RawMeasureType extends MeasureType<List<ByteArray>> {
     }
 
     @Override
-    public Class<?> getRewriteCalciteAggrFunctionClass() {
-        return null;
-    }
-
-    @Override
     public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) {
 
         if (sqlDigest.isRawQuery) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
index 800ca88..33ab314 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNMeasureType.java
@@ -288,11 +288,6 @@ public class TopNMeasureType extends MeasureType<TopNCounter<ByteArray>> {
     }
 
     @Override
-    public Class<?> getRewriteCalciteAggrFunctionClass() {
-        return null;
-    }
-
-    @Override
     public void adjustSqlDigest(List<MeasureDesc> measureDescs, SQLDigest sqlDigest) {
         for (MeasureDesc measureDesc : measureDescs) {
             FunctionDesc topnFunc = measureDesc.getFunction();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
index d2bba66..4887abb 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/realization/SQLDigest.java
@@ -18,7 +18,8 @@
 
 package org.apache.kylin.metadata.realization;
 
-import java.util.Collection;
+import java.util.List;
+import java.util.Set;
 
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -26,6 +27,8 @@ import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
+import com.google.common.collect.ImmutableList;
+
 /**
  */
 public class SQLDigest {
@@ -34,29 +37,41 @@ public class SQLDigest {
         ASCENDING, DESCENDING
     }
 
+    public static class SQLCall {
+        public final String function;
+        public final List<Object> args;
+
+        public SQLCall(String function, Iterable<Object> args) {
+            this.function = function;
+            this.args = ImmutableList.copyOf(args);
+        }
+    }
+
     public String factTable;
     public TupleFilter filter;
-    public Collection<JoinDesc> joinDescs;
-    public Collection<TblColRef> allColumns;
-    public Collection<TblColRef> groupbyColumns;
-    public Collection<TblColRef> filterColumns;
-    public Collection<TblColRef> metricColumns;
-    public Collection<FunctionDesc> aggregations;
-    public Collection<MeasureDesc> sortMeasures;
-    public Collection<OrderEnum> sortOrders;
+    public List<JoinDesc> joinDescs;
+    public Set<TblColRef> allColumns;
+    public List<TblColRef> groupbyColumns;
+    public Set<TblColRef> filterColumns;
+    public Set<TblColRef> metricColumns;
+    public List<FunctionDesc> aggregations; // storage level measure type, on top of which various sql aggr function may apply
+    public List<SQLCall> aggrSqlCalls; // sql level aggregation function call
+    public List<MeasureDesc> sortMeasures;
+    public List<OrderEnum> sortOrders;
     public boolean isRawQuery;
 
-    //initialized when org.apache.kylin.query.routing.QueryRouter.selectRealization()
-    public SQLDigest(String factTable, TupleFilter filter, Collection<JoinDesc> joinDescs, Collection<TblColRef> allColumns, //
-            Collection<TblColRef> groupbyColumns, Collection<TblColRef> filterColumns, Collection<TblColRef> aggregatedColumns, Collection<FunctionDesc> aggregateFunnc, Collection<MeasureDesc> sortMeasures, Collection<OrderEnum> sortOrders) {
+    public SQLDigest(String factTable, TupleFilter filter, List<JoinDesc> joinDescs, Set<TblColRef> allColumns, //
+            List<TblColRef> groupbyColumns, Set<TblColRef> filterColumns, Set<TblColRef> metricColumns, //
+            List<FunctionDesc> aggregations, List<SQLCall> aggrSqlCalls, List<MeasureDesc> sortMeasures, List<OrderEnum> sortOrders) {
         this.factTable = factTable;
         this.filter = filter;
         this.joinDescs = joinDescs;
         this.allColumns = allColumns;
         this.groupbyColumns = groupbyColumns;
         this.filterColumns = filterColumns;
-        this.metricColumns = aggregatedColumns;
-        this.aggregations = aggregateFunnc;
+        this.metricColumns = metricColumns;
+        this.aggregations = aggregations;
+        this.aggrSqlCalls = aggrSqlCalls;
         this.sortMeasures = sortMeasures;
         this.sortOrders = sortOrders;
         this.isRawQuery = isRawQuery();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
index 5e687a5..97dd750 100644
--- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
+++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/CubeReducerTest.java
@@ -27,6 +27,7 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.io.Text;
@@ -197,8 +198,8 @@ public class CubeReducerTest extends LocalFileMetadataTestCase {
         }
 
         @Override
-        public Class<?> getRewriteCalciteAggrFunctionClass() {
-            return origMeasureType.getRewriteCalciteAggrFunctionClass();
+        public Map<String, Class<?>> getRewriteCalciteAggrFunctions() {
+            return origMeasureType.getRewriteCalciteAggrFunctions();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
index 136342d..4121c02 100644
--- a/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/storage/hbase/ITStorageTest.java
@@ -33,6 +33,7 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigest.SQLCall;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.storage.IStorageQuery;
@@ -144,7 +145,7 @@ public class ITStorageTest extends HBaseMetadataTestCase {
         int count = 0;
         ITupleIterator iterator = null;
         try {
-            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
+            SQLDigest sqlDigest = new SQLDigest("default.test_kylin_fact", filter, null, Collections.<TblColRef> emptySet(), groups, Collections.<TblColRef> emptySet(), Collections.<TblColRef> emptySet(), aggregations, Collections.<SQLCall> emptyList(), new ArrayList<MeasureDesc>(), new ArrayList<SQLDigest.OrderEnum>());
             iterator = storageEngine.search(context, sqlDigest, StorageMockUtils.newTupleInfo(groups, aggregations));
             while (iterator.hasNext()) {
                 ITuple tuple = iterator.next();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
index 095b5e2..f2894e7 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/ColumnRowType.java
@@ -49,6 +49,13 @@ public class ColumnRowType {
         return columns.get(index);
     }
 
+    public TblColRef getColumnByIndexNullable(int index) {
+        if (index < 0 || index >= columns.size())
+            return null;
+        else
+            return columns.get(index);
+    }
+
     public int getIndexByName(String columnName) {
         for (int i = 0; i < columns.size(); i++) {
             if (columns.get(i).getName().equals(columnName)) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 8ecb808..9c4f287 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -55,14 +55,14 @@ import org.apache.calcite.sql.type.SqlTypeFamily;
 import org.apache.calcite.sql.validate.SqlUserDefinedAggFunction;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.calcite.util.Util;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.measure.bitmap.BitmapMeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest.SQLCall;
 import org.apache.kylin.query.schema.OLAPTable;
 
 import com.google.common.base.Preconditions;
@@ -80,23 +80,28 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         AGGR_FUNC_MAP.put("$SUM0", "SUM");
         AGGR_FUNC_MAP.put("COUNT", "COUNT");
         AGGR_FUNC_MAP.put("COUNT_DISTINCT", "COUNT_DISTINCT");
-        AGGR_FUNC_MAP.put(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT, "COUNT_DISTINCT");
         AGGR_FUNC_MAP.put("MAX", "MAX");
         AGGR_FUNC_MAP.put("MIN", "MIN");
 
-        for (String customAggrFunc : KylinConfig.getInstanceFromEnv().getCubeCustomMeasureTypes().keySet()) {
-            AGGR_FUNC_MAP.put(customAggrFunc.trim().toUpperCase(), customAggrFunc.trim().toUpperCase());
+        Map<String, MeasureTypeFactory> udafFactories = MeasureTypeFactory.getUDAFFactories();
+        for (String udaf : udafFactories.keySet()) {
+            AGGR_FUNC_MAP.put(udaf, udafFactories.get(udaf).getAggrFunctionName());
         }
     }
 
-    private static String getFuncName(AggregateCall aggCall) {
-        String aggName = aggCall.getAggregation().getName();
+    private static String getSqlFuncName(AggregateCall aggCall) {
+        String sqlName = aggCall.getAggregation().getName();
         if (aggCall.isDistinct()) {
-            aggName = aggName + "_DISTINCT";
+            sqlName = sqlName + "_DISTINCT";
         }
-        String funcName = AGGR_FUNC_MAP.get(aggName);
+        return sqlName;
+    }
+
+    private static String getAggrFuncName(AggregateCall aggCall) {
+        String sqlName = getSqlFuncName(aggCall);
+        String funcName = AGGR_FUNC_MAP.get(sqlName);
         if (funcName == null) {
-            throw new IllegalStateException("Don't suppoprt aggregation " + aggName);
+            throw new IllegalStateException("Don't suppoprt aggregation " + sqlName);
         }
         return funcName;
     }
@@ -151,7 +156,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
 
         // only translate the innermost aggregation
         if (!this.afterAggregate) {
-            translateGroupBy();
+            this.context.groupByColumns.addAll(this.groups);
             this.context.aggregations.addAll(this.aggregations);
             this.context.afterAggregate = true;
         } else {
@@ -226,15 +231,6 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
             Set<TblColRef> columns = inputColumnRowType.getSourceColumnsByIndex(i);
             this.groups.addAll(columns);
         }
-        // Some UDAF may group data by itself, add group key into groups, prevents aggregate at cube storage server side
-        for (AggregateCall aggCall : this.rewriteAggCalls) {
-            String aggregateName = aggCall.getAggregation().getName();
-            if (aggregateName.equalsIgnoreCase(BitmapMeasureType.FUNC_INTERSECT_COUNT_DISTINCT)) {
-                int index = aggCall.getArgList().get(1);
-                TblColRef column = inputColumnRowType.getColumnByIndex(index);
-                groups.add(column);
-            }
-        }
     }
 
     private void buildAggregations() {
@@ -254,17 +250,13 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
                 }
             }
             FunctionDesc aggFunc = new FunctionDesc();
-            String funcName = getFuncName(aggCall);
+            String funcName = getAggrFuncName(aggCall);
             aggFunc.setExpression(funcName);
             aggFunc.setParameter(parameter);
             this.aggregations.add(aggFunc);
         }
     }
 
-    private void translateGroupBy() {
-        context.groupByColumns.addAll(this.groups);
-    }
-
     @Override
     public void implementRewrite(RewriteImplementor implementor) {
         // only rewrite the innermost aggregation
@@ -286,6 +278,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
                     aggCall = rewriteAggregateCall(aggCall, cubeFunc);
                 }
                 this.rewriteAggCalls.add(aggCall);
+                this.context.aggrSqlCalls.add(toSqlCall(aggCall));
             }
         }
 
@@ -295,6 +288,18 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
 
     }
 
+    private SQLCall toSqlCall(AggregateCall aggCall) {
+        ColumnRowType inputColumnRowType = ((OLAPRel) getInput()).getColumnRowType();
+
+        String function = getSqlFuncName(aggCall);
+        List<Object> args = Lists.newArrayList();
+        for (Integer index : aggCall.getArgList()) {
+            TblColRef col = inputColumnRowType.getColumnByIndexNullable(index);
+            args.add(col);
+        }
+        return new SQLCall(function, args);
+    }
+
     private void translateAggregation() {
         // now the realization is known, replace aggregations with what's defined on MeasureDesc
         List<MeasureDesc> measures = this.context.realization.getMeasures();
@@ -371,8 +376,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
 
     private AggregateCall rewriteAggregateCall(AggregateCall aggCall, FunctionDesc func) {
 
-        //if it's not a cube, then the "needRewriteField func" should not resort to any rewrite fields, 
-        // which do not exist at all
+        // if it's not a cube, no pre-calculated rewrite fields exist at all, so a func that needs a rewrite field must be left as is
         if (noPrecaculatedFieldsAvailable() && func.needRewriteField()) {
             logger.info(func + "skip rewriteAggregateCall because no pre-aggregated field available");
             return aggCall;
@@ -391,16 +395,18 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         }
 
         // rebuild function
-        String callName = aggCall.getAggregation().getName();
+        String callName = getSqlFuncName(aggCall);
         RelDataType fieldType = aggCall.getType();
         SqlAggFunction newAgg = aggCall.getAggregation();
+        Map<String, Class<?>> udafMap = func.getMeasureType().getRewriteCalciteAggrFunctions();
         if (func.isCount()) {
             newAgg = SqlStdOperatorTable.SUM0;
-        } else if (func.getMeasureType().getRewriteCalciteAggrFunctionClass() != null) {
-            newAgg = createCustomAggFunction(callName, fieldType, func.getMeasureType().getRewriteCalciteAggrFunctionClass(callName));
+        } else if (udafMap != null && udafMap.containsKey(callName)) {
+            newAgg = createCustomAggFunction(callName, fieldType, udafMap.get(callName));
         }
 
         // rebuild aggregate call
+        @SuppressWarnings("deprecation")
         AggregateCall newAggCall = new AggregateCall(newAgg, false, newArgList, fieldType, callName);
         return newAggCall;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
index 41a3b4d..cdec665 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPContext.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rel.type.RelDataTypeField;
@@ -36,6 +37,7 @@ import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.IRealization;
 import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.realization.SQLDigest.SQLCall;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.query.schema.OLAPSchema;
 import org.apache.kylin.storage.StorageContext;
@@ -116,11 +118,12 @@ public class OLAPContext {
     // cube metadata
     public IRealization realization;
 
-    public Collection<TblColRef> allColumns = new HashSet<TblColRef>();
-    public Collection<TblColRef> groupByColumns = new ArrayList<TblColRef>();
-    public Collection<TblColRef> metricsColumns = new HashSet<TblColRef>();
-    public List<FunctionDesc> aggregations = new ArrayList<FunctionDesc>();
-    public Collection<TblColRef> filterColumns = new HashSet<TblColRef>();
+    public Set<TblColRef> allColumns = new HashSet<TblColRef>();
+    public List<TblColRef> groupByColumns = new ArrayList<TblColRef>();
+    public Set<TblColRef> metricsColumns = new HashSet<TblColRef>();
+    public List<FunctionDesc> aggregations = new ArrayList<FunctionDesc>(); // storage level measure types, on top of which various sql aggr functions may apply
+    public List<SQLCall> aggrSqlCalls = new ArrayList<SQLCall>(); // sql level aggregation function calls
+    public Set<TblColRef> filterColumns = new HashSet<TblColRef>();
     public TupleFilter filter;
     public List<JoinDesc> joins = new LinkedList<JoinDesc>();
     private List<MeasureDesc> sortMeasures;
@@ -144,7 +147,7 @@ public class OLAPContext {
 
     public SQLDigest getSQLDigest() {
         if (sqlDigest == null)
-            sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations, sortMeasures, sortOrders);
+            sqlDigest = new SQLDigest(firstTableScan.getTableName(), filter, joins, allColumns, groupByColumns, filterColumns, metricsColumns, aggregations, aggrSqlCalls, sortMeasures, sortOrders);
         return sqlDigest;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
index e42d9be..93f06dd 100644
--- a/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
+++ b/query/src/main/java/org/apache/kylin/query/schema/OLAPSchemaFactory.java
@@ -25,6 +25,7 @@ import java.io.Writer;
 import java.nio.charset.Charset;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.calcite.schema.Schema;
@@ -34,6 +35,7 @@ import org.apache.calcite.util.ConversionUtil;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.measure.MeasureTypeFactory;
 import org.apache.kylin.metadata.model.DatabaseDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
@@ -41,6 +43,8 @@ import org.apache.kylin.metadata.project.ProjectManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 /**
  */
 public class OLAPSchemaFactory implements SchemaFactory {
@@ -138,9 +142,14 @@ public class OLAPSchemaFactory implements SchemaFactory {
     }
 
     private static void createOLAPSchemaFunctions(Writer out) throws IOException {
-        out.write("            \"functions\": [\n");
-        Map<String, String> udfs = KylinConfig.getInstanceFromEnv().getUDFs();
+        Map<String, String> udfs = Maps.newHashMap();
+        udfs.putAll(KylinConfig.getInstanceFromEnv().getUDFs());
+        for (Entry<String, Class<?>> entry : MeasureTypeFactory.getUDAFs().entrySet()) {
+            udfs.put(entry.getKey(), entry.getValue().getName());
+        }
+
         int index = 0;
+        out.write("            \"functions\": [\n");
         for (Map.Entry<String, String> udf : udfs.entrySet()) {
             String udfName = udf.getKey().trim().toUpperCase();
             String udfClassName = udf.getValue().trim();

http://git-wip-us.apache.org/repos/asf/kylin/blob/0018a212/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java
index 1353862..e07ea91 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/MeasureTypeOnlyAggrInBaseTest.java
@@ -137,11 +137,6 @@ public class MeasureTypeOnlyAggrInBaseTest extends LocalFileMetadataTestCase {
         }
 
         @Override
-        public Class<?> getRewriteCalciteAggrFunctionClass() {
-            return null;
-        }
-
-        @Override
         public boolean onlyAggrInBaseCuboid() {
             return true;
         }