You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/27 07:25:27 UTC

[14/41] incubator-kylin git commit: KYLIN-608 hll works

KYLIN-608 hll works


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/579e793a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/579e793a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/579e793a

Branch: refs/heads/inverted-index
Commit: 579e793a16bff1c0a3cb5063051d8c0c8cbc0c19
Parents: 16b184d
Author: honma <ho...@ebay.com>
Authored: Thu Feb 12 17:09:51 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Thu Feb 12 17:39:23 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/util/BasicTest.java |  2 +
 .../invertedindex/index/TableRecordInfo.java    |  7 ++--
 .../kylin/invertedindex/model/IIDesc.java       | 40 ++++++++++++++------
 .../measure/fixedlen/FixedHLLCodec.java         |  2 +-
 .../measure/fixedlen/FixedLenMeasureCodec.java  |  8 ++--
 .../apache/kylin/metadata/model/DataType.java   |  1 +
 .../kylin/metadata/model/FunctionDesc.java      | 12 +++---
 .../AdjustForWeeklyMatchedRealization.java      |  8 ++--
 .../apache/kylin/query/test/IIQueryTest.java    |  9 ++++-
 .../resources/query/sql_fast_common/query00.sql |  5 +++
 .../kylin/storage/hbase/CubeStorageEngine.java  |  2 +-
 .../hbase/coprocessor/CoprocessorConstants.java |  2 +-
 .../endpoint/EndpointAggregators.java           | 25 +++++++-----
 .../endpoint/EndpointTupleIterator.java         | 22 ++++++++---
 14 files changed, 97 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 0a33f9f..59ed5f3 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -67,5 +67,7 @@ public class BasicTest {
     @Test
     @Ignore("fix it later")
     public void test2() throws IOException, ConfigurationException {
+        int m = 1 << 15;
+        System.out.println(m);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index 886c649..7af3dcb 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -129,13 +129,12 @@ public class TableRecordInfo {
         return desc.findColumn(col);
     }
 
-    public int findMetric(String metricColumnName) {
-        if (metricColumnName == null)
+    public int findFactTableColumn(String columnName) {
+        if (columnName == null)
             return -1;
         for (int i = 0; i < allColumns.size(); ++i) {
             TblColRef tblColRef = allColumns.get(i);
-            if (measureSerializers[i] != null // has measureSerializers means it is a metric
-                    && tblColRef.isSameAs(desc.getFactTableName(), metricColumnName)) {
+            if (tblColRef.isSameAs(desc.getFactTableName(), columnName)) {
                 return i;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index 5bc611e..6e1224a 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 
 import org.apache.commons.net.util.Base64;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
@@ -109,14 +110,18 @@ public class IIDesc extends RootPersistentEntity {
         IIDimension.capicalizeStrings(valueDimensions);
         StringUtil.toUpperCaseArray(metricNames, metricNames);
 
-        // retrieve all columns and all tables
+        // retrieve all columns and all tables, and make available measure to ii
         HashSet<String> allTableNames = Sets.newHashSet();
+        measureDescs = Lists.newArrayList();
+        measureDescs.add(makeCountMeasure());
         for (IIDimension iiDimension : Iterables.concat(bitmapDimensions, valueDimensions)) {
             TableDesc tableDesc = this.getTableDesc(iiDimension.getTable());
             for (String column : iiDimension.getColumns()) {
                 ColumnDesc columnDesc = tableDesc.findColumnByName(column);
                 allColumns.add(new TblColRef(columnDesc));
+                measureDescs.add(makeHLLMeasure(columnDesc, null));
             }
+
             if (!allTableNames.contains(tableDesc.getIdentity())) {
                 allTableNames.add(tableDesc.getIdentity());
                 allTables.add(tableDesc);
@@ -126,6 +131,9 @@ public class IIDesc extends RootPersistentEntity {
             TableDesc tableDesc = this.getTableDesc(this.getFactTableName());
             ColumnDesc columnDesc = tableDesc.findColumnByName(column);
             allColumns.add(new TblColRef(columnDesc));
+            measureDescs.add(makeNormalMeasure("SUM", columnDesc));
+            measureDescs.add(makeNormalMeasure("MIN", columnDesc));
+            measureDescs.add(makeNormalMeasure("MAX", columnDesc));
             if (!allTableNames.contains(tableDesc.getIdentity())) {
                 allTableNames.add(tableDesc.getIdentity());
                 allTables.add(tableDesc);
@@ -136,9 +144,7 @@ public class IIDesc extends RootPersistentEntity {
         bitmapCols = new int[IIDimension.getColumnCount(bitmapDimensions)];
         valueCols = new int[IIDimension.getColumnCount(valueDimensions)];
         metricsCols = new int[metricNames.length];
-
         metricsColSet = new BitSet(this.getTableDesc(this.getFactTableName()).getColumnCount());
-        measureDescs = Lists.newArrayList();
 
         int totalIndex = 0;
         for (int i = 0; i < bitmapCols.length; ++i, ++totalIndex) {
@@ -150,14 +156,7 @@ public class IIDesc extends RootPersistentEntity {
         for (int i = 0; i < metricsCols.length; ++i, ++totalIndex) {
             metricsCols[i] = totalIndex;
             metricsColSet.set(totalIndex);
-
-            ColumnDesc col = this.getTableDesc(this.getFactTableName()).findColumnByName(metricNames[i]);
-            measureDescs.add(makeMeasureDescs("SUM", col));
-            measureDescs.add(makeMeasureDescs("MIN", col));
-            measureDescs.add(makeMeasureDescs("MAX", col));
-            // TODO support for HLL
         }
-        measureDescs.add(makeCountMeasure());
 
         // partitioning column
         tsCol = -1;
@@ -197,7 +196,7 @@ public class IIDesc extends RootPersistentEntity {
         return functions;
     }
 
-    private MeasureDesc makeMeasureDescs(String func, ColumnDesc columnDesc) {
+    private MeasureDesc makeNormalMeasure(String func, ColumnDesc columnDesc) {
         String columnName = columnDesc.getName();
         String returnType = columnDesc.getTypeName();
         MeasureDesc measureDesc = new MeasureDesc();
@@ -213,6 +212,25 @@ public class IIDesc extends RootPersistentEntity {
         return measureDesc;
     }
 
+    /**
+     * 
+     * @param hllType represents the presision
+     */
+    private MeasureDesc makeHLLMeasure(ColumnDesc columnDesc, String hllType) {
+        String columnName = columnDesc.getName();
+        MeasureDesc measureDesc = new MeasureDesc();
+        FunctionDesc f1 = new FunctionDesc();
+        f1.setExpression("COUNT_DISTINCT");
+        ParameterDesc p1 = new ParameterDesc();
+        p1.setType("column");
+        p1.setValue(columnName);
+        p1.setColRefs(ImmutableList.of(new TblColRef(columnDesc)));
+        f1.setParameter(p1);
+        f1.setReturnType(hllType);
+        measureDesc.setFunction(f1);
+        return measureDesc;
+    }
+
     private MeasureDesc makeCountMeasure() {
         MeasureDesc measureDesc = new MeasureDesc();
         FunctionDesc f1 = new FunctionDesc();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
index c6d4dc9..138940f 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -53,6 +53,6 @@ public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter>
 
     @Override
     public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) {
-        current.writeRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset));
+        v.writeRegistersArray(ByteBuffer.wrap(buf, offset, buf.length - offset));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
index 650432a..ad8c483 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -20,11 +20,14 @@ package org.apache.kylin.metadata.measure.fixedlen;
 
 import org.apache.kylin.metadata.model.DataType;
 
-
 abstract public class FixedLenMeasureCodec<T> {
 
     public static FixedLenMeasureCodec<?> get(DataType type) {
-        return new FixedPointLongCodec(type);
+        if (type.isHLLC()) {
+            return new FixedHLLCodec(type);
+        } else {
+            return new FixedPointLongCodec(type);
+        }
     }
 
     abstract public int getLength();
@@ -33,7 +36,6 @@ abstract public class FixedLenMeasureCodec<T> {
 
     abstract public T valueOf(String value);
 
-
     abstract public Object getValue();
 
     abstract public T read(byte[] buf, int offset);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java b/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
index 70c24c9..a4e8db6 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/DataType.java
@@ -89,6 +89,7 @@ public class DataType {
 
     private static final ConcurrentMap<DataType, DataType> CACHE = new ConcurrentHashMap<DataType, DataType>();
 
+
     public static DataType getInstance(String type) {
         if (type == null)
             return null;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index e80532c..eda31a1 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -48,7 +48,7 @@ public class FunctionDesc {
     private String returnType;
 
     private DataType returnDataType;
-    private boolean isAppliedOnDimension = false;
+    private boolean isDimensionAsMetric = false;
 
     public String getRewriteFieldName() {
         if (isSum()) {
@@ -62,7 +62,7 @@ public class FunctionDesc {
     }
 
     public boolean needRewrite() {
-        return !isSum() && !isHolisticCountDistinct() && !isAppliedOnDimension();
+        return !isSum() && !isHolisticCountDistinct() && !isDimensionAsMetric();
     }
 
     public boolean isMin() {
@@ -106,12 +106,12 @@ public class FunctionDesc {
         return sb.toString();
     }
 
-    public boolean isAppliedOnDimension() {
-        return isAppliedOnDimension;
+    public boolean isDimensionAsMetric() {
+        return isDimensionAsMetric;
     }
 
-    public void setAppliedOnDimension(boolean isAppliedOnDimension) {
-        this.isAppliedOnDimension = isAppliedOnDimension;
+    public void setDimensionAsMetric(boolean isDimensionAsMetric) {
+        this.isDimensionAsMetric = isDimensionAsMetric;
     }
 
     public String getExpression() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
index 5f49ba4..7a36cf1 100644
--- a/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
+++ b/query/src/main/java/org/apache/kylin/query/routing/RoutingRules/AdjustForWeeklyMatchedRealization.java
@@ -61,7 +61,7 @@ public class AdjustForWeeklyMatchedRealization extends RoutingRule {
     private static void adjustOLAPContextIfNecessary(IIInstance ii, OLAPContext olapContext) {
         IIDesc iiDesc = ii.getDescriptor();
         Collection<FunctionDesc> iiFuncs = iiDesc.listAllFunctions();
-        convertAggreationToDimension(olapContext, iiFuncs, iiDesc.getFactTableName());
+        convertAggregationToDimension(olapContext, iiFuncs, iiDesc.getFactTableName());
     }
 
     private static void adjustOLAPContextIfNecessary(CubeInstance cube, OLAPContext olapContext) {
@@ -70,17 +70,17 @@ public class AdjustForWeeklyMatchedRealization extends RoutingRule {
 
         CubeDesc cubeDesc = cube.getDescriptor();
         Collection<FunctionDesc> cubeFuncs = cubeDesc.listAllFunctions();
-        convertAggreationToDimension(olapContext, cubeFuncs, cubeDesc.getFactTable());
+        convertAggregationToDimension(olapContext, cubeFuncs, cubeDesc.getFactTable());
     }
 
-    private static void convertAggreationToDimension(OLAPContext olapContext, Collection<FunctionDesc> availableAggregations, String factTableName) {
+    private static void convertAggregationToDimension(OLAPContext olapContext, Collection<FunctionDesc> availableAggregations, String factTableName) {
         Iterator<FunctionDesc> it = olapContext.aggregations.iterator();
         while (it.hasNext()) {
             FunctionDesc functionDesc = it.next();
             if (!availableAggregations.contains(functionDesc)) {
                 // try to convert the metric to dimension to see if it works
                 TblColRef col = functionDesc.selectTblColRef(olapContext.metricsColumns, factTableName);
-                functionDesc.setAppliedOnDimension(true);
+                functionDesc.setDimensionAsMetric(true);
                 olapContext.rewriteFields.remove(functionDesc.getRewriteFieldName());
                 if (col != null) {
                     olapContext.metricsColumns.remove(col);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
----------------------------------------------------------------------
diff --git a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
index d37eac0..d36ffb9 100644
--- a/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
+++ b/query/src/test/java/org/apache/kylin/query/test/IIQueryTest.java
@@ -37,7 +37,7 @@ public class IIQueryTest extends KylinQueryTest {
     public static void setUp() throws Exception {
 
         KylinQueryTest.setUp();//invoke super class
-        distinctCountSupported = false;
+        distinctCountSupported = true;
 
         Map<RealizationType, Integer> priorities = Maps.newHashMap();
         priorities.put(RealizationType.INVERTED_INDEX, 0);
@@ -64,10 +64,15 @@ public class IIQueryTest extends KylinQueryTest {
 
     @Test
     public void testSingleRunQuery() throws Exception {
-        String queryFileName = "src/test/resources/query/sql_ii/query04.sql";
+        String queryFileName = "src/test/resources/query/sql_distinct/query00.sql";
 
         File sqlFile = new File(queryFileName);
         runSQL(sqlFile, true, true);
         runSQL(sqlFile, true, false);
     }
+
+    @Test
+    public void testDistinctCountQuery() throws Exception {
+        super.testDistinctCountQuery();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/query/src/test/resources/query/sql_fast_common/query00.sql
----------------------------------------------------------------------
diff --git a/query/src/test/resources/query/sql_fast_common/query00.sql b/query/src/test/resources/query/sql_fast_common/query00.sql
new file mode 100644
index 0000000..198aea1
--- /dev/null
+++ b/query/src/test/resources/query/sql_fast_common/query00.sql
@@ -0,0 +1,5 @@
+select lstg_format_name, cal_dt,
+ sum(price) as GMV,
+ count(1) as TRANS_CNT
+ from test_kylin_fact
+ group by lstg_format_name, cal_dt
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
index 6eddd8d..a4ecc2a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
@@ -136,7 +136,7 @@ public class CubeStorageEngine implements IStorageEngine {
     private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
 
         for (FunctionDesc func : sqlDigest.aggregations) {
-            if (!func.isAppliedOnDimension()) {
+            if (!func.isDimensionAsMetric()) {
                 metrics.add(func);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorConstants.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorConstants.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorConstants.java
index 40ba64a..7efb283 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorConstants.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/CoprocessorConstants.java
@@ -23,5 +23,5 @@ package org.apache.kylin.storage.hbase.coprocessor;
  */
 public class CoprocessorConstants {
     public static final int SERIALIZE_BUFFER_SIZE = 65536;
-    public static final int METRIC_SERIALIZE_BUFFER_SIZE = 1024;
+    public static final int METRIC_SERIALIZE_BUFFER_SIZE = 65536;
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index 516c160..f8bf182 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -50,12 +50,12 @@ public class EndpointAggregators {
     private static class MetricInfo {
         private MetricType type;
         private int refIndex = -1;
-        private int presision = -1;
+        private int precision = -1;
 
         public MetricInfo(MetricType type, int refIndex, int presision) {
             this.type = type;
             this.refIndex = refIndex;
-            this.presision = presision;
+            this.precision = presision;
         }
 
         public MetricInfo(MetricType type, int refIndex) {
@@ -83,12 +83,12 @@ public class EndpointAggregators {
 
             if (functionDesc.isCount()) {
                 metricInfos[i] = new MetricInfo(MetricType.Count);
-            } else if (functionDesc.isAppliedOnDimension()) {
+            } else if (functionDesc.isDimensionAsMetric()) {
                 metricInfos[i] = new MetricInfo(MetricType.DimensionAsMetric);
             } else {
-                int index = tableInfo.findMetric(functionDesc.getParameter().getValue());
+                int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue());
                 if (index < 0) {
-                    throw new IllegalStateException("Column " + functionDesc.getParameter().getColRefs().get(0) + " is not found in II");
+                    throw new IllegalStateException("Column " + functionDesc.getParameter().getValue() + " is not found in II");
                 }
 
                 if (functionDesc.isCountDistinct()) {
@@ -141,9 +141,13 @@ public class EndpointAggregators {
 
     public MeasureAggregator[] createBuffer() {
         MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length];
-        for (int j = 0; j < aggrs.length; j++) {
-            //all fixed length measures can be aggregated as long
-            aggrs[j] = MeasureAggregator.create(funcNames[j], "long");
+        for (int i = 0; i < aggrs.length; i++) {
+            if (metricInfos[i].type == MetricType.DistinctCount) {
+                aggrs[i] = MeasureAggregator.create(funcNames[i], dataTypes[i]);
+            } else {
+                //all other fixed length measures can be aggregated as long
+                aggrs[i] = MeasureAggregator.create(funcNames[i], "long");
+            }
         }
         return aggrs;
     }
@@ -179,7 +183,8 @@ public class EndpointAggregators {
                 //TODO: for unified dictionary, this is okay. but if different data blocks uses different dictionary, we'll have to aggregate original data
                 HyperLogLogPlusCounter hllc = hllcs[metricIndex];
                 if (hllc == null) {
-                    hllc = new HyperLogLogPlusCounter(metricInfo.presision);
+                    int precision = metricInfo.precision;
+                    hllc = new HyperLogLogPlusCounter(precision);
                 }
                 hllc.clear();
                 hllc.add(byteBuffer.get(), byteBuffer.getOffset(), byteBuffer.getLength());
@@ -244,7 +249,7 @@ public class EndpointAggregators {
                 MetricInfo metricInfo = value.metricInfos[i];
                 BytesUtil.writeAsciiString(metricInfo.type.toString(), out);
                 BytesUtil.writeVInt(metricInfo.refIndex, out);
-                BytesUtil.writeVInt(metricInfo.presision, out);
+                BytesUtil.writeVInt(metricInfo.precision, out);
             }
 
             BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfoDigest), out);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/579e793a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
index 465f7f3..5bf22e7 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointTupleIterator.java
@@ -54,6 +54,8 @@ import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
 
+import javax.xml.datatype.DatatypeConfigurationException;
+
 /**
  * Created by Hongbin Ma(Binmahone) on 12/2/14.
  */
@@ -99,7 +101,9 @@ public class EndpointTupleIterator implements ITupleIterator {
         if (measures == null) {
             measures = Lists.newArrayList();
         }
-        initMeaureParameters(measures, segment.getColumns());
+
+        //this method will change measures
+        rewriteMeasureParameters(measures, segment.getColumns());
 
         this.seg = segment;
         this.context = context;
@@ -137,7 +141,7 @@ public class EndpointTupleIterator implements ITupleIterator {
      * @param measures
      * @param columns
      */
-    private void initMeaureParameters(List<FunctionDesc> measures, List<TblColRef> columns) {
+    private void rewriteMeasureParameters(List<FunctionDesc> measures, List<TblColRef> columns) {
         for (FunctionDesc functionDesc : measures) {
             if (functionDesc.isCount()) {
                 functionDesc.setReturnType("bigint");
@@ -146,8 +150,15 @@ public class EndpointTupleIterator implements ITupleIterator {
                 boolean updated = false;
                 for (TblColRef column : columns) {
                     if (column.isSameAs(factTableName, functionDesc.getParameter().getValue())) {
-                        functionDesc.setReturnType(column.getColumn().getType().toString());
-                        functionDesc.setReturnDataType(DataType.getInstance(functionDesc.getReturnType()));
+                        if (functionDesc.isCountDistinct()) {
+                            //TODO: default precision might need be configurable
+                            String iiDefaultHLLC = "hllc10";
+                            functionDesc.setReturnType(iiDefaultHLLC);
+                            functionDesc.setReturnDataType(DataType.getInstance(iiDefaultHLLC));
+                        } else {
+                            functionDesc.setReturnType(column.getColumn().getType().toString());
+                            functionDesc.setReturnDataType(DataType.getInstance(functionDesc.getReturnType()));
+                        }
                         functionDesc.getParameter().setColRefs(ImmutableList.of(column));
                         updated = true;
                         break;
@@ -292,6 +303,7 @@ public class EndpointTupleIterator implements ITupleIterator {
             this.tableRecord.setBytes(columnsBytes, 0, columnsBytes.length);
             if (currentRow.hasMeasures()) {
                 byte[] measuresBytes = currentRow.getMeasures().toByteArray();
+
                 this.measureValues = pushedDownAggregators.deserializeMetricValues(measuresBytes, 0);
             }
 
@@ -318,7 +330,7 @@ public class EndpointTupleIterator implements ITupleIterator {
 
             if (measureValues != null) {
                 for (int i = 0; i < measures.size(); ++i) {
-                    if (!measures.get(i).isAppliedOnDimension()) {
+                    if (!measures.get(i).isDimensionAsMetric()) {
                         String fieldName = measures.get(i).getRewriteFieldName();
                         Object value = measureValues.get(i);
                         String dataType = tuple.getDataType(fieldName);