You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/12 06:43:11 UTC

[21/50] incubator-kylin git commit: hll on ii on going

hll on ii on going


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/0a96d742
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/0a96d742
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/0a96d742

Branch: refs/heads/inverted-index
Commit: 0a96d742771c8a3cafc2f5c54265cbc6531ecf2e
Parents: 9fb8aaa
Author: honma <ho...@ebay.com>
Authored: Wed Feb 11 09:14:20 2015 +0800
Committer: honma <ho...@ebay.com>
Committed: Wed Feb 11 09:14:20 2015 +0800

----------------------------------------------------------------------
 .../common/hll/HyperLogLogPlusCounter.java      | 28 ++++++--
 .../org/apache/kylin/common/util/BasicTest.java |  6 +-
 .../common/util/HBaseMiniclusterHelper.java     |  2 -
 .../common/util/HyperLogLogCounterTest.java     |  4 +-
 .../invertedindex/index/TableRecordInfo.java    |  3 +-
 .../cardinality/ColumnCardinalityMapper.java    |  2 +-
 .../cardinality/ColumnCardinalityReducer.java   |  4 +-
 .../job/tools/ColumnCardinalityMapperTest.java  |  4 +-
 .../job/tools/ColumnCardinalityReducerTest.java |  2 +-
 .../kylin/metadata/measure/HLLCSerializer.java  |  6 +-
 .../measure/fixedlen/FixedHLLCodec.java         | 58 ++++++++++++++++
 .../measure/fixedlen/FixedLenMeasureCodec.java  |  1 +
 .../query/sqlfunc/HLLDistinctCountAggFunc.java  |  4 +-
 .../endpoint/EndpointAggregators.java           | 71 +++++++++++++++-----
 14 files changed, 149 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
index e725e21..c323b90 100644
--- a/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
+++ b/common/src/main/java/org/apache/kylin/common/hll/HyperLogLogPlusCounter.java
@@ -66,7 +66,7 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
     /** The larger p is, the more storage (2^p bytes), the better accuracy */
     private HyperLogLogPlusCounter(int p, HashFunction hashFunc) {
         this.p = p;
-        this.m = (int) Math.pow(2, p);
+        this.m = 1 << p; // m = 2^p registers; integer shift avoids the double round-trip of Math.pow
         this.hashFunc = hashFunc;
         this.registers = new byte[m];
     }
@@ -77,7 +77,7 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
     }
 
     public void add(String value) {
-        add(hashFunc.hashString(value,Charset.defaultCharset()).asLong());
+        add(hashFunc.hashString(value, Charset.defaultCharset()).asLong());
     }
 
     public void add(byte[] value) {
@@ -223,7 +223,7 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
                                                                          // the
                                                                          // moment
 
-    public void writeRegisters(final ByteBuffer out) throws IOException {
+    public void writeCompactRegisters(final ByteBuffer out) throws IOException {
         int startPos = out.position();
 
         final int indexLen = getRegisterIndexSize();
@@ -264,7 +264,7 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
         out.put(compressed);
     }
 
-    public void readRegisters(ByteBuffer in) throws IOException {
+    public void readCompactRegisters(ByteBuffer in) throws IOException {
         byte scheme = in.get();
         if ((scheme & COMPRESSION_FLAG) > 0) {
             scheme ^= COMPRESSION_FLAG;
@@ -286,12 +286,26 @@ public class HyperLogLogPlusCounter implements Comparable<HyperLogLogPlusCounter
                 registers[key] = in.get();
             }
         } else { // array scheme
-            for (int i = 0; i < m; i++) {
-                registers[i] = in.get();
-            }
+            in.get(registers);
         }
     }
 
+    /**
+     * Writes the raw, uncompressed register array (one byte per register; m bytes as allocated in the constructor). For compressed output use writeCompactRegisters.
+     * @param out destination buffer, written from its current position; ByteBuffer.put throws BufferOverflowException if fewer than registers.length bytes remain
+     */
+    public void writeRegisters(final ByteBuffer out) {
+        out.put(this.registers);
+    }
+
+    /**
+     * Overwrites registers[0..m) with m raw bytes, i.e. the uncompressed form produced by writeRegisters. For compressed input use readCompactRegisters.
+     * @param in source buffer, read from its current position; ByteBuffer.get throws BufferUnderflowException if fewer than m bytes remain
+     */
+    public void readRegisters(ByteBuffer in) {
+        in.get(registers, 0, m);
+    }
+
     private int getRegisterIndexSize() {
         return (p - 1) / 8 + 1; // 2 when p=16, 3 when p=17
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
index 476b9c1..5952c33 100644
--- a/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/BasicTest.java
@@ -64,13 +64,9 @@ public class BasicTest {
     public void test1() throws Exception {
     }
 
+
     @Test
     @Ignore("fix it later")
     public void test2() throws IOException, ConfigurationException {
-        PropertiesConfiguration a = new PropertiesConfiguration();
-        a.setProperty("hi", "dd");
-
-        System.out.println("dfads");
-        a.save(System.out);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java b/common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
index 58058ab..3846302 100644
--- a/common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
+++ b/common/src/test/java/org/apache/kylin/common/util/HBaseMiniclusterHelper.java
@@ -18,7 +18,6 @@
 
 package org.apache.kylin.common.util;
 
-import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -27,7 +26,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.HBaseConnection;
 import org.apache.kylin.common.persistence.HBaseResourceStore;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
----------------------------------------------------------------------
diff --git a/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java b/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
index 75e84c1..088219f 100644
--- a/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
+++ b/common/src/test/java/org/apache/kylin/common/util/HyperLogLogCounterTest.java
@@ -108,9 +108,9 @@ public class HyperLogLogCounterTest {
     private void checkSerialize(HyperLogLogPlusCounter hllc) throws IOException {
         long estimate = hllc.getCountEstimate();
         buf.clear();
-        hllc.writeRegisters(buf);
+        hllc.writeCompactRegisters(buf);
         buf.flip();
-        hllc.readRegisters(buf);
+        hllc.readCompactRegisters(buf);
         Assert.assertEquals(estimate, hllc.getCountEstimate());
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
index dca1a65..886c649 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/index/TableRecordInfo.java
@@ -134,7 +134,8 @@ public class TableRecordInfo {
             return -1;
         for (int i = 0; i < allColumns.size(); ++i) {
             TblColRef tblColRef = allColumns.get(i);
-            if (measureSerializers[i] != null && tblColRef.isSameAs(desc.getFactTableName(), metricColumnName)) {
+            if (measureSerializers[i] != null // has measureSerializers means it is a metric
+                    && tblColRef.isSameAs(desc.getFactTableName(), metricColumnName)) {
                 return i;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
index dd7c720..329914a 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityMapper.java
@@ -94,7 +94,7 @@ public class ColumnCardinalityMapper<T> extends KylinMapper<T, HCatRecord, IntWr
             HyperLogLogPlusCounter hllc = hllcMap.get(key);
             ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
             buf.clear();
-            hllc.writeRegisters(buf);
+            hllc.writeCompactRegisters(buf);
             buf.flip();
             context.write(new IntWritable(key), new BytesWritable(buf.array(), buf.limit()));
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
index a9ac1a3..b4b55d3 100644
--- a/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
+++ b/job/src/main/java/org/apache/kylin/job/hadoop/cardinality/ColumnCardinalityReducer.java
@@ -50,7 +50,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
         for (BytesWritable v : values) {
             ByteBuffer buffer = ByteBuffer.wrap(v.getBytes());
             HyperLogLogPlusCounter hll = new HyperLogLogPlusCounter();
-            hll.readRegisters(buffer);
+            hll.readCompactRegisters(buffer);
             getHllc(skey).merge(hll);
             hll.clear();
         }
@@ -77,7 +77,7 @@ public class ColumnCardinalityReducer extends KylinReducer<IntWritable, BytesWri
             HyperLogLogPlusCounter hllc = hllcMap.get(key);
             ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
             buf.clear();
-            hllc.writeRegisters(buf);
+            hllc.writeCompactRegisters(buf);
             buf.flip();
             context.write(new IntWritable(key), new LongWritable(hllc.getCountEstimate()));
             // context.write(new Text("ErrorRate_" + key), new

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
index e13289a..08c1611 100644
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
+++ b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityMapperTest.java
@@ -84,7 +84,7 @@ public class ColumnCardinalityMapperTest {
         BytesWritable value1 = result.get(0).getSecond();
         byte[] bytes = value1.getBytes();
         HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
-        hllc.readRegisters(ByteBuffer.wrap(bytes));
+        hllc.readCompactRegisters(ByteBuffer.wrap(bytes));
         assertTrue(key1 > 0);
         assertEquals(8, hllc.getCountEstimate());
     }
@@ -117,7 +117,7 @@ public class ColumnCardinalityMapperTest {
         BytesWritable value1 = result.get(0).getSecond();
         byte[] bytes = value1.getBytes();
         HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter();
-        hllc.readRegisters(ByteBuffer.wrap(bytes));
+        hllc.readCompactRegisters(ByteBuffer.wrap(bytes));
         System.out.println("ab\177ab".length());
         assertTrue(key1 > 0);
         assertEquals(1, hllc.getCountEstimate());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
index 034c90e..6ad27a8 100644
--- a/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/tools/ColumnCardinalityReducerTest.java
@@ -67,7 +67,7 @@ public class ColumnCardinalityReducerTest {
         }
         ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
         buf.clear();
-        hllc.writeRegisters(buf);
+        hllc.writeCompactRegisters(buf);
         buf.flip();
         return buf.array();
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
index f63c8d7..20c6c21 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/HLLCSerializer.java
@@ -31,14 +31,14 @@ public class HLLCSerializer extends MeasureSerializer<HyperLogLogPlusCounter> {
 
     HyperLogLogPlusCounter current;
 
-    HLLCSerializer(int p) {
+    public HLLCSerializer(int p) {
         current = new HyperLogLogPlusCounter(p);
     }
 
     @Override
     public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
         try {
-            value.writeRegisters(out);
+            value.writeCompactRegisters(out);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -47,7 +47,7 @@ public class HLLCSerializer extends MeasureSerializer<HyperLogLogPlusCounter> {
     @Override
     public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
         try {
-            current.readRegisters(in);
+            current.readCompactRegisters(in);
         } catch (IOException e) {
             throw new RuntimeException(e);
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
new file mode 100644
index 0000000..9f6a1ba
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedHLLCodec.java
@@ -0,0 +1,58 @@
+package org.apache.kylin.metadata.measure.fixedlen;
+
+import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
+import org.apache.kylin.metadata.model.DataType;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Stores a HyperLogLogPlusCounter as its raw 2^precision register bytes. valueOf() and read() reuse one counter instance.
+ * Created by Hongbin Ma(Binmahone) on 2/10/15.
+ */
+public class FixedHLLCodec extends FixedLenMeasureCodec<HyperLogLogPlusCounter> {
+    private final DataType type;
+    private final int precision;
+    private final HyperLogLogPlusCounter current;
+
+    public FixedHLLCodec(DataType type) {
+        this.type = type;
+        this.precision = type.getPrecision();
+        this.current = new HyperLogLogPlusCounter(this.precision);
+    }
+
+    @Override
+    public int getLength() {
+        return 1 << precision; // one byte per register, exactly what writeRegisters/readRegisters transfer
+    }
+
+    @Override
+    public DataType getDataType() {
+        return type;
+    }
+
+    @Override
+    public HyperLogLogPlusCounter valueOf(String value) {
+        current.clear();
+        if (value == null)
+            current.add("__nUlL__");
+        else
+            current.add(value.getBytes());
+        return current;
+    }
+
+    @Override
+    public String toString(HyperLogLogPlusCounter value) {
+        return String.valueOf(value.getCountEstimate());
+    }
+
+    @Override
+    public HyperLogLogPlusCounter read(byte[] buf, int offset) {
+        current.readRegisters(ByteBuffer.wrap(buf, offset, getLength())); // window [offset, offset + 2^precision)
+        return current; // shared instance: callers must copy it to retain the value
+    }
+
+    @Override
+    public void write(HyperLogLogPlusCounter v, byte[] buf, int offset) {
+        v.writeRegisters(ByteBuffer.wrap(buf, offset, getLength()));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
index 00de630..41a6356 100644
--- a/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
+++ b/metadata/src/main/java/org/apache/kylin/metadata/measure/fixedlen/FixedLenMeasureCodec.java
@@ -20,6 +20,7 @@ package org.apache.kylin.metadata.measure.fixedlen;
 
 import org.apache.kylin.metadata.model.DataType;
 
+
 abstract public class FixedLenMeasureCodec<T> {
 
     public static FixedLenMeasureCodec<?> get(DataType type) {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
index 356c1e3..8d9ace0 100644
--- a/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
+++ b/query/src/main/java/org/apache/kylin/query/sqlfunc/HLLDistinctCountAggFunc.java
@@ -118,12 +118,12 @@ public class HLLDistinctCountAggFunc {
         }
 
         @Override
-        public void writeRegisters(ByteBuffer out) throws IOException {
+        public void writeCompactRegisters(ByteBuffer out) throws IOException {
             throw new UnsupportedOperationException();
         }
 
         @Override
-        public void readRegisters(ByteBuffer in) throws IOException {
+        public void readCompactRegisters(ByteBuffer in) throws IOException {
             throw new UnsupportedOperationException();
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/0a96d742/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index 227ecbe..b199862 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 
 import com.google.common.collect.Lists;
 
+import com.yammer.metrics.core.Metric;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
@@ -28,6 +29,7 @@ import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants;
 import org.apache.hadoop.io.LongWritable;
 
@@ -40,10 +42,28 @@ import java.util.List;
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class EndpointAggregators {
 
+    private enum MetricType {
+        Count, DimensionAsMetric, DistinctCount, Normal // kind of aggregation input, chosen per metric in fromFunctions
+    }
+
+    private static class MetricInfo {
+        private final MetricType type;
+        private final int refIndex; // II column index of the metric's input; -1 when the metric is not bound to a metric column
+
+        public MetricInfo(MetricType type, int refIndex) {
+            this.type = type;
+            this.refIndex = refIndex;
+        }
+
+        public MetricInfo(MetricType type) {
+            this(type, -1);
+        }
+    }
+
     public static EndpointAggregators fromFunctions(TableRecordInfo tableInfo, List<FunctionDesc> metrics) {
         String[] funcNames = new String[metrics.size()];
         String[] dataTypes = new String[metrics.size()];
-        int[] refColIndex = new int[metrics.size()];
+        MetricInfo[] metricInfos = new MetricInfo[metrics.size()];
 
         for (int i = 0; i < metrics.size(); i++) {
             FunctionDesc functionDesc = metrics.get(i);
@@ -53,34 +73,40 @@ public class EndpointAggregators {
             dataTypes[i] = functionDesc.getReturnType();
 
             if (functionDesc.isCount()) {
-                refColIndex[i] = -1;//-1 for count, -2 for metricOnDimension
+                metricInfos[i] = new MetricInfo(MetricType.Count);
             } else if (functionDesc.isAppliedOnDimension()) {
-                refColIndex[i] = -2;
+                metricInfos[i] = new MetricInfo(MetricType.DimensionAsMetric);
             } else {
-                refColIndex[i] = tableInfo.findMetric(functionDesc.getParameter().getValue());
-                if (refColIndex[i] < 0) {
+                int index = tableInfo.findMetric(functionDesc.getParameter().getValue());
+                if (index < 0) {
                     throw new IllegalStateException("Column " + functionDesc.getParameter().getColRefs().get(0) + " is not found in II");
                 }
+
+                if (functionDesc.isCountDistinct()) {
+                    metricInfos[i] = new MetricInfo(MetricType.DistinctCount, index);
+                } else {
+                    metricInfos[i] = new MetricInfo(MetricType.Normal, index);
+                }
             }
         }
 
-        return new EndpointAggregators(funcNames, dataTypes, refColIndex, tableInfo.getDigest());
+        return new EndpointAggregators(funcNames, dataTypes, metricInfos, tableInfo.getDigest());
     }
 
     final String[] funcNames;
     final String[] dataTypes;
-    final int[] refColIndex;
+    final MetricInfo[] metricInfos;
     final TableRecordInfoDigest tableRecordInfo;
 
     final transient FixedLenMeasureCodec[] measureSerializers;
     final transient Object[] metricValues;
 
-    final LongWritable one = new LongWritable(1);
+    final LongWritable ONE = new LongWritable(1);
 
-    public EndpointAggregators(String[] funcNames, String[] dataTypes, int[] refColIndex, TableRecordInfoDigest tableInfo) {
+    public EndpointAggregators(String[] funcNames, String[] dataTypes, MetricInfo[] metricInfos, TableRecordInfoDigest tableInfo) {
         this.funcNames = funcNames;
         this.dataTypes = dataTypes;
-        this.refColIndex = refColIndex;
+        this.metricInfos = metricInfos;
         this.tableRecordInfo = tableInfo;
 
         this.metricValues = new Object[funcNames.length];
@@ -111,12 +137,19 @@ public class EndpointAggregators {
         int rawIndex = 0;
         int columnCount = tableRecordInfo.getColumnCount();
 
-        //normal column values to aggregate
         for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) {
-            if (tableRecordInfo.isMetrics(columnIndex)) {
-                for (int metricIndex = 0; metricIndex < refColIndex.length; ++metricIndex) {
-                    if (refColIndex[metricIndex] == columnIndex) {
+            for (int metricIndex = 0; metricIndex < metricInfos.length; ++metricIndex) {
+                if (metricInfos[metricIndex].refIndex == columnIndex) {
+                    if (metricInfos[metricIndex].type == MetricType.Normal) {
+                        //normal column values to aggregate
                         measureAggrs[metricIndex].aggregate(measureSerializers[metricIndex].read(row, rawIndex));
+                    } else if (metricInfos[metricIndex].type == MetricType.DistinctCount) {
+                        if (tableRecordInfo.isMetrics(columnCount)) {
+                            measureAggrs[metricIndex].aggregate(measureSerializers[metricIndex].read(row, rawIndex));
+                        } else {
+                            //TODO: for unified dictionary, this is okay. but if different data blocks uses different dictionary, we'll have to aggregate original data
+                            measureAggrs[metricIndex].aggregate(tableRecordInfo.);
+                        }
                     }
                 }
             }
@@ -124,9 +157,11 @@ public class EndpointAggregators {
         }
 
         //aggregate for "count"
-        for (int i = 0; i < refColIndex.length; ++i) {
-            if (refColIndex[i] == -1) {
-                measureAggrs[i].aggregate(one);
+        for (int i = 0; i < metricInfos.length; ++i) {
+            if (metricInfos[i].type == MetricType.Count) {
+                measureAggrs[i].aggregate(ONE);
+            } else if (metricInfos[i].type == MetricType.DistinctCount) {
+
             }
         }
     }
@@ -180,7 +215,7 @@ public class EndpointAggregators {
         public void serialize(EndpointAggregators value, ByteBuffer out) {
             BytesUtil.writeAsciiStringArray(value.funcNames, out);
             BytesUtil.writeAsciiStringArray(value.dataTypes, out);
-            BytesUtil.writeIntArray(value.refColIndex, out);
+            BytesUtil.writeIntArray(value.metricInfos, out); // FIXME(review): metricInfos is MetricInfo[], not int[]; needs an explicit encoding (type + refIndex) mirrored in deserialize
             BytesUtil.writeByteArray(TableRecordInfoDigest.serialize(value.tableRecordInfo), out);
         }