You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/21 02:29:28 UTC

[3/7] incubator-kylin git commit: KYLIN-878 HBase storage abstraction for cubing flow

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
deleted file mode 100644
index 549961f..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageEngine.java
+++ /dev/null
@@ -1,371 +0,0 @@
-package org.apache.kylin.storage.cube;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.DerivedFilterTranslator;
-import org.apache.kylin.storage.tuple.TupleInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-
-public class CubeStorageEngine implements ICachableStorageEngine {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStorageEngine.class);
-
-    private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
-
-    private final CubeInstance cubeInstance;
-    private final CubeDesc cubeDesc;
-
-    public CubeStorageEngine(CubeInstance cube) {
-        this.cubeInstance = cube;
-        this.cubeDesc = cube.getDescriptor();
-    }
-
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TupleFilter filter = sqlDigest.filter;
-
-        // build dimension & metrics
-        Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
-        Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
-        buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
-
-        // all dimensions = groups + filter dimensions
-        Set<TblColRef> filterDims = Sets.newHashSet(dimensions);
-        filterDims.removeAll(groups);
-
-        // expand derived (xxxD means contains host columns only, derived columns were translated)
-        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
-        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
-        Set<TblColRef> filterDimsD = expandDerived(filterDims, derivedPostAggregation);
-        filterDimsD.removeAll(groupsD);
-        derivedPostAggregation.removeAll(groups);
-
-        // identify cuboid
-        Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
-        dimensionsD.addAll(groupsD);
-        dimensionsD.addAll(filterDimsD);
-        Cuboid cuboid = identifyCuboid(dimensionsD);
-        context.setCuboid(cuboid);
-
-        // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
-        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
-        boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
-        context.setExactAggregation(isExactAggregation);
-        
-        if (isExactAggregation) {
-            metrics = replaceHolisticCountDistinct(metrics);
-        }
-
-        // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
-        TupleFilter filterD = translateDerived(filter, groupsD);
-
-        setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
-        // TODO enable coprocessor
-//        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
-        setLimit(filter, context);
-
-        List<CubeScanner> scanners = Lists.newArrayList();
-        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
-            scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD));
-        }
-        
-        if (scanners.isEmpty())
-            return ITupleIterator.EMPTY_TUPLE_ITERATOR;
-        
-        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo);
-    }
-
-    private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
-        for (FunctionDesc func : sqlDigest.aggregations) {
-            if (!func.isDimensionAsMetric()) {
-                // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision
-                metrics.add(findAggrFuncFromCubeDesc(func));
-            }
-        }
-
-        for (TblColRef column : sqlDigest.allColumns) {
-            // skip measure columns
-            if (sqlDigest.metricColumns.contains(column)) {
-                continue;
-            }
-            dimensions.add(column);
-        }
-    }
-    
-    private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
-        for (MeasureDesc measure : cubeDesc.getMeasures()) {
-            if (measure.getFunction().equals(aggrFunc))
-                return measure.getFunction();
-        }
-        return aggrFunc;
-    }
-
-    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
-        Set<TblColRef> expanded = Sets.newHashSet();
-        for (TblColRef col : cols) {
-            if (cubeDesc.isDerived(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                for (TblColRef hostCol : hostInfo.columns) {
-                    expanded.add(hostCol);
-                    if (hostInfo.isOneToOne == false)
-                        derivedPostAggregation.add(hostCol);
-                }
-            } else {
-                expanded.add(col);
-            }
-        }
-        return expanded;
-    }
-
-    private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
-        long cuboidID = 0;
-        for (TblColRef column : dimensions) {
-            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
-            cuboidID |= 1L << index;
-        }
-        return Cuboid.findById(cubeDesc, cuboidID);
-    }
-
-    @SuppressWarnings("unchecked")
-    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
-        Collection<? extends TupleFilter> toCheck;
-        if (filter instanceof CompareTupleFilter) {
-            toCheck = Collections.singleton(filter);
-        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
-            toCheck = filter.getChildren();
-        } else {
-            return (Set<TblColRef>) Collections.EMPTY_SET;
-        }
-
-        Set<TblColRef> result = Sets.newHashSet();
-        for (TupleFilter f : toCheck) {
-            if (f instanceof CompareTupleFilter) {
-                CompareTupleFilter compFilter = (CompareTupleFilter) f;
-                // is COL=const ?
-                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
-                    result.add(compFilter.getColumn());
-                }
-            }
-        }
-
-        // expand derived
-        Set<TblColRef> resultD = Sets.newHashSet();
-        for (TblColRef col : result) {
-            if (cubeDesc.isDerived(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                if (hostInfo.isOneToOne) {
-                    for (TblColRef hostCol : hostInfo.columns) {
-                        resultD.add(hostCol);
-                    }
-                }
-                //if not one2one, it will be pruned
-            } else {
-                resultD.add(col);
-            }
-        }
-        return resultD;
-    }
-
-    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
-        boolean exact = true;
-
-        if (cuboid.requirePostAggregation()) {
-            exact = false;
-            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
-        }
-
-        // derived aggregation is bad, unless expanded columns are already in group by
-        if (groups.containsAll(derivedPostAggregation) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
-        }
-
-        // other columns (from filter) is bad, unless they are ensured to have single value
-        if (singleValuesD.containsAll(othersD) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because some column not on group by: " + othersD //
-                    + " (single value column: " + singleValuesD + ")");
-        }
-
-        if (exact) {
-            logger.info("exactAggregation is true");
-        }
-        return exact;
-    }
-
-    private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) {
-        // for count distinct, try use its holistic version if possible
-        Set<FunctionDesc> result = new LinkedHashSet<FunctionDesc>();
-        for (FunctionDesc metric : metrics) {
-            if (metric.isCountDistinct() == false) {
-                result.add(metric);
-                continue;
-            }
-            
-            FunctionDesc holisticVersion = null;
-            for (MeasureDesc measure : cubeDesc.getMeasures()) {
-                FunctionDesc measureFunc = measure.getFunction();
-                if (measureFunc.equals(metric) && measureFunc.isHolisticCountDistinct()) {
-                    holisticVersion = measureFunc;
-                }
-            }
-            result.add(holisticVersion == null ? metric : holisticVersion);
-        }
-        return result;
-    }
-
-    @SuppressWarnings("unchecked")
-    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null)
-            return filter;
-
-        if (filter instanceof CompareTupleFilter) {
-            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
-        }
-
-        List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
-        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
-        boolean modified = false;
-        for (TupleFilter child : children) {
-            TupleFilter translated = translateDerived(child, collector);
-            newChildren.add(translated);
-            if (child != translated)
-                modified = true;
-        }
-        if (modified) {
-            filter = replaceChildren(filter, newChildren);
-        }
-        return filter;
-    }
-
-    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
-        if (filter instanceof LogicalTupleFilter) {
-            LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
-            r.addChildren(newChildren);
-            return r;
-        } else
-            throw new IllegalStateException("Cannot replaceChildren on " + filter);
-    }
-
-    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
-        if (compf.getColumn() == null || compf.getValues().isEmpty())
-            return compf;
-
-        TblColRef derived = compf.getColumn();
-        if (cubeDesc.isDerived(derived) == false)
-            return compf;
-
-        DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
-        CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
-        CubeSegment seg = cubeInstance.getLatestReadySegment();
-        LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
-        Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
-        TupleFilter translatedFilter = translated.getFirst();
-        boolean loosened = translated.getSecond();
-        if (loosened) {
-            collectColumnsRecursively(translatedFilter, collector);
-        }
-        return translatedFilter;
-    }
-
-    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter instanceof ColumnTupleFilter) {
-            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
-        }
-        for (TupleFilter child : filter.getChildren()) {
-            collectColumnsRecursively(child, collector);
-        }
-    }
-
-    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
-        if (cubeDesc.isDerived(col)) {
-            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-            for (TblColRef h : hostInfo.columns)
-                collector.add(h);
-        } else {
-            collector.add(col);
-        }
-    }
-
-    private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
-        boolean hasMemHungryCountDistinct = false;
-        for (FunctionDesc func : metrics) {
-            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
-                hasMemHungryCountDistinct = true;
-            }
-        }
-
-        // need to limit the memory usage for memory hungry count distinct
-        if (hasMemHungryCountDistinct == false) {
-            return;
-        }
-
-        int rowSizeEst = dimensions.size() * 3;
-        for (FunctionDesc func : metrics) {
-            rowSizeEst += func.getReturnDataType().getSpaceEstimate();
-        }
-
-        long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
-        context.setThreshold((int) rowEst);
-    }
-
-    private void setLimit(TupleFilter filter, StorageContext context) {
-        boolean goodAggr = context.isExactAggregation();
-        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
-        boolean goodSort = context.hasSort() == false;
-        if (goodAggr && goodFilter && goodSort) {
-            logger.info("Enable limit " + context.getLimit());
-            context.enableLimit();
-        }
-    }
-    
-    // ============================================================================
-
-    @Override
-    public Range<Long> getVolatilePeriod() {
-        return null;
-    }
-
-    @Override
-    public String getStorageUUID() {
-        return cubeInstance.getUuid();
-    }
-
-
-    @Override
-    public boolean isDynamic() {
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java
new file mode 100644
index 0000000..8164310
--- /dev/null
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeStorageQuery.java
@@ -0,0 +1,371 @@
+package org.apache.kylin.storage.cube;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.storage.ICachableStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.hbase.DerivedFilterTranslator;
+import org.apache.kylin.storage.tuple.TupleInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
+
+public class CubeStorageQuery implements ICachableStorageQuery {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
+
+    private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
+
+    private final CubeInstance cubeInstance;
+    private final CubeDesc cubeDesc;
+
+    public CubeStorageQuery(CubeInstance cube) { // cube: the cube instance this object answers queries for
+        this.cubeInstance = cube;
+        this.cubeDesc = cube.getDescriptor(); // descriptor is fetched once here; the helpers below read this field
+    }
+
+    @Override
+    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+        TupleFilter filter = sqlDigest.filter;
+        // Flow: resolve dimensions and metrics, pick the cuboid, configure the context, then scan each READY segment.
+        // build dimension & metrics
+        Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
+        Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
+        buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
+
+        // all dimensions = groups + filter dimensions
+        Set<TblColRef> filterDims = Sets.newHashSet(dimensions);
+        filterDims.removeAll(groups);
+
+        // expand derived columns to host columns (an xxxD name holds host columns only; derived ones were translated)
+        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
+        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
+        Set<TblColRef> filterDimsD = expandDerived(filterDims, derivedPostAggregation);
+        filterDimsD.removeAll(groupsD);
+        derivedPostAggregation.removeAll(groups); // any entry left after this makes isExactAggregation() report false
+
+        // identify cuboid
+        Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
+        dimensionsD.addAll(groupsD);
+        dimensionsD.addAll(filterDimsD);
+        Cuboid cuboid = identifyCuboid(dimensionsD);
+        context.setCuboid(cuboid);
+
+        // isExactAggregation? meaning: tuples returned from storage require no further aggregation in query engine
+        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
+        boolean isExactAggregation = isExactAggregation(cuboid, groups, filterDimsD, singleValuesD, derivedPostAggregation);
+        context.setExactAggregation(isExactAggregation);
+        // only with exact aggregation are count distinct metrics swapped for the cube's holistic variants
+        if (isExactAggregation) {
+            metrics = replaceHolisticCountDistinct(metrics);
+        }
+
+        // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
+        TupleFilter filterD = translateDerived(filter, groupsD); // may add columns to groupsD; cuboid is not re-identified
+
+        setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
+        // TODO enable coprocessor
+//        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
+        setLimit(filter, context); // decided on the original filter, not on the translated filterD
+
+        List<CubeScanner> scanners = Lists.newArrayList();
+        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+            scanners.add(new CubeScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD));
+        }
+
+        if (scanners.isEmpty())
+            return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+
+        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo);
+    }
+
+    private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
+        for (FunctionDesc aggregation : sqlDigest.aggregations) {
+            if (aggregation.isDimensionAsMetric()) {
+                continue;
+            }
+            // metrics: prefer the FunctionDesc from the cube desc, it has more info such as HLLC precision
+            metrics.add(findAggrFuncFromCubeDesc(aggregation));
+        }
+
+        // dimensions: every column the query references that is not one of its metric columns
+        for (TblColRef col : sqlDigest.allColumns) {
+            if (!sqlDigest.metricColumns.contains(col)) {
+                dimensions.add(col);
+            }
+        }
+    }
+    
+    private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) { // maps a query function onto the cube's instance
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            if (measure.getFunction().equals(aggrFunc))
+                return measure.getFunction(); // the first cube measure with an equal function wins
+        }
+        return aggrFunc; // no cube measure equals it: hand back the caller's object unchanged
+    }
+
+    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
+        Set<TblColRef> hostCols = Sets.newHashSet(); // result: cols with every derived column swapped for its hosts
+        for (TblColRef col : cols) {
+            if (!cubeDesc.isDerived(col)) {
+                hostCols.add(col); // an ordinary column stands for itself
+                continue;
+            }
+            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+            for (TblColRef hostCol : hostInfo.columns) {
+                hostCols.add(hostCol);
+                if (!hostInfo.isOneToOne)
+                    derivedPostAggregation.add(hostCol); // out-param: hosts of derivations that are not one-to-one
+            }
+        }
+        return hostCols;
+    }
+
+    private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
+        // OR together one bit per dimension (bit position comes from the rowkey), then resolve the mask to a cuboid
+        long mask = 0L;
+        for (TblColRef dim : dimensions) {
+            mask |= 1L << cubeDesc.getRowkey().getColumnBitIndex(dim);
+        }
+        return Cuboid.findById(cubeDesc, mask);
+    }
+
+    // Host-level columns pinned to one value by a "COL = const" condition (the filter itself or a direct AND child).
+    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
+        Collection<? extends TupleFilter> toCheck;
+        if (filter instanceof CompareTupleFilter) {
+            toCheck = Collections.singleton(filter);
+        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
+            toCheck = filter.getChildren();
+        } else {
+            return Collections.<TblColRef> emptySet(); // typed factory: no raw EMPTY_SET, so no unchecked cast to suppress
+        }
+
+        Set<TblColRef> result = Sets.newHashSet();
+        for (TupleFilter f : toCheck) {
+            if (f instanceof CompareTupleFilter) {
+                CompareTupleFilter compFilter = (CompareTupleFilter) f;
+                // is COL=const ?
+                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
+                    result.add(compFilter.getColumn());
+                }
+            }
+        }
+
+        // expand derived: a one-to-one derived column pins its host columns as well
+        Set<TblColRef> resultD = Sets.newHashSet();
+        for (TblColRef col : result) {
+            if (cubeDesc.isDerived(col)) {
+                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+                if (hostInfo.isOneToOne) {
+                    for (TblColRef hostCol : hostInfo.columns) {
+                        resultD.add(hostCol);
+                    }
+                }
+                // if not one-to-one, the column is dropped from the result
+            } else {
+                resultD.add(col);
+            }
+        }
+        return resultD;
+    }
+
+    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
+        // All three conditions are always evaluated, so every reason for a "false" verdict shows up in the log.
+        boolean cuboidNeedsPostAggr = cuboid.requirePostAggregation();
+        if (cuboidNeedsPostAggr) {
+            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+        }
+
+        // derived aggregation is bad, unless expanded columns are already in group by
+        boolean derivedCovered = groups.containsAll(derivedPostAggregation);
+        if (!derivedCovered) {
+            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+        }
+
+        // other columns (from filter) are bad, unless they are ensured to have a single value
+        boolean othersPinned = singleValuesD.containsAll(othersD);
+        if (!othersPinned) {
+            logger.info("exactAggregation is false because some column not on group by: " + othersD //
+                    + " (single value column: " + singleValuesD + ")");
+        }
+
+        boolean exact = !cuboidNeedsPostAggr && derivedCovered && othersPinned;
+        if (exact) {
+            logger.info("exactAggregation is true");
+        }
+        return exact;
+    }
+
+    private Set<FunctionDesc> replaceHolisticCountDistinct(Set<FunctionDesc> metrics) {
+        // Returns a copy of metrics (same order) in which a count distinct is swapped for the cube's equal
+        // holistic measure function when the cube defines one; every other metric is carried over unchanged.
+
+        Set<FunctionDesc> swapped = new LinkedHashSet<FunctionDesc>();
+        for (FunctionDesc metric : metrics) {
+            FunctionDesc chosen = metric;
+            if (metric.isCountDistinct()) {
+                for (MeasureDesc measure : cubeDesc.getMeasures()) {
+                    FunctionDesc candidate = measure.getFunction();
+                    if (candidate.equals(metric) && candidate.isHolisticCountDistinct()) {
+                        chosen = candidate; // keeps scanning: the last matching measure wins
+                    }
+                }
+            }
+            swapped.add(chosen);
+        }
+
+        return swapped;
+    }
+
+    @SuppressWarnings("unchecked")
+    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
+        // Rewrites conditions on derived columns into host-level conditions, walking the filter tree recursively.
+        // An untouched (sub)tree is returned as the very same object; identity is how a change is detected below.
+        // 'collector' receives the columns of conditions that the translation reported as loosened.
+        if (filter == null)
+            return null;
+        if (filter instanceof CompareTupleFilter)
+            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
+
+        // any other node: translate the children first, then rebuild this node only if some child changed
+        List<TupleFilter> oldKids = (List<TupleFilter>) filter.getChildren();
+        List<TupleFilter> newKids = Lists.newArrayListWithCapacity(oldKids.size());
+        boolean anyChanged = false;
+        for (TupleFilter kid : oldKids) {
+            TupleFilter newKid = translateDerived(kid, collector);
+            newKids.add(newKid);
+            anyChanged |= (newKid != kid);
+        }
+
+        // replaceChildren() throws IllegalStateException for anything but a LogicalTupleFilter
+        return anyChanged ? replaceChildren(filter, newKids) : filter;
+    }
+
+    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
+        // builds a fresh logical node with the same operator; the node passed in is not modified
+        if (!(filter instanceof LogicalTupleFilter))
+            throw new IllegalStateException("Cannot replaceChildren on " + filter);
+        LogicalTupleFilter rebuilt = new LogicalTupleFilter(filter.getOperator());
+        rebuilt.addChildren(newChildren);
+        return rebuilt;
+    }
+
+    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
+        // pass through unchanged: no column, no values, or a column that is not derived
+        TblColRef derived = compf.getColumn();
+        if (derived == null || compf.getValues().isEmpty() || !cubeDesc.isDerived(derived))
+            return compf;
+
+        // NOTE(review): the latest READY segment is handed to getLookupTable() without a null check --
+        // confirm a READY segment always exists when a derived-column filter reaches this point.
+        DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
+        LookupStringTable lookup = CubeManager.getInstance(this.cubeInstance.getConfig()) //
+                .getLookupTable(cubeInstance.getLatestReadySegment(), hostInfo.dimension);
+        Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+
+        // Pair = (translated filter, loosened flag); columns of a loosened filter are reported via the collector
+        TupleFilter hostLevelFilter = translated.getFirst();
+        if (translated.getSecond()) {
+            collectColumnsRecursively(hostLevelFilter, collector);
+        }
+        return hostLevelFilter;
+    }
+
+    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
+        // pre-order walk of the filter tree: a column node contributes its column (via collectColumns),
+        // then every child is visited the same way
+        if (filter instanceof ColumnTupleFilter)
+            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
+        for (TupleFilter sub : filter.getChildren())
+            collectColumnsRecursively(sub, collector);
+    }
+
+    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
+        // a plain column is collected directly; a derived one contributes its host columns instead
+        if (!cubeDesc.isDerived(col)) {
+            collector.add(col);
+            return;
+        }
+        for (TblColRef hostCol : cubeDesc.getHostInfo(col).columns)
+            collector.add(hostCol);
+    }
+
+    private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
+        // Sets a row threshold on the context, but only when a memory hungry (non-holistic) count distinct is present.
+        boolean hasMemHungryCountDistinct = false;
+        for (FunctionDesc func : metrics) {
+            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
+                hasMemHungryCountDistinct = true;
+            }
+        }
+        if (!hasMemHungryCountDistinct) {
+            return;
+        }
+
+        int rowSizeEst = dimensions.size() * 3;
+        for (FunctionDesc func : metrics) {
+            rowSizeEst += func.getReturnDataType().getSpaceEstimate();
+        }
+
+        // rows fitting the 3G budget; clamp before narrowing, a quotient above Integer.MAX_VALUE would wrap negative
+        long rowEst = MEM_BUDGET_PER_QUERY / rowSizeEst;
+        context.setThreshold((int) Math.min(rowEst, Integer.MAX_VALUE));
+    }
+
+    private void setLimit(TupleFilter filter, StorageContext context) {
+        boolean exactAggr = context.isExactAggregation();
+        boolean filterOk = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
+        boolean unsorted = !context.hasSort();
+        if (!(exactAggr && filterOk && unsorted))
+            return; // any one failing condition leaves the limit as it was
+        logger.info("Enable limit " + context.getLimit());
+        context.enableLimit();
+    }
+    
+    // ============================================================================
+
+    @Override
+    public Range<Long> getVolatilePeriod() {
+        return null; // always null; NOTE(review): presumably "no volatile period" -- confirm against ICachableStorageQuery
+    }
+
+    @Override
+    public String getStorageUUID() {
+        return cubeInstance.getUuid(); // the UUID of the wrapped cube instance is reported as the storage UUID
+    }
+
+
+    @Override
+    public boolean isDynamic() {
+        return false; // constant for this implementation
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/22dc5734/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
deleted file mode 100644
index fdca42b..0000000
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageEngine.java
+++ /dev/null
@@ -1,663 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.storage.hbase;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Range;
-import com.google.common.collect.Sets;
-import org.apache.hadoop.hbase.client.HConnection;
-import org.apache.kylin.common.persistence.HBaseConnection;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowValueDecoder;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.cube.model.HBaseMappingDesc;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.storage.ICachableStorageEngine;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverEnabler;
-import org.apache.kylin.storage.tuple.TupleInfo;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-
-/**
- * @author xjiang, yangli9
- */
-public class CubeStorageEngine implements ICachableStorageEngine {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeStorageEngine.class);
-
-    private static final int MERGE_KEYRANGE_THRESHOLD = 100;
-    private static final long MEM_BUDGET_PER_QUERY = 3L * 1024 * 1024 * 1024; // 3G
-
-    private final CubeInstance cubeInstance;
-    private final CubeDesc cubeDesc;
-    private final String uuid;
-
-    /**
-     * Creates a storage engine bound to the given cube, caching its descriptor and UUID.
-     *
-     * @param cube the cube instance this engine queries
-     */
-    public CubeStorageEngine(CubeInstance cube) {
-        this.cubeInstance = cube;
-        this.cubeDesc = cube.getDescriptor();
-        this.uuid = cube.getUuid();
-    }
-
-    /**
-     * Plans a scan over this cube for the given SQL digest and returns an iterator over the result.
-     * <p>
-     * Steps, in order: split the digest into dimensions and metrics, expand derived columns
-     * to their host columns, pick the cuboid, decide whether the storage result is exactly
-     * aggregated, translate the filter into per-segment HBase key ranges, build the value
-     * decoders, and finally configure coprocessor and limit on the context.
-     * The context is updated as a side effect (cuboid, exact-aggregation flag, and whatever
-     * {@code setCoprocessor} / {@code setLimit} change).
-     *
-     * @param context         the storage context; read for the connection URL and updated during planning
-     * @param sqlDigest       the digest of the query (group-by columns, filter, aggregations, columns)
-     * @param returnTupleInfo the tuple layout passed through to the returned iterator
-     * @return a {@code SerializedHBaseTupleIterator} over the planned scans
-     */
-    @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-
-        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TupleFilter filter = sqlDigest.filter;
-
-        // build dimension & metrics
-        Collection<TblColRef> dimensions = new HashSet<TblColRef>();
-        Collection<FunctionDesc> metrics = new HashSet<FunctionDesc>();
-        buildDimensionsAndMetrics(dimensions, metrics, sqlDigest);
-
-        // all dimensions = groups + others
-        Set<TblColRef> others = Sets.newHashSet(dimensions);
-        others.removeAll(groups);
-
-        // expand derived
-        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
-        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
-        Set<TblColRef> othersD = expandDerived(others, derivedPostAggregation);
-        othersD.removeAll(groupsD);
-        derivedPostAggregation.removeAll(groups);
-
-        // identify cuboid
-        Set<TblColRef> dimensionsD = Sets.newHashSet();
-        dimensionsD.addAll(groupsD);
-        dimensionsD.addAll(othersD);
-        Cuboid cuboid = identifyCuboid(dimensionsD);
-        context.setCuboid(cuboid);
-
-        // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
-        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
-        boolean isExactAggregation = isExactAggregation(cuboid, groups, othersD, singleValuesD, derivedPostAggregation);
-        context.setExactAggregation(isExactAggregation);
-
-        // translate filter for scan range and compose returning groups for coprocessor, note:
-        // - columns on non-evaluatable filter have to return
-        // - columns on loosened filter (due to derived translation) have to return
-        Set<TblColRef> groupsCopD = Sets.newHashSet(groupsD);
-        collectNonEvaluable(filter, groupsCopD);
-        TupleFilter filterD = translateDerived(filter, groupsCopD);
-
-        // flatten to OR-AND filter, (A AND B AND ..) OR (C AND D AND ..) OR ..
-        TupleFilter flatFilter = flattenToOrAndFilter(filterD);
-
-        // translate filter into segment scan ranges
-        List<HBaseKeyRange> scans = buildScanRanges(flatFilter, dimensionsD);
-
-        // check involved measures, build value decoder for each family:column
-        // (must run after setExactAggregation: the decoder choice reads context.isExactAggregation())
-        List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
-
-        //memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
-        //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
-        setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
-        // setLimit reads context.isCoprocessorEnabled(), so it has to follow setCoprocessor
-        setLimit(filter, context);
-
-        HConnection conn = HBaseConnection.get(context.getConnUrl());
-        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
-    }
-
-    /**
-     * Always returns {@code null}; this engine reports no volatile period.
-     */
-    @Override
-    public Range<Long> getVolatilePeriod() {
-        return null;
-    }
-
-    /**
-     * Returns the cube UUID captured when this engine was constructed.
-     */
-    @Override
-    public String getStorageUUID() {
-        return this.uuid;
-    }
-
-    /**
-     * Always returns {@code false}.
-     */
-    @Override
-    public boolean isDynamic() {
-        return false;
-    }
-
-    /**
-     * Fills the two output collections from the SQL digest.
-     *
-     * @param dimensions receives every column of the digest that is not a metric column
-     * @param metrics    receives every aggregation that is not a dimension used as a metric
-     * @param sqlDigest  the digest to read aggregations and columns from
-     */
-    private void buildDimensionsAndMetrics(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, SQLDigest sqlDigest) {
-        // aggregations that are really dimensions are left out of the metrics
-        for (FunctionDesc aggregation : sqlDigest.aggregations) {
-            if (aggregation.isDimensionAsMetric()) {
-                continue;
-            }
-            metrics.add(aggregation);
-        }
-
-        // every referenced column that is not a measure column counts as a dimension
-        for (TblColRef col : sqlDigest.allColumns) {
-            if (!sqlDigest.metricColumns.contains(col)) {
-                dimensions.add(col);
-            }
-        }
-    }
-
-    private Cuboid identifyCuboid(Set<TblColRef> dimensions) {
-        long cuboidID = 0;
-        for (TblColRef column : dimensions) {
-            int index = cubeDesc.getRowkey().getColumnBitIndex(column);
-            cuboidID |= 1L << index;
-        }
-        return Cuboid.findById(cubeDesc, cuboidID);
-    }
-
-    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
-        boolean exact = true;
-
-        if (cuboid.requirePostAggregation()) {
-            exact = false;
-            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
-        }
-
-        // derived aggregation is bad, unless expanded columns are already in group by
-        if (groups.containsAll(derivedPostAggregation) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
-        }
-
-        // other columns (from filter) is bad, unless they are ensured to have single value
-        if (singleValuesD.containsAll(othersD) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because some column not on group by: " + othersD //
-                    + " (single value column: " + singleValuesD + ")");
-        }
-
-        if (exact) {
-            logger.info("exactAggregation is true");
-        }
-        return exact;
-    }
-
-    /**
-     * Replaces every derived column in {@code cols} by its host columns; other
-     * columns are copied through unchanged.
-     *
-     * @param cols                   the columns to expand
-     * @param derivedPostAggregation receives the host columns of derived columns
-     *                               whose derivation is not one-to-one
-     * @return a new set holding the expanded columns
-     */
-    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
-        Set<TblColRef> result = Sets.newHashSet();
-        for (TblColRef column : cols) {
-            if (!cubeDesc.isDerived(column)) {
-                result.add(column);
-                continue;
-            }
-            DeriveInfo info = cubeDesc.getHostInfo(column);
-            for (TblColRef host : info.columns) {
-                result.add(host);
-                if (!info.isOneToOne) {
-                    derivedPostAggregation.add(host);
-                }
-            }
-        }
-        return result;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
-        Collection<? extends TupleFilter> toCheck;
-        if (filter instanceof CompareTupleFilter) {
-            toCheck = Collections.singleton(filter);
-        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
-            toCheck = filter.getChildren();
-        } else {
-            return (Set<TblColRef>) Collections.EMPTY_SET;
-        }
-
-        Set<TblColRef> result = Sets.newHashSet();
-        for (TupleFilter f : toCheck) {
-            if (f instanceof CompareTupleFilter) {
-                CompareTupleFilter compFilter = (CompareTupleFilter) f;
-                // is COL=const ?
-                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
-                    result.add(compFilter.getColumn());
-                }
-            }
-        }
-
-        // expand derived
-        Set<TblColRef> resultD = Sets.newHashSet();
-        for (TblColRef col : result) {
-            if (cubeDesc.isDerived(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                if (hostInfo.isOneToOne) {
-                    for (TblColRef hostCol : hostInfo.columns) {
-                        resultD.add(hostCol);
-                    }
-                }
-                //if not one2one, it will be pruned
-            } else {
-                resultD.add(col);
-            }
-        }
-        return resultD;
-    }
-
-    /**
-     * Walks the filter tree and, for every topmost non-evaluable node, adds all
-     * columns found underneath it to {@code collector}.
-     *
-     * @param filter    the filter to inspect, may be {@code null}
-     * @param collector receives the collected columns
-     */
-    private void collectNonEvaluable(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null) {
-            return;
-        }
-
-        if (!filter.isEvaluable()) {
-            collectColumnsRecursively(filter, collector);
-            return;
-        }
-
-        for (TupleFilter sub : filter.getChildren()) {
-            collectNonEvaluable(sub, collector);
-        }
-    }
-
-    /**
-     * Adds the column of every {@code ColumnTupleFilter} in the given filter tree
-     * (the node itself and all descendants) to {@code collector}.
-     */
-    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter instanceof ColumnTupleFilter) {
-            ColumnTupleFilter columnFilter = (ColumnTupleFilter) filter;
-            collectColumns(columnFilter.getColumn(), collector);
-        }
-        for (TupleFilter sub : filter.getChildren()) {
-            collectColumnsRecursively(sub, collector);
-        }
-    }
-
-    /**
-     * Adds {@code col} to {@code collector}, or its host columns if it is a derived column.
-     */
-    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
-        if (!cubeDesc.isDerived(col)) {
-            collector.add(col);
-            return;
-        }
-
-        DeriveInfo info = cubeDesc.getHostInfo(col);
-        for (TblColRef host : info.columns) {
-            collector.add(host);
-        }
-    }
-
-    /**
-     * Rewrites comparisons on derived columns anywhere in the filter tree. Nodes whose
-     * subtree did not change are returned as-is (same instance); otherwise the node is
-     * rebuilt around the rewritten children.
-     *
-     * @param filter    the filter to translate, may be {@code null}
-     * @param collector receives columns of translated filters that were loosened
-     * @return the translated filter, or {@code null} when {@code filter} is {@code null}
-     */
-    @SuppressWarnings("unchecked")
-    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null) {
-            return null;
-        }
-        if (filter instanceof CompareTupleFilter) {
-            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
-        }
-
-        List<TupleFilter> originals = (List<TupleFilter>) filter.getChildren();
-        List<TupleFilter> rewritten = Lists.newArrayListWithCapacity(originals.size());
-        boolean anyChanged = false;
-        for (TupleFilter original : originals) {
-            TupleFilter replacement = translateDerived(original, collector);
-            rewritten.add(replacement);
-            // identity comparison: an untouched child comes back as the same object
-            anyChanged |= (original != replacement);
-        }
-        return anyChanged ? replaceChildren(filter, rewritten) : filter;
-    }
-
-    /**
-     * Creates a new logical filter with the same operator as {@code filter} and the
-     * given children.
-     *
-     * @throws IllegalStateException if {@code filter} is not a {@code LogicalTupleFilter}
-     */
-    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
-        if (!(filter instanceof LogicalTupleFilter)) {
-            throw new IllegalStateException("Cannot replaceChildren on " + filter);
-        }
-        LogicalTupleFilter copy = new LogicalTupleFilter(filter.getOperator());
-        copy.addChildren(newChildren);
-        return copy;
-    }
-
-    /**
-     * Translates a comparison on a derived column via {@code DerivedFilterTranslator},
-     * using the lookup table of the latest ready segment. Comparisons without a column,
-     * without values, or on a non-derived column are returned unchanged.
-     *
-     * @param compf     the comparison to translate
-     * @param collector receives the columns of the translated filter when the
-     *                  translation reports that it was loosened
-     * @return the translated filter, or {@code compf} itself if nothing applies
-     */
-    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
-        TblColRef column = compf.getColumn();
-        if (column == null || compf.getValues().isEmpty()) {
-            return compf;
-        }
-        if (!cubeDesc.isDerived(column)) {
-            return compf;
-        }
-
-        DeriveInfo hostInfo = cubeDesc.getHostInfo(column);
-        CubeManager manager = CubeManager.getInstance(this.cubeInstance.getConfig());
-        LookupStringTable lookup = manager.getLookupTable(cubeInstance.getLatestReadySegment(), hostInfo.dimension);
-
-        Pair<TupleFilter, Boolean> outcome = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
-        TupleFilter result = outcome.getFirst();
-        if (outcome.getSecond()) {
-            // loosened translation: its columns are collected for the caller
-            collectColumnsRecursively(result, collector);
-        }
-        return result;
-    }
-
-    /**
-     * Maps each requested aggregation function to an HBase column that stores it and
-     * returns one {@code RowValueDecoder} per distinct column chosen.
-     *
-     * @param hbaseMapping the cube's HBase mapping, used to find candidate columns per function
-     * @param metrics      the aggregation functions of the query
-     * @param context      read for the exact-aggregation flag when choosing between candidates
-     * @return the decoders, one per selected HBase column
-     * @throws IllegalStateException if a function has no HBase column
-     */
-    private List<RowValueDecoder> translateAggregation(HBaseMappingDesc hbaseMapping, Collection<FunctionDesc> metrics, //
-            StorageContext context) {
-        // one decoder per HBase column, shared by all functions stored in that column
-        Map<HBaseColumnDesc, RowValueDecoder> codecMap = Maps.newHashMap();
-        for (FunctionDesc aggrFunc : metrics) {
-            Collection<HBaseColumnDesc> hbCols = hbaseMapping.findHBaseColumnByFunction(aggrFunc);
-            if (hbCols.isEmpty()) {
-                throw new IllegalStateException("can't find HBaseColumnDesc for function " + aggrFunc.getFullExpression());
-            }
-            HBaseColumnDesc bestHBCol = null;
-            int bestIndex = -1;
-            // the last candidate wins, unless the loop stops early on a holistic measure below
-            for (HBaseColumnDesc hbCol : hbCols) {
-                bestHBCol = hbCol;
-                bestIndex = hbCol.findMeasureIndex(aggrFunc);
-                MeasureDesc measure = hbCol.getMeasures()[bestIndex];
-                // criteria for holistic measure: Exact Aggregation && Exact Cuboid
-                if (measure.getFunction().isHolisticCountDistinct() && context.isExactAggregation()) {
-                    logger.info("Holistic count distinct chosen for " + aggrFunc);
-                    break;
-                }
-            }
-
-            RowValueDecoder codec = codecMap.get(bestHBCol);
-            if (codec == null) {
-                codec = new RowValueDecoder(bestHBCol);
-                codecMap.put(bestHBCol, codec);
-            }
-            // NOTE(review): presumably marks the measure at bestIndex for projection - confirm in RowValueDecoder
-            codec.setIndex(bestIndex);
-        }
-        return new ArrayList<RowValueDecoder>(codecMap.values());
-    }
-
-    /**
-     * Flattens the filter and normalizes it to OR-AND form:
-     * {@code (A AND B AND ..) OR (C AND D AND ..) OR ..}. A flattened filter whose
-     * root is an AND is wrapped into a single-child OR.
-     *
-     * @param filter the filter to flatten, may be {@code null}
-     * @return the OR-rooted flat filter, or {@code null} when {@code filter} is {@code null}
-     * @throws IllegalStateException if the flattened filter is rooted at neither AND nor OR
-     */
-    private TupleFilter flattenToOrAndFilter(TupleFilter filter) {
-        if (filter == null)
-            return null;
-
-        TupleFilter flatFilter = filter.flatFilter();
-
-        // normalize to OR-AND filter
-        if (flatFilter.getOperator() == FilterOperatorEnum.AND) {
-            LogicalTupleFilter f = new LogicalTupleFilter(FilterOperatorEnum.OR);
-            f.addChild(flatFilter);
-            flatFilter = f;
-        }
-
-        // name the offending filter so a failure here can be diagnosed from the log
-        if (flatFilter.getOperator() != FilterOperatorEnum.OR)
-            throw new IllegalStateException("Flattened filter should be OR instead of " + flatFilter);
-
-        return flatFilter;
-    }
-
-    /**
-     * Builds the HBase key ranges to scan: for every READY segment the flat filter is
-     * translated into dimension ranges, each conjunction becomes one key range, and the
-     * ranges of a segment are merged (overlaps first, then the too-many case). Finally
-     * ranges that do not hit their segment are dropped.
-     *
-     * @param flatFilter       the OR-AND filter, may be {@code null}
-     * @param dimensionColumns the dimension columns of the query
-     * @return the key ranges across all READY segments
-     */
-    private List<HBaseKeyRange> buildScanRanges(TupleFilter flatFilter, Collection<TblColRef> dimensionColumns) {
-
-        List<HBaseKeyRange> allRanges = Lists.newArrayList();
-
-        logger.info("Current cubeInstance is " + cubeInstance + " with " + cubeInstance.getSegments().size() + " segs in all");
-        List<CubeSegment> readySegments = cubeInstance.getSegments(SegmentStatusEnum.READY);
-        logger.info("READY segs count: " + readySegments.size());
-
-        for (CubeSegment segment : readySegments) {
-            // translated per segment because derived columns (lookup snapshot) may
-            // make the dimension filter differ from one segment to the next
-            List<Collection<ColumnValueRange>> disjuncts = translateToOrAndDimRanges(flatFilter, segment);
-            if (disjuncts != null) { // null means conflict, nothing to scan in this segment
-                List<HBaseKeyRange> segmentRanges = Lists.newArrayListWithCapacity(disjuncts.size());
-                for (Collection<ColumnValueRange> conjunct : disjuncts) {
-                    segmentRanges.add(new HBaseKeyRange(dimensionColumns, conjunct, segment, cubeDesc));
-                }
-                allRanges.addAll(mergeTooManyRanges(mergeOverlapRanges(segmentRanges)));
-            }
-        }
-
-        logger.info("hbasekeyrange count: " + allRanges.size());
-        dropUnhitSegments(allRanges);
-        logger.info("hbasekeyrange count after dropping unhit :" + allRanges.size());
-
-        return allRanges;
-    }
-
-    /**
-     * Translates an OR-AND filter into a list of conjunctions of column value ranges
-     * for one segment, then simplifies constant conditions.
-     * <p>
-     * A {@code null} filter yields a single empty conjunction (match everything).
-     * Conjunctions that can never match in this segment are left out, so the result
-     * may be empty.
-     *
-     * @param flatFilter  the OR-rooted flat filter whose children must all be AND filters, or {@code null}
-     * @param cubeSegment the segment whose dictionaries are used to pre-evaluate the ranges
-     * @return the list of conjunctions; never contains {@code null} elements
-     * @throws IllegalStateException if a child of {@code flatFilter} is not an AND filter
-     */
-    private List<Collection<ColumnValueRange>> translateToOrAndDimRanges(TupleFilter flatFilter, CubeSegment cubeSegment) {
-        List<Collection<ColumnValueRange>> result = Lists.newArrayList();
-
-        if (flatFilter == null) {
-            result.add(Collections.<ColumnValueRange> emptyList());
-            return result;
-        }
-
-        for (TupleFilter andFilter : flatFilter.getChildren()) {
-            if (andFilter.getOperator() != FilterOperatorEnum.AND) {
-                throw new IllegalStateException("Filter should be AND instead of " + andFilter);
-            }
-
-            Collection<ColumnValueRange> andRanges = translateToAndDimRanges(andFilter.getChildren(), cubeSegment);
-
-            // translateToAndDimRanges returns null for an always-false conjunction; such a
-            // branch contributes nothing to the OR, and keeping the null would make
-            // preprocessConstantConditions fail when it iterates the conjunction
-            if (andRanges != null) {
-                result.add(andRanges);
-            }
-        }
-
-        return preprocessConstantConditions(result);
-    }
-
-    /**
-     * Simplifies constant conditions in an OR-of-ANDs range list, modifying the list
-     * and its conjunctions in place:
-     * ranges that satisfy everything are removed from their conjunction, conjunctions
-     * containing a range that satisfies nothing are removed from the list, and as soon
-     * as one conjunction is left with no ranges (always true) the whole list is replaced
-     * by a single empty conjunction.
-     *
-     * @param orAndRanges the list of conjunctions to simplify
-     * @return the same list instance, simplified
-     */
-    private List<Collection<ColumnValueRange>> preprocessConstantConditions(List<Collection<ColumnValueRange>> orAndRanges) {
-        boolean globalAlwaysTrue = false;
-        Iterator<Collection<ColumnValueRange>> iterator = orAndRanges.iterator();
-        while (iterator.hasNext()) {
-            Collection<ColumnValueRange> andRanges = iterator.next();
-            Iterator<ColumnValueRange> iterator2 = andRanges.iterator();
-            boolean hasAlwaysFalse = false;
-            while (iterator2.hasNext()) {
-                ColumnValueRange range = iterator2.next();
-                if (range.satisfyAll())
-                    iterator2.remove(); // an always-true term does not constrain the AND
-                else if (range.satisfyNone())
-                    hasAlwaysFalse = true; // one always-false term makes the whole AND false
-            }
-            if (hasAlwaysFalse) {
-                iterator.remove();
-            } else if (andRanges.isEmpty()) {
-                // an AND with no remaining terms is always true, which makes the whole OR true
-                globalAlwaysTrue = true;
-                break;
-            }
-        }
-        if (globalAlwaysTrue) {
-            orAndRanges.clear();
-            orAndRanges.add(Collections.<ColumnValueRange> emptyList());
-        }
-        return orAndRanges;
-    }
-
-    /**
-     * Translates the children of one AND filter into per-column value ranges for a segment.
-     * Only {@code CompareTupleFilter} children that have a column are considered; multiple
-     * comparisons on the same column are AND-merged into one range. Each range is then
-     * pre-evaluated against the segment's dictionary for its column.
-     *
-     * @param andFilters  the children of an AND filter
-     * @param cubeSegment the segment providing the dictionaries
-     * @return the remaining ranges (a view on the values of an internal map); see the
-     *         contract note below for the meaning of empty and {@code null}
-     */
-    // return empty collection to mean true; return null to mean false
-    @SuppressWarnings("unchecked")
-    private Collection<ColumnValueRange> translateToAndDimRanges(List<? extends TupleFilter> andFilters, CubeSegment cubeSegment) {
-        Map<TblColRef, ColumnValueRange> rangeMap = new HashMap<TblColRef, ColumnValueRange>();
-        for (TupleFilter filter : andFilters) {
-            // anything that is not a comparison on a column contributes no range
-            if ((filter instanceof CompareTupleFilter) == false) {
-                continue;
-            }
-
-            CompareTupleFilter comp = (CompareTupleFilter) filter;
-            if (comp.getColumn() == null) {
-                continue;
-            }
-
-            ColumnValueRange range = new ColumnValueRange(comp.getColumn(), (Collection<String>) comp.getValues(), comp.getOperator());
-            andMerge(range, rangeMap);
-        }
-
-        // a little pre-evaluation to remove invalid EQ/IN values and round start/end according to dictionary
-        Iterator<ColumnValueRange> it = rangeMap.values().iterator();
-        while (it.hasNext()) {
-            ColumnValueRange range = it.next();
-            range.preEvaluateWithDict((Dictionary<String>) cubeSegment.getDictionary(range.getColumn()));
-            if (range.satisfyAll())
-                it.remove(); // always-true range adds no constraint
-            else if (range.satisfyNone())
-                return null; // one always-false range makes the whole conjunction false
-        }
-
-        return rangeMap.values();
-    }
-
-    private void andMerge(ColumnValueRange range, Map<TblColRef, ColumnValueRange> rangeMap) {
-        ColumnValueRange columnRange = rangeMap.get(range.getColumn());
-        if (columnRange == null) {
-            rangeMap.put(range.getColumn(), range);
-        } else {
-            columnRange.andMerge(range);
-        }
-    }
-
-    /**
-     * Merges key ranges that overlap into single ranges. Lists of zero or one element
-     * are returned as-is. Otherwise the input list is sorted in place and swept once,
-     * merging each run of ranges whose start key does not exceed the largest stop key
-     * seen so far.
-     *
-     * @param keyRanges the ranges to merge; sorted in place as a side effect
-     * @return the input list itself (size 0 or 1) or a new list of merged ranges
-     */
-    private List<HBaseKeyRange> mergeOverlapRanges(List<HBaseKeyRange> keyRanges) {
-        if (keyRanges.size() <= 1) {
-            return keyRanges;
-        }
-
-        if (logger.isDebugEnabled()) {
-            logger.debug("Merging key range from " + keyRanges.size());
-        }
-
-        // sort ranges by start key
-        Collections.sort(keyRanges);
-
-        // merge the overlap range
-        List<HBaseKeyRange> mergedRanges = new LinkedList<HBaseKeyRange>();
-        int beginIndex = 0; // first range of the run currently being accumulated
-        byte[] maxStopKey = keyRanges.get(0).getStopKey();
-        for (int index = 0; index < keyRanges.size(); index++) {
-            HBaseKeyRange keyRange = keyRanges.get(index);
-            // a gap between the largest stop key so far and this start key closes the current run
-            if (Bytes.compareTo(maxStopKey, keyRange.getStartKey()) < 0) {
-                // merge the current key ranges
-                HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, index - 1);
-                mergedRanges.add(mergedRange);
-                // start new merge
-                beginIndex = index;
-            }
-            if (Bytes.compareTo(maxStopKey, keyRange.getStopKey()) < 0) {
-                // update the stop key
-                maxStopKey = keyRange.getStopKey();
-            }
-        }
-        // merge last range
-        HBaseKeyRange mergedRange = mergeKeyRange(keyRanges, beginIndex, keyRanges.size() - 1);
-        mergedRanges.add(mergedRange);
-        if (logger.isDebugEnabled()) {
-            logger.debug("Merging key range to " + mergedRanges.size());
-        }
-        return mergedRanges;
-    }
-
-    /**
-     * Combines the ranges at indexes {@code from..to} (inclusive) into one range.
-     * When the span holds a single range, that range is returned unchanged.
-     * <p>
-     * The merged range takes segment, cuboid and start key from the first range, the
-     * largest stop key of the span, the concatenated fuzzy keys and flat filters, and
-     * the widest partition date window found among the ranges.
-     *
-     * @param keyRanges the list of ranges; presumably sorted by start key, since the
-     *                  start key of {@code from} is used for the result - TODO confirm with callers
-     * @param from      index of the first range to merge
-     * @param to        index of the last range to merge
-     * @return the merged range, or {@code keyRanges.get(from)} if {@code from == to}
-     */
-    private HBaseKeyRange mergeKeyRange(List<HBaseKeyRange> keyRanges, int from, int to) {
-        HBaseKeyRange keyRange = keyRanges.get(from);
-        int mergeSize = to - from + 1;
-        if (mergeSize > 1) {
-            // merge the ranges at indexes from..to
-            CubeSegment cubeSegment = keyRange.getCubeSegment();
-            Cuboid cuboid = keyRange.getCuboid();
-            byte[] startKey = keyRange.getStartKey();
-            byte[] stopKey = keyRange.getStopKey();
-            // sentinels: MAX_VALUE / 0 mean "no bounded value seen yet"
-            long partitionColumnStartDate = Long.MAX_VALUE;
-            long partitionColumnEndDate = 0;
-            List<Pair<byte[], byte[]>> newFuzzyKeys = new ArrayList<Pair<byte[], byte[]>>(mergeSize);
-            List<Collection<ColumnValueRange>> newFlatOrAndFilter = Lists.newLinkedList();
-
-            boolean hasNonFuzzyRange = false;
-            for (int k = from; k <= to; k++) {
-                HBaseKeyRange nextRange = keyRanges.get(k);
-                hasNonFuzzyRange = hasNonFuzzyRange || nextRange.getFuzzyKeys().isEmpty();
-                newFuzzyKeys.addAll(nextRange.getFuzzyKeys());
-                newFlatOrAndFilter.addAll(nextRange.getFlatOrAndFilter());
-                if (Bytes.compareTo(stopKey, nextRange.getStopKey()) < 0) {
-                    stopKey = nextRange.getStopKey();
-                }
-                // only start dates > 0 and end dates < MAX_VALUE take part in the min / max
-                if (nextRange.getPartitionColumnStartDate() > 0 && nextRange.getPartitionColumnStartDate() < partitionColumnStartDate) {
-                    partitionColumnStartDate = nextRange.getPartitionColumnStartDate();
-                }
-                if (nextRange.getPartitionColumnEndDate() < Long.MAX_VALUE && nextRange.getPartitionColumnEndDate() > partitionColumnEndDate) {
-                    partitionColumnEndDate = nextRange.getPartitionColumnEndDate();
-                }
-            }
-
-            // if any range is non-fuzzy, then all fuzzy keys must be cleared
-            if (hasNonFuzzyRange) {
-                newFuzzyKeys.clear();
-            }
-
-            // sentinels still in place: fall back to 0 as start and Long.MAX_VALUE as end
-            partitionColumnStartDate = (partitionColumnStartDate == Long.MAX_VALUE) ? 0 : partitionColumnStartDate;
-            partitionColumnEndDate = (partitionColumnEndDate == 0) ? Long.MAX_VALUE : partitionColumnEndDate;
-            keyRange = new HBaseKeyRange(cubeSegment, cuboid, startKey, stopKey, newFuzzyKeys, newFlatOrAndFilter, partitionColumnStartDate, partitionColumnEndDate);
-        }
-        return keyRange;
-    }
-
-    /**
-     * Collapses all ranges into a single merged range once their number reaches
-     * {@code MERGE_KEYRANGE_THRESHOLD}; below the threshold the input list is returned as-is.
-     *
-     * @param keyRanges the ranges to check
-     * @return the input list, or a new one-element list holding the merged range
-     */
-    private List<HBaseKeyRange> mergeTooManyRanges(List<HBaseKeyRange> keyRanges) {
-        if (keyRanges.size() >= MERGE_KEYRANGE_THRESHOLD) {
-            // TODO: check the distance between range. and merge the large distance range
-            List<HBaseKeyRange> collapsed = new LinkedList<HBaseKeyRange>();
-            collapsed.add(mergeKeyRange(keyRanges, 0, keyRanges.size() - 1));
-            return collapsed;
-        }
-        return keyRanges;
-    }
-
-    /**
-     * For a partitioned cube model, removes in place every scan whose
-     * {@code hitSegment()} is false. Does nothing when the model is not partitioned.
-     *
-     * @param scans the scan ranges to prune
-     */
-    private void dropUnhitSegments(List<HBaseKeyRange> scans) {
-        if (!cubeDesc.getModel().getPartitionDesc().isPartitioned()) {
-            return;
-        }
-        for (Iterator<HBaseKeyRange> it = scans.iterator(); it.hasNext();) {
-            if (!it.next().hitSegment()) {
-                it.remove();
-            }
-        }
-    }
-
-    /**
-     * Sets a row-count threshold on the context when the decoders involve a memory
-     * hungry count distinct. The threshold is the per-query memory budget divided by an
-     * estimated row size (3 bytes per dimension plus the space estimate of every
-     * projected measure). Does nothing otherwise.
-     *
-     * @param dimensions    the dimension columns of the query
-     * @param valueDecoders the decoders whose projected measures are sized
-     * @param context       receives the threshold
-     */
-    private void setThreshold(Collection<TblColRef> dimensions, List<RowValueDecoder> valueDecoders, StorageContext context) {
-        if (RowValueDecoder.hasMemHungryCountDistinct(valueDecoders) == false) {
-            return;
-        }
-
-        // accumulate in long so that a large sum of estimates cannot wrap around
-        long rowSizeEst = dimensions.size() * 3L;
-        for (RowValueDecoder decoder : valueDecoders) {
-            MeasureDesc[] measures = decoder.getMeasures();
-            BitSet projectionIndex = decoder.getProjectionIndex();
-            // visit only the measures that are actually projected
-            for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
-                FunctionDesc func = measures[i].getFunction();
-                rowSizeEst += func.getReturnDataType().getSpaceEstimate();
-            }
-        }
-
-        // a non-positive estimate would divide by zero or yield a negative threshold
-        long rowEst = MEM_BUDGET_PER_QUERY / Math.max(1L, rowSizeEst);
-        // the budget can exceed Integer.MAX_VALUE rows for tiny rows; clamp instead of wrapping
-        context.setThreshold((int) Math.min(rowEst, (long) Integer.MAX_VALUE));
-    }
-
-    private void setLimit(TupleFilter filter, StorageContext context) {
-        boolean goodAggr = context.isExactAggregation();
-        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
-        boolean goodSort = context.hasSort() == false;
-        if (goodAggr && goodFilter && goodSort) {
-            logger.info("Enable limit " + context.getLimit());
-            context.enableLimit();
-        }
-    }
-
-    /**
-     * Delegates to {@code ObserverEnabler.enableCoprocessorIfBeneficial} with this
-     * engine's cube instance, the coprocessor group-by columns, the value decoders
-     * and the storage context.
-     */
-    private void setCoprocessor(Set<TblColRef> groupsCopD, List<RowValueDecoder> valueDecoders, StorageContext context) {
-        ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
-    }
-
-}