You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/04/20 10:34:35 UTC

[2/2] kylin git commit: KYLIN-1016 Count distinct on any dimension should work even if it is not a predefined measure

KYLIN-1016 Count distinct on any dimension should work even if it is not a predefined measure


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/155291fd
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/155291fd
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/155291fd

Branch: refs/heads/master
Commit: 155291fda3ec1c9d5ccf017861526e66c1ba364d
Parents: 3a0ba36
Author: lidongsjtu <li...@apache.org>
Authored: Wed Apr 20 16:28:58 2016 +0800
Committer: lidongsjtu <li...@apache.org>
Committed: Wed Apr 20 16:28:58 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  3 +
 .../org/apache/kylin/measure/MeasureType.java   |  6 +-
 .../kylin/measure/MeasureTypeFactory.java       | 10 +++
 .../measure/dim/DimCountDistinctAggFunc.java    | 86 ++++++++++++++++++++
 .../dim/DimCountDistinctMeasureType.java        | 68 ++++++++++++++++
 .../kylin/metadata/model/FunctionDesc.java      | 33 +++++++-
 .../apache/kylin/query/ITKylinQueryTest.java    |  2 +-
 .../src/test/resources/query/debug/query78.sql  | 22 -----
 .../src/test/resources/query/sql/query100.sql   | 31 +++++++
 .../src/test/resources/query/sql/query101.sql   | 21 +++++
 .../kylin/query/relnode/OLAPAggregateRel.java   | 26 +++---
 .../org/apache/kylin/query/relnode/OLAPRel.java |  3 +-
 .../aggregate/DimCountDistinctAggFuncTest.java  | 82 +++++++++++++++++++
 13 files changed, 352 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index e00e446..931fbba 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -631,4 +631,12 @@ abstract public class KylinConfigBase implements Serializable {
         return getOptional("kylin.cube.mr.engine.v2.class", "org.apache.kylin.engine.mr.MRBatchCubingEngine2");
     }
 
+    /**
+     * Maximum number of distinct values one query-time dimension COUNT DISTINCT counter may hold,
+     * read from {@code kylin.query.dim.distinct.max} (default 5000000).
+     */
+    public int getDimCountDistinctMaxCardinality() {
+        String maxCardinality = getOptional("kylin.query.dim.distinct.max", "5000000");
+        return Integer.parseInt(maxCardinality);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
index 98aa752..4dea46b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureType.java
@@ -106,7 +106,16 @@ abstract public class MeasureType<T> {
     
     /** Whether or not Calcite rel-tree needs rewrite to do last around of aggregation */
     abstract public boolean needRewrite();
-    
+
+    /**
+     * Whether the Calcite rewrite reads this measure from a dedicated rewrite field. Defaults to
+     * {@code true}; a measure type whose rewritten aggregate call keeps its original input
+     * arguments overrides this to return {@code false}.
+     */
+    public boolean needRewriteField() {
+        return true;
+    }
+
     /** Returns a Calcite aggregation function implementation class */
     abstract public Class<?> getRewriteCalciteAggrFunctionClass();
     

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
index bd235d6..1d264e1 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/MeasureTypeFactory.java
@@ -23,6 +23,7 @@ import java.util.Map;
 
 import org.apache.kylin.measure.basic.BasicMeasureType;
 import org.apache.kylin.measure.bitmap.BitmapMeasureType;
+import org.apache.kylin.measure.dim.DimCountDistinctMeasureType;
 import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType;
 import org.apache.kylin.measure.hllc.HLLCMeasureType;
 import org.apache.kylin.measure.raw.RawMeasureType;
@@ -132,6 +133,23 @@ abstract public class MeasureTypeFactory<T> {
         return create(funcName, DataType.getType(dataType));
     }
 
+    /**
+     * Creates a measure type whose Calcite rewrite aggregates the original input columns directly
+     * instead of a pre-computed rewrite field (see {@link MeasureType#needRewriteField()}).
+     *
+     * @param funcName aggregation function name, matched case-insensitively; only COUNT_DISTINCT is supported
+     * @param dataType return data type, passed through to the measure type factory
+     * @throws UnsupportedOperationException if {@code funcName} has no such measure type
+     */
+    public static MeasureType<?> createNoRewriteFieldsMeasureType(String funcName, DataType dataType) {
+        // currently only has DimCountDistinctAgg
+        if ("COUNT_DISTINCT".equalsIgnoreCase(funcName)) {
+            return new DimCountDistinctMeasureType.DimCountDistinctMeasureTypeFactory().createMeasureType(funcName, dataType);
+        }
+
+        throw new UnsupportedOperationException("No measure type found for function: " + funcName);
+    }
+
     public static MeasureType<?> create(String funcName, DataType dataType) {
         funcName = funcName.toUpperCase();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctAggFunc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctAggFunc.java
new file mode 100644
index 0000000..825b1aa
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctAggFunc.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.dim;
+
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+
+import com.google.common.collect.Sets;
+
+/**
+ * Calcite aggregate-function implementation behind {@code DimCountDistinctMeasureType}: counts the
+ * distinct values of a dimension column at query time by collecting them in memory.
+ * <p>
+ * The accumulator returned by {@link #init()} is {@code null}; it is created lazily on the first
+ * {@link #add(DimDistinctCounter, Object)}, so every entry point must tolerate a {@code null} counter.
+ */
+public class DimCountDistinctAggFunc {
+
+    /** Returns the empty accumulator, represented as {@code null}. */
+    public static DimDistinctCounter init() {
+        return null;
+    }
+
+    /** Returns a new accumulator that already holds {@code v}. */
+    public static DimDistinctCounter initAdd(Object v) {
+        DimDistinctCounter counter = new DimDistinctCounter();
+        counter.add(v);
+        return counter;
+    }
+
+    /** Adds {@code v} to {@code counter}, creating the counter first when it is {@code null}. */
+    public static DimDistinctCounter add(DimDistinctCounter counter, Object v) {
+        if (counter == null) {
+            counter = new DimDistinctCounter();
+        }
+        counter.add(v);
+        return counter;
+    }
+
+    /**
+     * Merges two partial accumulators. Either may be {@code null} (the empty accumulator from
+     * {@link #init()}); in that case the other one is returned as is.
+     */
+    public static DimDistinctCounter merge(DimDistinctCounter counter0, DimDistinctCounter counter1) {
+        if (counter0 == null) {
+            return counter1;
+        }
+        if (counter1 == null) {
+            return counter0;
+        }
+        counter0.addAll(counter1);
+        return counter0;
+    }
+
+    /** Returns the number of distinct values collected; 0 for the empty ({@code null}) accumulator. */
+    public static long result(DimDistinctCounter counter) {
+        return counter == null ? 0L : counter.result();
+    }
+
+    /**
+     * Set of distinct values whose size is capped by
+     * {@code KylinConfig#getDimCountDistinctMaxCardinality()}; adding a new distinct value beyond the
+     * cap throws a {@link RuntimeException}.
+     */
+    public static class DimDistinctCounter {
+        private final Set<Object> container;
+        private final int maxCardinality;
+
+        public DimDistinctCounter() {
+            container = Sets.newHashSet();
+            maxCardinality = KylinConfig.getInstanceFromEnv().getDimCountDistinctMaxCardinality();
+        }
+
+        /** Adds {@code v}; values already present are accepted even when the set is at the cap. */
+        public void add(Object v) {
+            if (container.size() >= maxCardinality && !container.contains(v)) {
+                throw thresholdExceeded();
+            }
+            container.add(v);
+        }
+
+        /** Adds every value of {@code counter}, applying the same cap as {@link #add(Object)}. */
+        public void addAll(DimDistinctCounter counter) {
+            for (Object v : counter.container) {
+                add(v);
+            }
+        }
+
+        public long result() {
+            return container.size();
+        }
+
+        private RuntimeException thresholdExceeded() {
+            return new RuntimeException("Cardinality of dimension exceeds the threshold: " + maxCardinality);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
new file mode 100644
index 0000000..ae75b19
--- /dev/null
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/dim/DimCountDistinctMeasureType.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.measure.dim;
+
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.realization.SQLDigest;
+
+/**
+ * Measure type used when COUNT DISTINCT is applied to a dimension rather than to a pre-defined
+ * measure. It has no ingester or aggregator; the Calcite rewrite instead aggregates the original
+ * column values with {@link DimCountDistinctAggFunc}.
+ */
+public class DimCountDistinctMeasureType extends MeasureType<Object> {
+
+    /** Factory used by {@code MeasureTypeFactory.createNoRewriteFieldsMeasureType}. */
+    public static class DimCountDistinctMeasureTypeFactory extends MeasureTypeFactory<Object> {
+
+        @Override
+        public MeasureType<Object> createMeasureType(String funcName, DataType dataType) {
+            return new DimCountDistinctMeasureType();
+        }
+
+        @Override
+        public String getAggrFunctionName() {
+            return null;
+        }
+
+        @Override
+        public String getAggrDataTypeName() {
+            return null;
+        }
+
+        @Override
+        public Class getAggrDataTypeSerializer() {
+            return null;
+        }
+    }
+
+    @Override
+    public MeasureIngester newIngester() {
+        throw new UnsupportedOperationException("No ingester for this measure type.");
+    }
+
+    @Override
+    public MeasureAggregator newAggregator() {
+        throw new UnsupportedOperationException("No aggregator for this measure type.");
+    }
+
+    @Override
+    public boolean needRewrite() {
+        return true;
+    }
+
+    /** No pre-computed rewrite field: the rewritten aggregate call keeps its original arguments. */
+    @Override
+    public boolean needRewriteField() {
+        return false;
+    }
+
+    @Override
+    public Class<?> getRewriteCalciteAggrFunctionClass() {
+        return DimCountDistinctAggFunc.class;
+    }
+
+    /**
+     * Adds the counted column(s) to the digest's group-by columns and removes this function from
+     * its aggregations; presumably so storage returns the dimension values and the distinct count
+     * is computed at query time (verify against SQLDigest consumers).
+     */
+    public void adjustSqlDigest(MeasureDesc measureDesc, SQLDigest sqlDigest) {
+        sqlDigest.groupbyColumns.addAll(measureDesc.getFunction().getParameter().getColRefs());
+        sqlDigest.aggregations.remove(measureDesc.getFunction());
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index f3a81d6..dcb2374 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -43,6 +43,7 @@ public class FunctionDesc {
     public static final String FUNC_MIN = "MIN";
     public static final String FUNC_MAX = "MAX";
     public static final String FUNC_COUNT = "COUNT";
+    public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
     public static final Set<String> BUILT_IN_AGGREGATIONS = Sets.newHashSet();
 
     static {
@@ -50,6 +51,7 @@ public class FunctionDesc {
         BUILT_IN_AGGREGATIONS.add(FUNC_MAX);
         BUILT_IN_AGGREGATIONS.add(FUNC_MIN);
         BUILT_IN_AGGREGATIONS.add(FUNC_SUM);
+        BUILT_IN_AGGREGATIONS.add(FUNC_COUNT_DISTINCT);
     }
 
     public static final String PARAMETER_TYPE_CONSTANT = "constant";
@@ -101,23 +103,49 @@ public class FunctionDesc {
         throw new IllegalStateException("Column is not found in any table from the model: " + columnName);
     }
 
+    /**
+     * (Re)creates the cached measure type. A dimension used as a COUNT_DISTINCT metric gets the
+     * measure type that needs no rewrite field; every other function uses the regular factory.
+     */
+    private void reInitMeasure() {
+        if (isDimensionAsMetric && isCountDistinct()) {
+            // count distinct directly on a dimension column: there is no pre-built measure to read
+            measureType = MeasureTypeFactory.createNoRewriteFieldsMeasureType(getExpression(), getReturnDataType());
+        } else {
+            measureType = MeasureTypeFactory.create(getExpression(), getReturnDataType());
+        }
+    }
+
     public MeasureType<?> getMeasureType() {
-        if (isDimensionAsMetric)
+        if (isDimensionAsMetric && !isCountDistinct()) {
             return null;
+        }
 
         if (measureType == null) {
-            measureType = MeasureTypeFactory.create(getExpression(), getReturnDataType());
+            reInitMeasure();
         }
+
         return measureType;
     }
 
     public boolean needRewrite() {
-        if (isDimensionAsMetric)
+        if (getMeasureType() == null)
             return false;
 
         return getMeasureType().needRewrite();
     }
 
+    /**
+     * Whether the rewrite reads this function from a dedicated rewrite field. Always false when
+     * {@link #needRewrite()} is false; otherwise delegated to the measure type.
+     */
+    public boolean needRewriteField() {
+        if (!needRewrite())
+            return false;
+
+        return getMeasureType().needRewriteField();
+    }
+
     public String getRewriteFieldName() {
         if (isSum()) {
             return getParameter().getValue();
@@ -163,6 +191,11 @@ public class FunctionDesc {
         return FUNC_COUNT.equalsIgnoreCase(expression);
     }
 
+    /** Whether this function is COUNT_DISTINCT (compared case-insensitively). */
+    public boolean isCountDistinct() {
+        return FUNC_COUNT_DISTINCT.equalsIgnoreCase(expression);
+    }
+
     /**
      * Get Full Expression such as sum(amount), count(1), count(*)...
      */
@@ -182,6 +215,10 @@ public class FunctionDesc {
 
     public void setDimensionAsMetric(boolean isDimensionAsMetric) {
         this.isDimensionAsMetric = isDimensionAsMetric;
+        // rebuild an already-cached measure type so it matches the new flag
+        if (measureType != null) {
+            reInitMeasure();
+        }
     }
 
     public String getExpression() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index 7de0484..031835b 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -107,7 +107,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleRunQuery() throws Exception {
 
-        String queryFileName = "src/test/resources/query/temp/query01.sql";
+        String queryFileName = "src/test/resources/query/sql/query100.sql"; // NOTE(review): single-query debug entry point, now aimed at the new COUNT DISTINCT query; confirm this was meant to be committed
 
         File sqlFile = new File(queryFileName);
         if (sqlFile.exists()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/kylin-it/src/test/resources/query/debug/query78.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/debug/query78.sql b/kylin-it/src/test/resources/query/debug/query78.sql
deleted file mode 100644
index 299f1a4..0000000
--- a/kylin-it/src/test/resources/query/debug/query78.sql
+++ /dev/null
@@ -1,22 +0,0 @@
---
--- Licensed to the Apache Software Foundation (ASF) under one
--- or more contributor license agreements.  See the NOTICE file
--- distributed with this work for additional information
--- regarding copyright ownership.  The ASF licenses this file
--- to you under the Apache License, Version 2.0 (the
--- "License"); you may not use this file except in compliance
--- with the License.  You may obtain a copy of the License at
---
---     http://www.apache.org/licenses/LICENSE-2.0
---
--- Unless required by applicable law or agreed to in writing, software
--- distributed under the License is distributed on an "AS IS" BASIS,
--- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- See the License for the specific language governing permissions and
--- limitations under the License.
---
-
-select count(*) as c,sum(PRICE) as GMV, LSTG_FORMAT_NAME as FORMAT_NAME
-from test_kylin_fact
-where (LSTG_FORMAT_NAME in ('ABIN')) or  (LSTG_FORMAT_NAME>='FP-GTC' and LSTG_FORMAT_NAME<='Others')
-group by LSTG_FORMAT_NAME

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/kylin-it/src/test/resources/query/sql/query100.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql/query100.sql b/kylin-it/src/test/resources/query/sql/query100.sql
new file mode 100644
index 0000000..ca7367d
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql/query100.sql
@@ -0,0 +1,31 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+SELECT
+ test_kylin_fact.seller_id
+ ,count(DISTINCT test_cal_dt.cal_dt) as CNT_DT
+ ,count(DISTINCT test_kylin_fact.leaf_categ_id) as CNT_CATE_LEAF
+ ,count(DISTINCT test_category_groupings.meta_categ_name) as CNT_CATE
+ ,count(DISTINCT test_kylin_fact.lstg_format_name) as CNT_LSTG
+ ,count(test_cal_dt.cal_dt) as TRANS_CNT
+ FROM test_kylin_fact
+ inner JOIN edw.test_cal_dt as test_cal_dt
+ ON test_kylin_fact.cal_dt = test_cal_dt.cal_dt
+ inner JOIN test_category_groupings
+ ON test_kylin_fact.lstg_site_id = test_category_groupings.site_id  AND test_kylin_fact.leaf_categ_id = test_category_groupings.leaf_categ_id
+ group by  test_kylin_fact.seller_id

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/kylin-it/src/test/resources/query/sql/query101.sql
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/resources/query/sql/query101.sql b/kylin-it/src/test/resources/query/sql/query101.sql
new file mode 100644
index 0000000..a5350c4
--- /dev/null
+++ b/kylin-it/src/test/resources/query/sql/query101.sql
@@ -0,0 +1,21 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one
+-- or more contributor license agreements.  See the NOTICE file
+-- distributed with this work for additional information
+-- regarding copyright ownership.  The ASF licenses this file
+-- to you under the Apache License, Version 2.0 (the
+-- "License"); you may not use this file except in compliance
+-- with the License.  You may obtain a copy of the License at
+--
+--     http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+select seller_id, lstg_site_id, count(DISTINCT leaf_categ_id) as CategCount
+from test_kylin_fact
+group by seller_id, lstg_site_id

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index 9414757..9a2fa5f 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -163,7 +163,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
         for (int i = 0; i < this.aggregations.size(); i++) {
             FunctionDesc aggFunc = this.aggregations.get(i);
             TblColRef aggCol = null;
-            if (aggFunc.needRewrite()) {
+            if (aggFunc.needRewriteField()) {
                 aggCol = buildRewriteColumn(aggFunc);
             } else {
                 AggregateCall aggCall = this.rewriteAggCalls.get(i);
@@ -179,7 +179,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
 
     private TblColRef buildRewriteColumn(FunctionDesc aggFunc) {
         TblColRef colRef;
-        if (aggFunc.needRewrite()) {
+        if (aggFunc.needRewriteField()) {
             ColumnDesc column = new ColumnDesc();
             column.setName(aggFunc.getRewriteFieldName());
             TableDesc table = this.context.firstTableScan.getOlapTable().getSourceTable();
@@ -234,7 +234,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
             translateAggregation();
             buildRewriteFieldsAndMetricsColumns();
         }
-        
+
         implementor.visitChild(this, getInput());
 
         // only rewrite the innermost aggregation
@@ -280,18 +280,18 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
 
     private void buildRewriteFieldsAndMetricsColumns() {
         fillbackOptimizedColumn();
-        
+
         ColumnRowType inputColumnRowType = ((OLAPRel) getInput()).getColumnRowType();
         RelDataTypeFactory typeFactory = getCluster().getTypeFactory();
         for (int i = 0; i < this.aggregations.size(); i++) {
             FunctionDesc aggFunc = this.aggregations.get(i);
-            
+
             if (aggFunc.isDimensionAsMetric()) {
                 this.context.groupByColumns.addAll(aggFunc.getParameter().getColRefs());
                 continue; // skip rewrite, let calcite handle
             }
-            
-            if (aggFunc.needRewrite()) {
+
+            if (aggFunc.needRewriteField()) {
                 String rewriteFieldName = aggFunc.getRewriteFieldName();
                 RelDataType rewriteFieldType = OLAPTable.createSqlType(typeFactory, aggFunc.getRewriteFieldType(), true);
                 this.context.rewriteFields.put(rewriteFieldName, rewriteFieldType);
@@ -327,12 +327,17 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
     }
 
     private AggregateCall rewriteAggregateCall(AggregateCall aggCall, FunctionDesc func) {
-
         // rebuild parameters
-        List<Integer> newArgList = new ArrayList<Integer>(1);
-        String fieldName = func.getRewriteFieldName();
-        RelDataTypeField field = getInput().getRowType().getField(fieldName, true, false);
-        newArgList.add(field.getIndex());
+        List<Integer> newArgList;
+        if (func.needRewriteField()) {
+            // aggregate over the measure's dedicated rewrite field
+            RelDataTypeField field = getInput().getRowType().getField(func.getRewriteFieldName(), true, false);
+            newArgList = new ArrayList<Integer>(1);
+            newArgList.add(field.getIndex());
+        } else {
+            // no rewrite field (e.g. COUNT DISTINCT on a dimension): keep the original arguments
+            newArgList = aggCall.getArgList();
+        }
 
         // rebuild function
         RelDataType fieldType = aggCall.getType();

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/query/src/main/java/org/apache/kylin/query/relnode/OLAPRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPRel.java
index 59179df..f2b52a7 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPRel.java
@@ -127,8 +127,7 @@ public interface OLAPRel extends RelNode {
 
         public static boolean needRewrite(OLAPContext ctx) {
             boolean hasFactTable = ctx.hasJoin || ctx.firstTableScan.getTableName().equals(ctx.realization.getFactTable());
-            boolean hasRewriteFields = !ctx.rewriteFields.isEmpty();
-            return hasRewriteFields && hasFactTable;
+            return hasFactTable; // rewrite fields no longer required: a dimension COUNT DISTINCT registers none but still needs its aggregate rewritten
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/155291fd/query/src/test/java/org/apache/kylin/query/aggregate/DimCountDistinctAggFuncTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/aggregate/DimCountDistinctAggFuncTest.java b/query/src/test/java/org/apache/kylin/query/aggregate/DimCountDistinctAggFuncTest.java
new file mode 100644
index 0000000..8f34e99
--- /dev/null
+++ b/query/src/test/java/org/apache/kylin/query/aggregate/DimCountDistinctAggFuncTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.query.aggregate;
+
+import static org.junit.Assert.*;
+
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.measure.dim.DimCountDistinctAggFunc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Unit tests for {@link DimCountDistinctAggFunc}. */
+public class DimCountDistinctAggFuncTest extends LocalFileMetadataTestCase {
+    private static final String MAX_CARDINALITY_KEY = "kylin.query.dim.distinct.max";
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testBasic() {
+        DimCountDistinctAggFunc.DimDistinctCounter counter = DimCountDistinctAggFunc.init();
+
+        for (int i = 0; i < 10; i++) {
+            for (int j = 0; j < 2; j++) {
+                counter = DimCountDistinctAggFunc.add(counter, i);
+                counter = DimCountDistinctAggFunc.add(counter, (double) i);
+                counter = DimCountDistinctAggFunc.add(counter, (char) i);
+                counter = DimCountDistinctAggFunc.add(counter, Integer.toString(i));
+            }
+        }
+
+        // each value is added twice under four distinct types: 10 values x 4 types = 40
+        assertEquals(40, DimCountDistinctAggFunc.result(counter));
+    }
+
+    @Test
+    public void testEmpty() {
+        DimCountDistinctAggFunc.DimDistinctCounter counter = DimCountDistinctAggFunc.init();
+        assertEquals(0, DimCountDistinctAggFunc.result(counter));
+    }
+
+    @Test
+    public void testMerge() {
+        DimCountDistinctAggFunc.DimDistinctCounter counter0 = DimCountDistinctAggFunc.init();
+        DimCountDistinctAggFunc.DimDistinctCounter counter1 = DimCountDistinctAggFunc.init();
+        for (int i = 0; i < 5; i++) {
+            counter0 = DimCountDistinctAggFunc.add(counter0, i);
+            counter1 = DimCountDistinctAggFunc.add(counter1, i + 3);
+        }
+
+        // {0..4} merged with {3..7}: the overlapping 3 and 4 are counted once
+        assertEquals(8, DimCountDistinctAggFunc.result(DimCountDistinctAggFunc.merge(counter0, counter1)));
+    }
+
+    @Test
+    public void testThreshold() {
+        System.setProperty(MAX_CARDINALITY_KEY, "100");
+        try {
+            DimCountDistinctAggFunc.DimDistinctCounter counter = DimCountDistinctAggFunc.init();
+
+            thrown.expect(RuntimeException.class);
+            thrown.expectMessage("Cardinality of dimension exceeds the threshold: 100");
+
+            for (int i = 0; i < 200; i++) {
+                counter = DimCountDistinctAggFunc.add(counter, i);
+            }
+        } finally {
+            // restore the default limit even though the loop above is expected to throw
+            System.clearProperty(MAX_CARDINALITY_KEY);
+        }
+    }
+}