You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/05 00:09:59 UTC

[18/21] kylin git commit: KYLIN-976 Clean up, remove all explicit checks on TopN & HLLC

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
index fbb258f..42946eb 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/translate/HBaseKeyRange.java
@@ -166,7 +166,7 @@ public class HBaseKeyRange implements Comparable<HBaseKeyRange> {
 
         List<Map<TblColRef, String>> fuzzyValues = FuzzyValueCombination.calculate(fuzzyValueSet, FUZZY_VALUE_CAP);
         for (Map<TblColRef, String> fuzzyValue : fuzzyValues) {
-            result.add(new Pair<byte[], byte[]>(fuzzyKeyEncoder.encode(fuzzyValue), fuzzyMaskEncoder.encode(fuzzyValue)));
+            result.add(Pair.newPair(fuzzyKeyEncoder.encode(fuzzyValue), fuzzyMaskEncoder.encode(fuzzyValue)));
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
index 152b4af..f2b34fc 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTable.java
@@ -94,6 +94,6 @@ public class DFSFileTable implements ReadableTable {
             lastModified = Math.max(lastModified, file.getModificationTime());
         }
 
-        return new Pair<Long, Long>(size, lastModified);
+        return Pair.newPair(size, lastModified);
     }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 45cc88e..3dddece 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -157,8 +157,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         int colParamIdx = 0; // index among parameters of column type
         for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
             String value;
-            if (function.isCount() || function.isHolisticCountDistinct()) {
-                // note for holistic count distinct, this value will be ignored
+            if (function.isCount()) {
                 value = "1";
             } else if (param.isColumnType()) {
                 value = getCell(colIdxOnFlatTable[colParamIdx++], splitBuffers);

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index 3edaefb..daa610f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -170,9 +170,8 @@ public class CuboidJob extends AbstractHadoopJob {
         // number of reduce tasks
         int numReduceTasks = (int) Math.round(totalReduceInputMB / perReduceInputMB * reduceCountRatio);
 
-        // adjust reducer number for cube which has DISTINCT_COUNT measures for
-        // better performance
-        if (cubeDesc.hasHolisticCountDistinctMeasures()) {
+        // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance
+        if (cubeDesc.hasMemoryHungryMeasures()) {
             numReduceTasks = numReduceTasks * 4;
         }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index b1527e6..18bce34 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -124,7 +124,7 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
             MeasureDesc measureDesc = measureDescs.get(i);
             MeasureType measureType = measureDesc.getFunction().getMeasureType();
             if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) {
-                dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
+                dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
             }
         }
         if (dictMeasures.size() > 0) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index cf5049a..682a91c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -126,7 +126,7 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
             MeasureDesc measureDesc = measureDescs.get(i);
             MeasureType measureType = measureDesc.getFunction().getMeasureType();
             if (measureType.getColumnsNeedDictionary(measureDesc.getFunction()).isEmpty() == false) {
-                dictMeasures.add(new Pair<Integer, MeasureIngester>(i, measureType.newIngester()));
+                dictMeasures.add(Pair.newPair(i, measureType.newIngester()));
             }
         }
         if (dictMeasures.size() > 0) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 8f65691..c27b3f1 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -63,6 +63,7 @@ import org.apache.kylin.common.util.AbstractApplication;
 import org.apache.kylin.common.util.OptionsHelper;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -324,11 +325,11 @@ public class SparkCubing extends AbstractApplication {
         }
 
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-            if (measureDesc.getFunction().isTopN()) {
-                List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
-                TblColRef col = colRefs.get(colRefs.size() - 1);
-                dictionaryMap.put(col, cubeSegment.getDictionary(col));
-            }
+			FunctionDesc func = measureDesc.getFunction();
+			List<TblColRef> colRefs = func.getMeasureType().getColumnsNeedDictionary(func);
+			for (TblColRef col : colRefs) {
+				dictionaryMap.put(col, cubeSegment.getDictionary(col));
+			}
         }
 
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
index c9a78a4..bd952a1 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/measure/FixedLenMeasureCodec.java
@@ -20,12 +20,13 @@ package org.apache.kylin.invertedindex.measure;
 
 import java.nio.ByteBuffer;
 
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 
 abstract public class FixedLenMeasureCodec<T> {
 
     public static FixedLenMeasureCodec<?> get(DataType type) {
-        if (type.isHLLC()) {
+        if (HLLCMeasureType.DATATYPE_HLLC.equals(type.getName())) {
             return new FixedHLLCodec(type);
         } else {
             return new FixedPointLongCodec(type);

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
deleted file mode 100644
index 16748a6..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTopNTupleIterator.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase.cube.v1;
-
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
-import org.apache.kylin.storage.translate.HBaseKeyRange;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-/**
- */
-public class CubeSegmentTopNTupleIterator extends CubeSegmentTupleIterator{
-
-    private Iterator<Tuple> innerResultIterator;
-    
-    public CubeSegmentTopNTupleIterator(CubeSegment cubeSeg, List<HBaseKeyRange> keyRanges, HConnection conn, //
-                                    Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, //
-                                    List<RowValueDecoder> rowValueDecoders, StorageContext context, TupleInfo returnTupleInfo) {
-        super(cubeSeg, keyRanges, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
-        this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, topNCol);
-    }
-    
-    @Override
-    public boolean hasNext() {
-        if (next != null)
-            return true;
-
-
-        if (innerResultIterator == null) {
-            if (resultIterator == null) {
-                if (rangeIterator.hasNext() == false)
-                    return false;
-
-                resultIterator = doScan(rangeIterator.next());
-            }
-
-            if (resultIterator.hasNext() == false) {
-                closeScanner();
-                resultIterator = null;
-                innerResultIterator = null;
-                return hasNext();
-            }
-
-            Result result = resultIterator.next();
-            scanCount++;
-            if (++scanCountDelta >= 1000)
-                flushScanCountDelta();
-            innerResultIterator = tupleConverter.translateTopNResult(result, oneTuple);
-        }
-
-        if (innerResultIterator.hasNext()) {
-            next = innerResultIterator.next();
-            return true;
-        } else {
-            innerResultIterator = null;
-            return hasNext();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
index 4acd8f8..50cfefa 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeSegmentTupleIterator.java
@@ -99,7 +99,7 @@ public class CubeSegmentTupleIterator implements ITupleIterator {
             assert cuboid.equals(range.getCuboid());
         }
 
-        this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo, null);
+        this.tupleConverter = new CubeTupleConverter(cubeSeg, cuboid, rowValueDecoders, returnTupleInfo);
         this.oneTuple = new Tuple(returnTupleInfo);
         this.rangeIterator = keyRanges.iterator();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index 063e254..c115f9c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -48,6 +48,7 @@ import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseMappingDesc;
 import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
@@ -95,9 +96,8 @@ public class CubeStorageQuery implements ICachableStorageQuery {
 
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-
-        // check whether this is a TopN query
-        checkAndRewriteTopN(sqlDigest);
+        // allow custom measures hack
+        notifyBeforeStorageQuery(sqlDigest);
 
         Collection<TblColRef> groups = sqlDigest.groupbyColumns;
         TupleFilter filter = sqlDigest.filter;
@@ -378,12 +378,8 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             for (HBaseColumnDesc hbCol : hbCols) {
                 bestHBCol = hbCol;
                 bestIndex = hbCol.findMeasure(aggrFunc);
-                MeasureDesc measure = hbCol.getMeasures()[bestIndex];
-                // criteria for holistic measure: Exact Aggregation && Exact Cuboid
-                if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) {
-                    logger.info("Holistic count distinct chosen for " + aggrFunc);
-                    break;
-                }
+                // we used to prefer specific measure over another (holistic distinct count), now it's gone
+                break;
             }
 
             RowValueDecoder codec = codecMap.get(bestHBCol);
@@ -716,7 +712,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
     }
 
     private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
-        if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
+        if (RowValueDecoder.hasMemHungryMeasures(valueDecoders) == false) {
             return;
         }
 
@@ -754,33 +750,11 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
     }
 
-    private void checkAndRewriteTopN(SQLDigest sqlDigest) {
-        FunctionDesc topnFunc = null;
-        TblColRef topnLiteralCol = null;
+    private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
         for (MeasureDesc measure : cubeDesc.getMeasures()) {
-            FunctionDesc func = measure.getFunction();
-            if (func.isTopN() && sqlDigest.groupbyColumns.contains(func.getTopNLiteralColumn())) {
-                topnFunc = func;
-                topnLiteralCol = func.getTopNLiteralColumn();
-            }
+            MeasureType<?> measureType = measure.getFunction().getMeasureType();
+            measureType.beforeStorageQuery(measure, sqlDigest);
         }
-
-        // if TopN is not involved
-        if (topnFunc == null)
-            return;
-
-        if (sqlDigest.aggregations.size() != 1) {
-            throw new IllegalStateException("When query with topN, only one metrics is allowed.");
-        }
-
-        FunctionDesc origFunc = sqlDigest.aggregations.iterator().next();
-        if (origFunc.isSum() == false) {
-            throw new IllegalStateException("When query with topN, only SUM function is allowed.");
-        }
-
-        sqlDigest.aggregations = Lists.newArrayList(topnFunc);
-        sqlDigest.groupbyColumns.remove(topnLiteralCol);
-        sqlDigest.metricColumns.add(topnLiteralCol);
-        logger.info("Rewrite function " + origFunc + " to " + topnFunc);
     }
+    
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
index a669e1b..e414e08 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeTupleConverter.java
@@ -2,18 +2,12 @@ package org.apache.kylin.storage.hbase.cube.v1;
 
 import java.io.IOException;
 import java.util.BitSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -40,19 +34,14 @@ public class CubeTupleConverter {
     final int[] dimensionTupleIdx;
     final int[][] metricsMeasureIdx;
     final int[][] metricsTupleIdx;
-    final TblColRef topNCol;
-    int topNColTupleIdx;
-    int topNMeasureTupleIdx;
-    Dictionary<String> topNColDict;
 
-    public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo, TblColRef topNCol) {
+    public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, List<RowValueDecoder> rowValueDecoders, TupleInfo tupleInfo) {
         this.cubeSeg = cubeSeg;
         this.cuboid = cuboid;
         this.tupleInfo = tupleInfo;
         this.rowKeyDecoder = new RowKeyDecoder(this.cubeSeg);
         this.rowValueDecoders = rowValueDecoders;
         this.derivedColFillers = Lists.newArrayList();
-        this.topNCol = topNCol;
         
         List<TblColRef> dimCols = cuboid.getColumns();
 
@@ -92,13 +81,6 @@ public class CubeTupleConverter {
             }
         }
 
-        if (this.topNCol != null) {
-            this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1;
-            this.topNMeasureTupleIdx = metricsTupleIdx[0][0];
-            
-            this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol);
-        }
-        
         // prepare derived columns and filler
         Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(dimCols, null);
         for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
@@ -112,46 +94,6 @@ public class CubeTupleConverter {
         }
     }
 
-    public Iterator<Tuple> translateTopNResult(Result hbaseRow, Tuple tuple) {
-        translateResult(hbaseRow, tuple);
-        Object topNCounterObj = tuple.getAllValues()[topNMeasureTupleIdx];
-        assert (topNCounterObj instanceof TopNCounter);
-        return new TopNCounterTupleIterator(tuple, (TopNCounter) topNCounterObj);
-    }
-
-    private class TopNCounterTupleIterator implements Iterator {
-
-        private Tuple tuple;
-        private Iterator<Counter> topNCounterIterator;
-        private Counter<ByteArray> counter;
-        
-        private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) {
-            this.tuple = tuple;
-            this.topNCounterIterator = topNCounter.iterator();
-        }
-        
-        @Override
-        public boolean hasNext() {
-            return topNCounterIterator.hasNext();
-        }
-
-        @Override
-        public Tuple next() {
-            counter = topNCounterIterator.next();
-            int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, counter.getItem().array().length);
-            String colValue = topNColDict.getValueFromId(key);
-            tuple.setDimensionValue(topNColTupleIdx, colValue);
-            tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount());
-            
-            return tuple;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-    
     public void translateResult(Result hbaseRow, Tuple tuple) {
         try {
             byte[] rowkey = hbaseRow.getRow();

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index 3f92cb0..e8dd5b9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
@@ -70,7 +69,8 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
         Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges);
 
         for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) {
-            this.segmentIteratorList.add(newCubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo));
+            CubeSegmentTupleIterator it = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+            this.segmentIteratorList.add(it);
         }
 
         this.segmentIteratorIterator = this.segmentIteratorList.iterator();
@@ -81,16 +81,6 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
         }
     }
 
-    private CubeSegmentTupleIterator newCubeSegmentTupleIterator(CubeSegment seg, List<HBaseKeyRange> keyRange, HConnection conn, Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context2, TupleInfo returnTupleInfo) {
-        MeasureDesc topN = RowValueDecoder.findTopN(rowValueDecoders);
-        if (topN != null) {
-            TblColRef topNCol = topN.getFunction().getTopNLiteralColumn();
-            return new CubeSegmentTopNTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo);
-        } else {
-            return new CubeSegmentTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
-        }
-    }
-
     private Map<CubeSegment, List<HBaseKeyRange>> makeRangesMap(List<HBaseKeyRange> segmentKeyRanges) {
         Map<CubeSegment, List<HBaseKeyRange>> map = Maps.newHashMap();
         for (HBaseKeyRange range : segmentKeyRanges) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
index 521f111..5b16b04 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/coprocessor/observer/ObserverAggregators.java
@@ -32,6 +32,9 @@ import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.measure.MeasureAggregator;
 import org.apache.kylin.measure.MeasureCodec;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureTypeFactory;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.storage.hbase.common.coprocessor.CoprocessorConstants;
 import org.apache.kylin.storage.hbase.steps.RowValueDecoder;
@@ -122,6 +125,8 @@ public class ObserverAggregators {
     final int nHCols;
     final ByteBuffer[] hColValues;
     final int nTotalMeasures;
+    
+    MeasureType measureTypes[];
 
     public ObserverAggregators(HCol[] _hcols) {
         this.hcols = sort(_hcols);
@@ -150,11 +155,18 @@ public class ObserverAggregators {
     }
 
     public MeasureAggregator[] createBuffer() {
+        if (measureTypes == null) {
+            measureTypes = new MeasureType[nTotalMeasures];
+            int i = 0;
+            for (HCol col : hcols) {
+                for (int j = 0; j < col.nMeasures; j++)
+                    measureTypes[i++] = MeasureTypeFactory.create(col.funcNames[j], DataType.getType(col.dataTypes[j]));
+            }
+        }
+        
         MeasureAggregator[] aggrs = new MeasureAggregator[nTotalMeasures];
-        int i = 0;
-        for (HCol col : hcols) {
-            for (int j = 0; j < col.nMeasures; j++)
-                aggrs[i++] = MeasureAggregator.create(col.funcNames[j], col.dataTypes[j]);
+        for (int i = 0; i < nTotalMeasures; i++) {
+            aggrs[i] = measureTypes[i].newAggregator();
         }
         return aggrs;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
index db05a40..ea33f9a 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/filter/FuzzyRowFilterV2.java
@@ -80,7 +80,7 @@ public class FuzzyRowFilterV2 extends FilterBase {
         for (int i = 0; i < fuzzyKeysData.size(); i++) {
             p = fuzzyKeysData.get(i);
             if (p.getFirst().length != p.getSecond().length) {
-                Pair<String, String> readable = new Pair<String, String>(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p.getSecond()));
+                Pair<String, String> readable = Pair.newPair(Bytes.toStringBinary(p.getFirst()), Bytes.toStringBinary(p.getSecond()));
                 throw new IllegalArgumentException("Fuzzy pair lengths do not match: " + readable);
             }
             // update mask ( 0 -> -1 (0xff), 1 -> 0)
@@ -225,7 +225,7 @@ public class FuzzyRowFilterV2 extends FilterBase {
         void updateWith(Cell currentCell, Pair<byte[], byte[]> fuzzyData) {
             byte[] nextRowKeyCandidate = getNextForFuzzyRule(isReversed(), currentCell.getRowArray(), currentCell.getRowOffset(), currentCell.getRowLength(), fuzzyData.getFirst(), fuzzyData.getSecond());
             if (nextRowKeyCandidate != null) {
-                nextRows.add(new Pair<byte[], Pair<byte[], byte[]>>(nextRowKeyCandidate, fuzzyData));
+                nextRows.add(Pair.newPair(nextRowKeyCandidate, fuzzyData));
             }
         }
 
@@ -263,7 +263,7 @@ public class FuzzyRowFilterV2 extends FilterBase {
             FilterProtosExt.BytesBytesPair current = proto.getFuzzyKeysData(i);
             byte[] keyBytes = current.getFirst().toByteArray();
             byte[] keyMeta = current.getSecond().toByteArray();
-            fuzzyKeysData.add(new Pair<byte[], byte[]>(keyBytes, keyMeta));
+            fuzzyKeysData.add(Pair.newPair(keyBytes, keyMeta));
         }
         return new FuzzyRowFilterV2(fuzzyKeysData);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
index 412e7602..6a6c887 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseRPC.java
@@ -134,7 +134,7 @@ public abstract class CubeHBaseRPC {
             fuzzyKeyEncoder.encode(gtRecordFuzzyKey, gtRecordFuzzyKey.getInfo().getPrimaryKey(), hbaseFuzzyKey);
             fuzzyMaskEncoder.encode(gtRecordFuzzyKey, gtRecordFuzzyKey.getInfo().getPrimaryKey(), hbaseFuzzyMask);
 
-            ret.add(new Pair<byte[], byte[]>(hbaseFuzzyKey, hbaseFuzzyMask));
+            ret.add(Pair.newPair(hbaseFuzzyKey, hbaseFuzzyMask));
         }
 
         return ret;
@@ -153,7 +153,7 @@ public abstract class CubeHBaseRPC {
             for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
                 if (selectedColBlocks.get(colBlkIndex)) {
                     byte[] byteQualifier = Bytes.toBytes(hbaseColDesc.getQualifier());
-                    result.add(new Pair<byte[], byte[]>(byteFamily, byteQualifier));
+                    result.add(Pair.newPair(byteFamily, byteQualifier));
                 }
                 colBlkIndex++;
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 53b690c..592451f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -52,7 +52,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
 
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        // check whether this is a TopN query
+        // allow custom measures hack
         notifyBeforeStorageQuery(sqlDigest);
 
         Collection<TblColRef> groups = sqlDigest.groupbyColumns;
@@ -86,10 +86,6 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
         context.setExactAggregation(isExactAggregation);
 
-        if (isExactAggregation) {
-            metrics = replaceHolisticCountDistinct(metrics);
-        }
-
         // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
         TupleFilter filterD = translateDerived(filter, groupsD);
 
@@ -111,22 +107,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         if (scanners.isEmpty())
             return ITupleIterator.EMPTY_TUPLE_ITERATOR;
 
-        return newSequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
-    }
-
-    private ITupleIterator newSequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> dimensionsD, Set<FunctionDesc> metrics, TupleInfo returnTupleInfo, StorageContext context) {
-        TblColRef topNCol = null;
-        for (FunctionDesc func : metrics) {
-            if (func.isTopN()) {
-                topNCol = func.getTopNLiteralColumn();
-                break;
-            }
-        }
-
-        if (topNCol != null)
-            return new SequentialCubeTopNTupleIterator(scanners, cuboid, dimensionsD, topNCol, metrics, returnTupleInfo, context);
-        else
-            return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
+        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
     }
 
     private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
@@ -248,27 +229,6 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         return exact;
     }
 
-    private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) {
-        // for count distinct, try use its holistic version if possible
-        Set<FunctionDesc> result = new LinkedHashSet<FunctionDesc>();
-        for (FunctionDesc metric : metrics) {
-            if (metric.isCountDistinct() == false) {
-                result.add(metric);
-                continue;
-            }
-
-            FunctionDesc holisticVersion = null;
-            for (MeasureDesc measure : cubeDesc.getMeasures()) {
-                FunctionDesc measureFunc = measure.getFunction();
-                if (measureFunc.equals(metric) && measureFunc.isHolisticCountDistinct()) {
-                    holisticVersion = measureFunc;
-                }
-            }
-            result.add(holisticVersion == null ? metric : holisticVersion);
-        }
-        return result;
-    }
-
     @SuppressWarnings("unchecked")
     private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
         if (filter == null)
@@ -343,15 +303,13 @@ public class CubeStorageQuery implements ICachableStorageQuery {
     }
 
     private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
-        boolean hasMemHungryCountDistinct = false;
+        boolean hasMemHungryMeasure = false;
         for (FunctionDesc func : metrics) {
-            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
-                hasMemHungryCountDistinct = true;
-            }
+            hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry();
         }
 
-        // need to limit the memory usage for memory hungry count distinct
-        if (hasMemHungryCountDistinct == false) {
+        // need to limit the memory usage for memory hungry measures
+        if (hasMemHungryMeasure == false) {
             return;
         }
 
@@ -380,6 +338,13 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         }
     }
 
+    private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            MeasureType<?> measureType = measure.getFunction().getMeasureType();
+            measureType.beforeStorageQuery(measure, sqlDigest);
+        }
+    }
+    
     // ============================================================================
 
     @Override
@@ -397,10 +362,4 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         return false;
     }
 
-    private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
-        for (MeasureDesc measure : cubeDesc.getMeasures()) {
-            MeasureType measureType = measure.getFunction().getMeasureType();
-            measureType.beforeStorageQuery(measure, sqlDigest);
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
index 3653af1..a7346af 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
@@ -18,17 +18,12 @@
 
 package org.apache.kylin.storage.hbase.cube.v2;
 
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.TopNCounter;
 import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -60,24 +55,19 @@ public class CubeTupleConverter {
     final int[] gtColIdx;
     final int[] tupleIdx;
     final Object[] gtValues;
-    final MeasureType[] measureTypes;
+    final MeasureType<?>[] measureTypes;
     
     final List<IAdvMeasureFiller> advMeasureFillers;
     final List<Integer> advMeasureIndexInGTValues;
     
     final int nSelectedDims;
-    final TblColRef topNCol;
-    int topNColTupleIdx;
-    int topNMeasureTupleIdx;
-    Dictionary<String> topNColDict;
 
     public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
-            Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, TblColRef topNCol) {
+            Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
         this.cubeSeg = cubeSeg;
         this.cuboid = cuboid;
         this.tupleInfo = returnTupleInfo;
         this.derivedColFillers = Lists.newArrayList();
-        this.topNCol = topNCol;
 
         List<TblColRef> cuboidDims = cuboid.getColumns();
         CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
@@ -117,7 +107,7 @@ public class CubeTupleConverter {
                 tupleIdx[iii] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
             }
             
-            MeasureType measureType = metric.getMeasureType();
+            MeasureType<?> measureType = metric.getMeasureType();
             if (measureType.needAdvancedTupleFilling()) {
                 Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(metric));
                 advMeasureFillers.add(measureType.getAdvancedTupleFiller(metric, returnTupleInfo, dictionaryMap));
@@ -129,14 +119,6 @@ public class CubeTupleConverter {
             iii++;
         }
 
-        if (this.topNCol != null) {
-            this.topNColTupleIdx = tupleInfo.hasColumn(this.topNCol) ? tupleInfo.getColumnIndex(this.topNCol) : -1;
-            // topN only allow 1 measure
-            this.topNMeasureTupleIdx = tupleIdx[tupleIdx.length - 1];
-
-            this.topNColDict = (Dictionary<String>)cubeSeg.getDictionary(this.topNCol);
-        }
-
         // prepare derived columns and filler
         Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null);
         for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
@@ -196,13 +178,6 @@ public class CubeTupleConverter {
         }
     }
 
-    public Iterator<Tuple> translateTopNResult(GTRecord record, Tuple tuple) {
-        translateResult(record, tuple);
-        Object topNCounterObj = tuple.getAllValues()[topNMeasureTupleIdx];
-        assert (topNCounterObj instanceof TopNCounter);
-        return new TopNCounterTupleIterator(tuple, (TopNCounter) topNCounterObj);
-    }
-    
     private interface IDerivedColumnFiller {
         public void fillDerivedColumns(Object[] gtValues, Tuple tuple);
     }
@@ -292,38 +267,4 @@ public class CubeTupleConverter {
     private static String toString(Object o) {
         return o == null ? null : o.toString();
     }
-
-    private class TopNCounterTupleIterator implements Iterator {
-
-        private Tuple tuple;
-        private Iterator<Counter> topNCounterIterator;
-        private Counter<ByteArray> counter;
-
-        private TopNCounterTupleIterator(Tuple tuple, TopNCounter topNCounter) {
-            this.tuple = tuple;
-            this.topNCounterIterator = topNCounter.iterator();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return topNCounterIterator.hasNext();
-        }
-
-        @Override
-        public Tuple next() {
-            counter = topNCounterIterator.next();
-            int key = BytesUtil.readUnsigned(counter.getItem().array(), 0, counter.getItem().array().length);
-            String colValue = topNColDict.getValueFromId(key);
-            tuple.setDimensionValue(topNColTupleIdx, colValue);
-            tuple.setMeasureValue(topNMeasureTupleIdx, counter.getCount());
-
-            return tuple;
-        }
-
-        @Override
-        public void remove() {
-            throw new UnsupportedOperationException();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
index 303c360..ff7498b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/HBaseReadonlyStore.java
@@ -26,7 +26,6 @@ import java.util.List;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
deleted file mode 100644
index 9eeae4e..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTopNTupleIterator.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Created by shaoshi on 10/28/15.
- */
-public class SequentialCubeTopNTupleIterator extends SequentialCubeTupleIterator {
-
-    private Iterator<Tuple> innerResultIterator;
-    private TblColRef topNCol;
-
-    public SequentialCubeTopNTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
-            TblColRef topNCol, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
-
-        super(scanners, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context);
-        this.topNCol = topNCol;
-    }
-
-    @Override
-    public boolean hasNext() {
-        if (next != null)
-            return true;
-        if (innerResultIterator == null) {
-            if (curScanner == null) {
-                if (scannerIterator.hasNext()) {
-                    curScanner = scannerIterator.next();
-                    curRecordIterator = curScanner.iterator();
-                    curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo, topNCol);
-                } else {
-                    return false;
-                }
-            }
-            
-            if (curRecordIterator.hasNext()) {
-                innerResultIterator = curTupleConverter.translateTopNResult(curRecordIterator.next(), tuple);
-                return hasNext();
-            } else {
-                close(curScanner);
-                curScanner = null;
-                curRecordIterator = null;
-                curTupleConverter = null;
-                innerResultIterator = null;
-                return hasNext();
-            }
-
-        }
-
-       
-        if (innerResultIterator.hasNext()) {
-            next = innerResultIterator.next();
-            return true;
-        } else {
-            innerResultIterator = null;
-            return hasNext();
-        }
-        
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
index 5bf5e95..7812baf 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
@@ -75,7 +75,7 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
             if (scannerIterator.hasNext()) {
                 curScanner = scannerIterator.next();
                 curRecordIterator = curScanner.iterator();
-                curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo, null);
+                curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
             } else {
                 return false;
             }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
index 036d9b8..10e80ae 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointAggregators.java
@@ -30,6 +30,7 @@ import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
 import org.apache.kylin.invertedindex.measure.FixedLenMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.datatype.LongMutable;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -78,7 +79,7 @@ public class EndpointAggregators {
         } else {
             int index = tableInfo.findFactTableColumn(functionDesc.getParameter().getValue());
             Preconditions.checkState(index >= 0, "Column " + functionDesc.getParameter().getValue() + " is not found in II");
-            if (functionDesc.isCountDistinct()) {
+            if (HLLCMeasureType.isCountDistinct(functionDesc)) {
                 return new MetricInfo(MetricType.DistinctCount, index, functionDesc.getReturnDataType().getPrecision());
             } else {
                 return new MetricInfo(MetricType.Normal, index);
@@ -144,10 +145,10 @@ public class EndpointAggregators {
         MeasureAggregator[] aggrs = new MeasureAggregator[funcNames.length];
         for (int i = 0; i < aggrs.length; i++) {
             if (metricInfos[i].type == MetricType.DistinctCount) {
-                aggrs[i] = MeasureAggregator.create(funcNames[i], dataTypes[i]);
+                aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType(dataTypes[i]));
             } else {
                 //all other fixed length measures can be aggregated as long
-                aggrs[i] = MeasureAggregator.create(funcNames[i], "long");
+                aggrs[i] = MeasureAggregator.create(funcNames[i], DataType.getType("long"));
             }
         }
         return aggrs;

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
index e638386..4ec421b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
@@ -42,6 +42,7 @@ import org.apache.kylin.common.util.RangeUtil;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.index.TableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.measure.hllc.HLLCMeasureType;
 import org.apache.kylin.metadata.filter.ConstantTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -205,7 +206,7 @@ public class EndpointTupleIterator implements ITupleIterator {
                 boolean updated = false;
                 for (TblColRef column : columns) {
                     if (column.isSameAs(factTableName, functionDesc.getParameter().getValue())) {
-                        if (functionDesc.isCountDistinct()) {
+                        if (HLLCMeasureType.isCountDistinct(functionDesc)) {
                             //TODO: default precision might need be configurable
                             String iiDefaultHLLC = "hllc10";
                             functionDesc.setReturnType(iiDefaultHLLC);

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index a7d0776..c798289 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -374,12 +374,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         int space = 0;
         for (MeasureDesc measureDesc : cubeSegment.getCubeDesc().getMeasures()) {
             DataType returnType = measureDesc.getFunction().getReturnDataType();
-            if (returnType.isHLLC()) {
-                // for HLL, it will be compressed when export to bytes
-                space += returnType.getStorageBytesEstimate() * 0.75;
-            } else {
-                space += returnType.getStorageBytesEstimate();
-            }
+            space += returnType.getStorageBytesEstimate();
         }
         bytesLength += space;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
index 205c0db..e565e86 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2.java
@@ -152,7 +152,7 @@ public class HBaseMROutput2 implements IMROutput2 {
 
             this.parsedKey = new ByteArrayWritable();
             this.parsedValue = new Object[measuresDescs.size()];
-            this.parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+            this.parsedPair = Pair.newPair(parsedKey, parsedValue);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index f190186..dff1e51 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@ -241,7 +241,7 @@ public class HBaseMROutput2Transition implements IMROutput2 {
             if (parsedPair == null) {
                 parsedKey = new ByteArrayWritable();
                 parsedValue = new Object[seg.getCubeDesc().getMeasures().size()];
-                parsedPair = new Pair<ByteArrayWritable, Object[]>(parsedKey, parsedValue);
+                parsedPair = Pair.newPair(parsedKey, parsedValue);
             }
             
             // merge by cuboid files

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 617af76..59a5fed 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -122,34 +122,21 @@ public class RowValueDecoder implements Cloneable {
         }
     }
 
-    public boolean hasMemHungryCountDistinct() {
+    public boolean hasMemHungryMeasures() {
         for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
             FunctionDesc func = measures[i].getFunction();
-            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
+            if (func.getMeasureType().isMemoryHungry())
                 return true;
-            }
         }
         return false;
     }
 
-    public static boolean hasMemHungryCountDistinct(Collection<RowValueDecoder> rowValueDecoders) {
+    public static boolean hasMemHungryMeasures(Collection<RowValueDecoder> rowValueDecoders) {
         for (RowValueDecoder decoder : rowValueDecoders) {
-            if (decoder.hasMemHungryCountDistinct())
+            if (decoder.hasMemHungryMeasures())
                 return true;
         }
         return false;
     }
 
-    public static MeasureDesc findTopN(Collection<RowValueDecoder> rowValueDecoders) {
-        for (RowValueDecoder decoder : rowValueDecoders) {
-            for (int i = decoder.projectionIndex.nextSetBit(0); i >= 0; i = decoder.projectionIndex.nextSetBit(i + 1)) {
-                MeasureDesc measure = decoder.measures[i];
-                FunctionDesc func = measure.getFunction();
-                if (func.isTopN())
-                    return measure;
-            }
-        }
-        return null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
index e71089a..8a88b6d 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
@@ -325,7 +325,7 @@ public class GridTableHBaseBenchmark {
             int logicRows = nRows / nColumns;
             for (int i = 0; i < nColumns; i++) {
                 if (rand.nextDouble() < hitRatio) {
-                    hitsForColumnScan.add(new Pair<Integer, Integer>(i * logicRows, (i + 1) * logicRows));
+                    hitsForColumnScan.add(Pair.newPair(i * logicRows, (i + 1) * logicRows));
                 }
             }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/7f50c17a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
index f717d82..1d85922 100644
--- a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/cube/v1/filter/TestFuzzyRowFilterV2EndToEnd.java
@@ -167,7 +167,7 @@ public class TestFuzzyRowFilterV2EndToEnd {
             }
             buf.putInt(i);
 
-            Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask);
+            Pair<byte[], byte[]> pair = Pair.newPair(fuzzyKey, mask);
             list.add(pair);
         }
 
@@ -200,7 +200,7 @@ public class TestFuzzyRowFilterV2EndToEnd {
             }
             buf.putInt(i * 2);
 
-            Pair<byte[], byte[]> pair = new Pair<byte[], byte[]>(fuzzyKey, mask);
+            Pair<byte[], byte[]> pair = Pair.newPair(fuzzyKey, mask);
             list.add(pair);
         }
 
@@ -309,8 +309,8 @@ public class TestFuzzyRowFilterV2EndToEnd {
 
         byte[] mask2 = new byte[] { 0, 0, 0, 0, 0, 0, 1, 1, 1, 1 };
 
-        Pair<byte[], byte[]> pair1 = new Pair<byte[], byte[]>(fuzzyKey1, mask1);
-        Pair<byte[], byte[]> pair2 = new Pair<byte[], byte[]>(fuzzyKey2, mask2);
+        Pair<byte[], byte[]> pair1 = Pair.newPair(fuzzyKey1, mask1);
+        Pair<byte[], byte[]> pair2 = Pair.newPair(fuzzyKey2, mask2);
 
         FuzzyRowFilterV2 fuzzyRowFilter1 = new FuzzyRowFilterV2(Lists.newArrayList(pair1));
         FuzzyRowFilterV2 fuzzyRowFilter2 = new FuzzyRowFilterV2(Lists.newArrayList(pair2));