Posted to commits@kylin.apache.org by ma...@apache.org on 2016/06/23 08:43:34 UTC

kylin git commit: refactor about default gtstore

Repository: kylin
Updated Branches:
  refs/heads/master 62ae3cb16 -> 4fd74fc6a


refactor about default gtstore


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4fd74fc6
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4fd74fc6
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4fd74fc6

Branch: refs/heads/master
Commit: 4fd74fc6a7d7c9bcccab38b57da76f2b983cf5ef
Parents: 62ae3cb
Author: Hongbin Ma <ma...@apache.org>
Authored: Thu Jun 23 16:37:09 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Thu Jun 23 16:37:09 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/gridtable/ScannerWorker.java   |  11 +-
 .../storage/gtrecord/CubeSegmentScanner.java    | 100 +++++
 .../storage/gtrecord/CubeTupleConverter.java    | 270 +++++++++++++
 .../gtrecord/GTCubeStorageQueryBase.java        | 377 +++++++++++++++++++
 .../gtrecord/SequentialCubeTupleIterator.java   | 210 +++++++++++
 .../hbase/cube/v2/CubeSegmentScanner.java       | 100 -----
 .../storage/hbase/cube/v2/CubeStorageQuery.java | 349 +----------------
 .../hbase/cube/v2/CubeTupleConverter.java       | 270 -------------
 .../cube/v2/SequentialCubeTupleIterator.java    | 210 -----------
 9 files changed, 968 insertions(+), 929 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
index 1ac3b02..586a584 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/ScannerWorker.java
@@ -24,8 +24,6 @@ import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
 import java.util.Iterator;
 
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.slf4j.Logger;
@@ -36,7 +34,7 @@ public class ScannerWorker {
     private static final Logger logger = LoggerFactory.getLogger(ScannerWorker.class);
     private IGTScanner internal = null;
 
-    public ScannerWorker(CubeSegment cubeSeg, Cuboid cuboid, GTScanRequest scanRequest) {
+    public ScannerWorker(CubeSegment cubeSeg, Cuboid cuboid, GTScanRequest scanRequest, String gtStorage) {
         if (scanRequest == null) {
             logger.info("Segment {} will be skipped", cubeSeg);
             internal = new EmptyGTScanner(0);
@@ -46,12 +44,7 @@ public class ScannerWorker {
         final GTInfo info = scanRequest.getInfo();
 
         try {
-            IGTStorage rpc;
-            if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
-                rpc = (IGTStorage) Class.forName("org.apache.kylin.storage.hbase.cube.v2.CubeHBaseScanRPC").getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // for local debug
-            } else {
-                rpc = (IGTStorage) Class.forName(KylinConfig.getInstanceFromEnv().getDefaultIGTStorage()).getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // default behavior
-            }
+            IGTStorage rpc = (IGTStorage) Class.forName(gtStorage).getConstructor(CubeSegment.class, Cuboid.class, GTInfo.class).newInstance(cubeSeg, cuboid, info); // instantiate the caller-chosen storage visitor
             internal = rpc.getGTScanner(scanRequest);
         } catch (IOException | InstantiationException | InvocationTargetException | IllegalAccessException | ClassNotFoundException | NoSuchMethodException e) {
             throw new RuntimeException("error", e);
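
This hunk is the heart of the refactor: ScannerWorker no longer decides which IGTStorage implementation to use (previously it consulted BackdoorToggles and KylinConfig itself); the caller now passes a fully qualified class name and ScannerWorker merely instantiates it reflectively. A minimal, self-contained sketch of the same construction-by-class-name pattern follows; the Storage/Worker types are illustrative stand-ins, not Kylin classes:

    import java.util.Iterator;

    interface Storage {
        Iterator<String> scan();
    }

    class ListStorage implements Storage {
        public Iterator<String> scan() {
            return java.util.Arrays.asList("row1", "row2").iterator();
        }
    }

    class Worker {
        private final Storage storage;

        // the caller chooses the implementation; Worker only reflects on the name
        Worker(String storageClassName) {
            try {
                // getDeclaredConstructor so the package-private sketch class works;
                // Kylin's real classes are public and pass constructor argument types
                storage = (Storage) Class.forName(storageClassName)
                        .getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new RuntimeException("cannot create " + storageClassName, e);
            }
        }

        Iterator<String> run() {
            return storage.scan();
        }
    }

    public class Demo {
        public static void main(String[] args) {
            // "ListStorage" plays the role that the gtStorage class name plays above
            new Worker("ListStorage").run().forEachRemaining(System.out::println);
        }
    }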

http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
new file mode 100644
index 0000000..c12159d
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.dict.BuildInFunctionTransformer;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanRangePlanner;
+import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.gridtable.ScannerWorker;
+import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class CubeSegmentScanner implements IGTScanner {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeSegmentScanner.class);
+
+    final CubeSegment cubeSeg;
+    final ScannerWorker scanner;
+    final Cuboid cuboid;
+
+    final GTScanRequest scanRequest;
+
+    public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
+            Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context, String gtStorage) {
+        this.cuboid = cuboid;
+        this.cubeSeg = cubeSeg;
+
+        // translate FunctionTupleFilter to IN clause
+        ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
+        filter = translator.transform(filter);
+
+        String plannerName = KylinConfig.getInstanceFromEnv().getQueryStorageVisitPlanner();
+        GTScanRangePlanner scanRangePlanner;
+        try {
+            scanRangePlanner = (GTScanRangePlanner) Class.forName(plannerName).getConstructor(CubeSegment.class, Cuboid.class, TupleFilter.class, Set.class, Set.class, Collection.class).newInstance(cubeSeg, cuboid, filter, dimensions, groups, metrics);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        scanRequest = scanRangePlanner.planScanRequest();
+        if (scanRequest != null) {
+            scanRequest.setAllowPreAggregation(!context.isExactAggregation());
+            scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+            if (context.isLimitEnabled())
+                scanRequest.setRowLimit(context.getLimit());
+        }
+        scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
+    }
+
+    @Override
+    public Iterator<GTRecord> iterator() {
+        return scanner.iterator();
+    }
+
+    @Override
+    public void close() throws IOException {
+        scanner.close();
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return scanRequest == null ? null : scanRequest.getInfo();
+    }
+
+    @Override
+    public int getScannedRowCount() {
+        return scanner.getScannedRowCount();
+    }
+
+}
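
One detail of this new file worth calling out: planScanRequest() can return null when the planner proves the segment holds nothing relevant to the filter, and ScannerWorker (first hunk above) answers a null request with an EmptyGTScanner instead of touching storage; getInfo() above also guards for that case. A stand-alone sketch of this null-means-skip contract, using plain Iterables in place of Kylin's IGTScanner:

    import java.util.Arrays;
    import java.util.Collections;

    public class SkipSketch {
        // stand-in for scan range planning: null means "segment cannot match"
        static Object plan(boolean segmentCanMatch) {
            return segmentCanMatch ? new Object() : null;
        }

        static Iterable<String> scannerFor(Object scanRequest) {
            if (scanRequest == null)
                return Collections.emptyList();   // the EmptyGTScanner analogue
            return Arrays.asList("row1", "row2"); // stand-in for a real storage scan
        }

        public static void main(String[] args) {
            for (String row : scannerFor(plan(false)))
                System.out.println(row);          // prints nothing: segment skipped
            for (String row : scannerFor(plan(true)))
                System.out.println(row);          // prints row1, row2
        }
    }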

http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
new file mode 100644
index 0000000..d6917e1
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Array;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * convert GTRecord to tuple
+ */
+public class CubeTupleConverter {
+
+    final CubeSegment cubeSeg;
+    final Cuboid cuboid;
+    final TupleInfo tupleInfo;
+    final List<IDerivedColumnFiller> derivedColFillers;
+
+    final int[] gtColIdx;
+    final int[] tupleIdx;
+    final Object[] gtValues;
+    final MeasureType<?>[] measureTypes;
+    
+    final List<IAdvMeasureFiller> advMeasureFillers;
+    final List<Integer> advMeasureIndexInGTValues;
+    
+    final int nSelectedDims;
+
+    public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
+            Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+        this.cubeSeg = cubeSeg;
+        this.cuboid = cuboid;
+        this.tupleInfo = returnTupleInfo;
+        this.derivedColFillers = Lists.newArrayList();
+
+        List<TblColRef> cuboidDims = cuboid.getColumns();
+        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
+
+        nSelectedDims = selectedDimensions.size();
+        gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
+        tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
+        gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()];
+
+        // measure types don't have this many, but the aligned length makes programming easier
+        measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()];
+        
+        advMeasureFillers = Lists.newArrayListWithCapacity(1);
+        advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
+        
+        int iii = 0;
+
+        // pre-calculate dimension index mapping to tuple
+        for (TblColRef dim : selectedDimensions) {
+            int i = mapping.getIndexOf(dim);
+            gtColIdx[iii] = i;
+            tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
+            iii++;
+        }
+
+        for (FunctionDesc metric : selectedMetrics) {
+            int i = mapping.getIndexOf(metric);
+            gtColIdx[iii] = i;
+            
+            if (metric.needRewrite()) {
+                String rewriteFieldName = metric.getRewriteFieldName();
+                tupleIdx[iii] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
+            }
+            // a non-rewrite metric (like sum, or a dimension playing as a metric) is treated like a dimension column
+            else {
+                TblColRef col = metric.getParameter().getColRefs().get(0);
+                tupleIdx[iii] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
+            }
+            
+            MeasureType<?> measureType = metric.getMeasureType();
+            if (measureType.needAdvancedTupleFilling()) {
+                Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(metric));
+                advMeasureFillers.add(measureType.getAdvancedTupleFiller(metric, returnTupleInfo, dictionaryMap));
+                advMeasureIndexInGTValues.add(iii);
+            } else {
+                measureTypes[iii] = measureType;
+            }
+                
+            iii++;
+        }
+
+        // prepare derived columns and filler
+        Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null);
+        for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
+            TblColRef[] hostCols = entry.getKey().data;
+            for (DeriveInfo deriveInfo : entry.getValue()) {
+                IDerivedColumnFiller filler = newDerivedColumnFiller(hostCols, deriveInfo);
+                if (filler != null) {
+                    derivedColFillers.add(filler);
+                }
+            }
+        }
+    }
+
+    // load only needed dictionaries
+    private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+        for (TblColRef col : columnsNeedDictionary) {
+            result.put(col, cubeSeg.getDictionary(col));
+        }
+        return result;
+    }
+
+    public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) {
+
+        record.getValues(gtColIdx, gtValues);
+
+        // dimensions
+        for (int i = 0; i < nSelectedDims; i++) {
+            int ti = tupleIdx[i];
+            if (ti >= 0) {
+                tuple.setDimensionValue(ti, toString(gtValues[i]));
+            }
+        }
+
+        // measures
+        for (int i = nSelectedDims; i < gtColIdx.length; i++) {
+            int ti = tupleIdx[i];
+            if (ti >= 0 && measureTypes[i] != null) {
+                measureTypes[i].fillTupleSimply(tuple, ti, gtValues[i]);
+            }
+        }
+
+        // derived
+        for (IDerivedColumnFiller filler : derivedColFillers) {
+            filler.fillDerivedColumns(gtValues, tuple);
+        }
+        
+        // advanced measure filling, due to possible row split, will be completed on the caller side
+        if (advMeasureFillers.isEmpty()) {
+            return null;
+        } else {
+            for (int i = 0; i < advMeasureFillers.size(); i++) {
+                Object measureValue = gtValues[advMeasureIndexInGTValues.get(i)];
+                advMeasureFillers.get(i).reload(measureValue);
+            }
+            return advMeasureFillers;
+        }
+    }
+
+    private interface IDerivedColumnFiller {
+        public void fillDerivedColumns(Object[] gtValues, Tuple tuple);
+    }
+
+    private IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, final DeriveInfo deriveInfo) {
+        boolean allHostsPresent = true;
+        final int[] hostTmpIdx = new int[hostCols.length];
+        for (int i = 0; i < hostCols.length; i++) {
+            hostTmpIdx[i] = indexOnTheGTValues(hostCols[i]);
+            allHostsPresent = allHostsPresent && hostTmpIdx[i] >= 0;
+        }
+
+        boolean needCopyDerived = false;
+        final int[] derivedTupleIdx = new int[deriveInfo.columns.length];
+        for (int i = 0; i < deriveInfo.columns.length; i++) {
+            TblColRef col = deriveInfo.columns[i];
+            derivedTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
+            needCopyDerived = needCopyDerived || derivedTupleIdx[i] >= 0;
+        }
+
+        if ((allHostsPresent && needCopyDerived) == false)
+            return null;
+
+        switch (deriveInfo.type) {
+        case LOOKUP:
+            return new IDerivedColumnFiller() {
+                CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
+                LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
+                int[] derivedColIdx = initDerivedColIdx();
+                Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
+
+                private int[] initDerivedColIdx() {
+                    int[] idx = new int[deriveInfo.columns.length];
+                    for (int i = 0; i < idx.length; i++) {
+                        idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+                    }
+                    return idx;
+                }
+
+                @Override
+                public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+                    for (int i = 0; i < hostTmpIdx.length; i++) {
+                        lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
+                    }
+
+                    String[] lookupRow = lookupTable.getRow(lookupKey);
+
+                    if (lookupRow != null) {
+                        for (int i = 0; i < derivedTupleIdx.length; i++) {
+                            if (derivedTupleIdx[i] >= 0) {
+                                String value = lookupRow[derivedColIdx[i]];
+                                tuple.setDimensionValue(derivedTupleIdx[i], value);
+                            }
+                        }
+                    } else {
+                        for (int i = 0; i < derivedTupleIdx.length; i++) {
+                            if (derivedTupleIdx[i] >= 0) {
+                                tuple.setDimensionValue(derivedTupleIdx[i], null);
+                            }
+                        }
+                    }
+                }
+            };
+        case PK_FK:
+            return new IDerivedColumnFiller() {
+                @Override
+                public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+                    // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
+                    tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
+                }
+            };
+        default:
+            throw new IllegalArgumentException();
+        }
+    }
+
+    private int indexOnTheGTValues(TblColRef col) {
+        List<TblColRef> cuboidDims = cuboid.getColumns();
+        int cuboidIdx = cuboidDims.indexOf(col);
+        for (int i = 0; i < gtColIdx.length; i++) {
+            if (gtColIdx[i] == cuboidIdx)
+                return i;
+        }
+        return -1;
+    }
+
+    private static String toString(Object o) {
+        return o == null ? null : o.toString();
+    }
+}
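
The core of this converter is a set of parallel index arrays built once in the constructor and reused for every record: gtColIdx[i] says where selected column i lives in the GTRecord, and tupleIdx[i] says where its value should land in the output Tuple (-1 when the query does not project it). A stripped-down sketch of that mapping, with plain Object arrays standing in for GTRecord and Tuple:

    import java.util.Arrays;

    public class IndexMappingSketch {
        public static void main(String[] args) {
            Object[] gtRow = { "2016-06-23", "CN", 42L }; // a grid table record
            int[] gtColIdx = { 0, 2 };                    // select GT columns 0 and 2
            int[] tupleIdx = { 1, -1 };                   // column 0 -> tuple slot 1; column 2 not projected

            Object[] tuple = new Object[3];
            for (int i = 0; i < gtColIdx.length; i++) {
                int ti = tupleIdx[i];
                if (ti >= 0)
                    tuple[ti] = gtRow[gtColIdx[i]];       // copy only projected values
            }
            System.out.println(Arrays.toString(tuple));   // [null, 2016-06-23, null]
        }
    }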

http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
new file mode 100644
index 0000000..e58e74a
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
+import org.apache.kylin.dict.lookup.LookupStringTable;
+import org.apache.kylin.measure.MeasureType;
+import org.apache.kylin.metadata.filter.ColumnTupleFilter;
+import org.apache.kylin.metadata.filter.CompareTupleFilter;
+import org.apache.kylin.metadata.filter.LogicalTupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.SQLDigest;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.storage.IStorageQuery;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.translate.DerivedFilterTranslator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public abstract class GTCubeStorageQueryBase implements IStorageQuery {
+
+    private static final Logger logger = LoggerFactory.getLogger(GTCubeStorageQueryBase.class);
+
+    private final CubeInstance cubeInstance;
+    private final CubeDesc cubeDesc;
+
+    public GTCubeStorageQueryBase(CubeInstance cube) {
+        this.cubeInstance = cube;
+        this.cubeDesc = cube.getDescriptor();
+    }
+
+    @Override
+    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
+        // allow custom measures hack
+        notifyBeforeStorageQuery(sqlDigest);
+
+        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
+        TupleFilter filter = sqlDigest.filter;
+
+        // build dimension & metrics
+        Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
+        Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
+        buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
+
+        // all dimensions = groups + other(like filter) dimensions
+        Set<TblColRef> otherDims = Sets.newHashSet(dimensions);
+        otherDims.removeAll(groups);
+
+        // expand derived (xxxD means contains host columns only, derived columns were translated)
+        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
+        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
+        Set<TblColRef> otherDimsD = expandDerived(otherDims, derivedPostAggregation);
+        otherDimsD.removeAll(groupsD);
+
+        // identify cuboid
+        Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
+        dimensionsD.addAll(groupsD);
+        dimensionsD.addAll(otherDimsD);
+        Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics);
+        context.setCuboid(cuboid);
+
+        // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
+        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
+        boolean isExactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
+        context.setExactAggregation(isExactAggregation);
+
+        // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
+        TupleFilter filterD = translateDerived(filter, groupsD);
+
+        setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
+        setLimit(filter, context);
+
+        List<CubeSegmentScanner> scanners = Lists.newArrayList();
+        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
+            CubeSegmentScanner scanner;
+            if (cubeSeg.getInputRecords() == 0) {
+                logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg);
+            }
+            scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context, getGTStorage());
+            scanners.add(scanner);
+        }
+
+        if (scanners.isEmpty())
+            return ITupleIterator.EMPTY_TUPLE_ITERATOR;
+
+        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
+    }
+
+    protected abstract String getGTStorage();
+
+    private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
+        for (FunctionDesc func : sqlDigest.aggregations) {
+            if (!func.isDimensionAsMetric()) {
+                // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision
+                metrics.add(findAggrFuncFromCubeDesc(func));
+            }
+        }
+
+        for (TblColRef column : sqlDigest.allColumns) {
+            // skip measure columns
+            if (sqlDigest.metricColumns.contains(column) && !(sqlDigest.groupbyColumns.contains(column) || sqlDigest.filterColumns.contains(column))) {
+                continue;
+            }
+
+            dimensions.add(column);
+        }
+    }
+
+    private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            if (measure.getFunction().equals(aggrFunc))
+                return measure.getFunction();
+        }
+        return aggrFunc;
+    }
+
+    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
+        Set<TblColRef> expanded = Sets.newHashSet();
+        for (TblColRef col : cols) {
+            if (cubeDesc.hasHostColumn(col)) {
+                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+                for (TblColRef hostCol : hostInfo.columns) {
+                    expanded.add(hostCol);
+                    if (hostInfo.isOneToOne == false)
+                        derivedPostAggregation.add(hostCol);
+                }
+            } else {
+                expanded.add(col);
+            }
+        }
+        return expanded;
+    }
+
+    @SuppressWarnings("unchecked")
+    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
+        Collection<? extends TupleFilter> toCheck;
+        if (filter instanceof CompareTupleFilter) {
+            toCheck = Collections.singleton(filter);
+        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
+            toCheck = filter.getChildren();
+        } else {
+            return (Set<TblColRef>) Collections.EMPTY_SET;
+        }
+
+        Set<TblColRef> result = Sets.newHashSet();
+        for (TupleFilter f : toCheck) {
+            if (f instanceof CompareTupleFilter) {
+                CompareTupleFilter compFilter = (CompareTupleFilter) f;
+                // is COL=const ?
+                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
+                    result.add(compFilter.getColumn());
+                }
+            }
+        }
+
+        // expand derived
+        Set<TblColRef> resultD = Sets.newHashSet();
+        for (TblColRef col : result) {
+            if (cubeDesc.isExtendedColumn(col)) {
+                throw new CubeDesc.CannotFilterExtendedColumnException(col);
+            }
+            if (cubeDesc.isDerived(col)) {
+                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+                if (hostInfo.isOneToOne) {
+                    for (TblColRef hostCol : hostInfo.columns) {
+                        resultD.add(hostCol);
+                    }
+                }
+                // if not one-to-one, it will be pruned
+            } else {
+                resultD.add(col);
+            }
+        }
+        return resultD;
+    }
+
+    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
+        boolean exact = true;
+
+        if (cuboid.requirePostAggregation()) {
+            exact = false;
+            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+        }
+
+        // derived aggregation is bad, unless expanded columns are already in group by
+        if (groups.containsAll(derivedPostAggregation) == false) {
+            exact = false;
+            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
+        }
+
+        // other columns (from filter) is bad, unless they are ensured to have single value
+        if (singleValuesD.containsAll(othersD) == false) {
+            exact = false;
+            logger.info("exactAggregation is false because some column not on group by: " + othersD //
+                    + " (single value column: " + singleValuesD + ")");
+        }
+
+        // for partitioned cube, the partition column must belong to group by or has single value
+        PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc();
+        if (partDesc.isPartitioned()) {
+            TblColRef col = partDesc.getPartitionDateColumnRef();
+            if (!groups.contains(col) && !singleValuesD.contains(col)) {
+                exact = false;
+                logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by");
+            }
+        }
+
+        if (exact) {
+            logger.info("exactAggregation is true");
+        }
+        return exact;
+    }
+
+    @SuppressWarnings("unchecked")
+    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
+        if (filter == null)
+            return filter;
+
+        if (filter instanceof CompareTupleFilter) {
+            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
+        }
+
+        List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
+        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
+        boolean modified = false;
+        for (TupleFilter child : children) {
+            TupleFilter translated = translateDerived(child, collector);
+            newChildren.add(translated);
+            if (child != translated)
+                modified = true;
+        }
+        if (modified) {
+            filter = replaceChildren(filter, newChildren);
+        }
+        return filter;
+    }
+
+    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
+        if (filter instanceof LogicalTupleFilter) {
+            LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
+            r.addChildren(newChildren);
+            return r;
+        } else
+            throw new IllegalStateException("Cannot replaceChildren on " + filter);
+    }
+
+    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
+        if (compf.getColumn() == null || compf.getValues().isEmpty())
+            return compf;
+
+        TblColRef derived = compf.getColumn();
+        if (cubeDesc.isExtendedColumn(derived)) {
+            throw new CubeDesc.CannotFilterExtendedColumnException(derived);
+        }
+        if (cubeDesc.isDerived(derived) == false)
+            return compf;
+
+        DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
+        CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
+        CubeSegment seg = cubeInstance.getLatestReadySegment();
+        LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
+        Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
+        TupleFilter translatedFilter = translated.getFirst();
+        boolean loosened = translated.getSecond();
+        if (loosened) {
+            collectColumnsRecursively(translatedFilter, collector);
+        }
+        return translatedFilter;
+    }
+
+    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
+        if (filter == null)
+            return;
+
+        if (filter instanceof ColumnTupleFilter) {
+            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
+        }
+        for (TupleFilter child : filter.getChildren()) {
+            collectColumnsRecursively(child, collector);
+        }
+    }
+
+    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
+        if (cubeDesc.isExtendedColumn(col)) {
+            throw new CubeDesc.CannotFilterExtendedColumnException(col);
+        }
+        if (cubeDesc.isDerived(col)) {
+            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
+            for (TblColRef h : hostInfo.columns)
+                collector.add(h);
+        } else {
+            collector.add(col);
+        }
+    }
+
+    private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
+        boolean hasMemHungryMeasure = false;
+        for (FunctionDesc func : metrics) {
+            hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry();
+        }
+
+        // need to limit the memory usage for memory hungry measures
+        if (hasMemHungryMeasure == false) {
+            return;
+        }
+
+        int rowSizeEst = dimensions.size() * 3;
+        for (FunctionDesc func : metrics) {
+            // FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage)
+            rowSizeEst += func.getReturnDataType().getStorageBytesEstimate();
+        }
+
+        long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst;
+        if (rowEst > 0) {
+            logger.info("Memory budget is set to " + rowEst + " rows");
+            context.setThreshold((int) rowEst);
+        } else {
+            logger.info("Memory budget is not set.");
+        }
+    }
+
+    private void setLimit(TupleFilter filter, StorageContext context) {
+        boolean goodAggr = context.isExactAggregation();
+        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
+        boolean goodSort = context.hasSort() == false;
+        if (goodAggr && goodFilter && goodSort) {
+            logger.info("Enable limit " + context.getLimit());
+            context.enableLimit();
+        }
+    }
+
+    private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            MeasureType<?> measureType = measure.getFunction().getMeasureType();
+            measureType.adjustSqlDigest(measure, sqlDigest);
+        }
+    }
+
+}
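
GTCubeStorageQueryBase is a textbook template method: search() keeps all the storage-neutral work (dimension/metric derivation, cuboid identification, derived-column translation, limit/threshold setup) and defers only the choice of RPC implementation to getGTStorage(). The HBase subclass appears in the last diff of this commit; a subclass for some other backend would need nothing more than the following (package and class names here are invented for illustration):

    package org.example.storage; // hypothetical package, not part of Kylin

    import org.apache.kylin.cube.CubeInstance;
    import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;

    public class FileStorageQuery extends GTCubeStorageQueryBase {

        public FileStorageQuery(CubeInstance cube) {
            super(cube);
        }

        @Override
        protected String getGTStorage() {
            // fully qualified name of an IGTStorage implementation exposing a
            // (CubeSegment, Cuboid, GTInfo) constructor, as ScannerWorker expects
            return "org.example.storage.FileGTStorage"; // hypothetical class
        }
    }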

http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
new file mode 100644
index 0000000..3681e5e
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.exception.ScanOutOfLimitException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SequentialCubeTupleIterator implements ITupleIterator {
+
+    private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class);
+
+    protected final Cuboid cuboid;
+    protected final Set<TblColRef> selectedDimensions;
+    protected final Set<FunctionDesc> selectedMetrics;
+    protected final TupleInfo tupleInfo;
+    protected final Tuple tuple;
+    protected final Iterator<CubeSegmentScanner> scannerIterator;
+    protected final StorageContext context;
+
+    protected CubeSegmentScanner curScanner;
+    protected Iterator<GTRecord> curRecordIterator;
+    protected CubeTupleConverter curTupleConverter;
+    protected Tuple next;
+    
+    private List<IAdvMeasureFiller> advMeasureFillers;
+    private int advMeasureRowsRemaining;
+    private int advMeasureRowIndex;
+
+    private int scanCount;
+    private int scanCountDelta;
+
+    public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+            Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
+        this.cuboid = cuboid;
+        this.selectedDimensions = selectedDimensions;
+        this.selectedMetrics = selectedMetrics;
+        this.tupleInfo = returnTupleInfo;
+        this.tuple = new Tuple(returnTupleInfo);
+        this.scannerIterator = scanners.iterator();
+        this.context = context;
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (next != null)
+            return true;
+        
+        if (hitLimitAndThreshold())
+            return false;
+        
+        // consume any left rows from advanced measure filler
+        if (advMeasureRowsRemaining > 0) {
+            for (IAdvMeasureFiller filler : advMeasureFillers) {
+                filler.fillTuple(tuple, advMeasureRowIndex);
+            }
+            advMeasureRowIndex++;
+            advMeasureRowsRemaining--;
+            next = tuple;
+            return true;
+        }
+
+        // get the next GTRecord
+        if (curScanner == null) {
+            if (scannerIterator.hasNext()) {
+                curScanner = scannerIterator.next();
+                curRecordIterator = curScanner.iterator();
+                if (curRecordIterator.hasNext()) {
+                    // if the segment does not have any tuples, don't bother to create a converter
+                    curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+                }
+            } else {
+                return false;
+            }
+        }
+        if (curRecordIterator.hasNext() == false) {
+            close(curScanner);
+            curScanner = null;
+            curRecordIterator = null;
+            curTupleConverter = null;
+            return hasNext();
+        }
+
+        // now we have a GTRecord
+        GTRecord curRecord = curRecordIterator.next();
+        
+        // translate into tuple
+        advMeasureFillers = curTupleConverter.translateResult(curRecord, tuple);
+
+        // the simple case
+        if (advMeasureFillers == null) {
+            next = tuple;
+            return true;
+        }
+        
+        // advanced measure filling, like TopN, will produce multiple tuples out of one record
+        advMeasureRowsRemaining = -1;
+        for (IAdvMeasureFiller filler : advMeasureFillers) {
+            if (advMeasureRowsRemaining < 0)
+                advMeasureRowsRemaining = filler.getNumOfRows();
+            if (advMeasureRowsRemaining != filler.getNumOfRows())
+                throw new IllegalStateException();
+        }
+        if (advMeasureRowsRemaining < 0)
+            throw new IllegalStateException();
+        
+        advMeasureRowIndex = 0;
+        return hasNext();
+    }
+    
+
+    private boolean hitLimitAndThreshold() {
+        // check limit
+        if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) {
+            return true;
+        }
+        // check threshold
+        if (scanCount >= context.getThreshold()) {
+            throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add a filter condition (e.g. a where clause) to narrow down the backend scan range.");
+        }
+        return false;
+    }
+
+    @Override
+    public ITuple next() {
+        // fetch next record
+        if (next == null) {
+            hasNext();
+            if (next == null)
+                throw new NoSuchElementException();
+        }
+
+        scanCount++;
+        if (++scanCountDelta >= 1000)
+            flushScanCountDelta();
+
+        ITuple result = next;
+        next = null;
+        return result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() {
+        // hasNext() loop may exit because of limit, threshold, etc.
+        // close all the remaining segment iterators
+        flushScanCountDelta();
+
+        if (curScanner != null)
+            close(curScanner);
+
+        while (scannerIterator.hasNext()) {
+            close(scannerIterator.next());
+        }
+    }
+
+    protected void close(CubeSegmentScanner scanner) {
+        try {
+            scanner.close();
+        } catch (IOException e) {
+            logger.error("Exception when close CubeScanner", e);
+        }
+    }
+
+    public int getScanCount() {
+        return scanCount;
+    }
+
+    private void flushScanCountDelta() {
+        context.increaseTotalScanCount(scanCountDelta);
+        scanCountDelta = 0;
+    }
+
+}
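
The trickiest path in this iterator is the advanced-measure one: a single GTRecord holding, say, a TopN counter expands into getNumOfRows() output tuples, so hasNext() must drain advMeasureRowsRemaining before pulling the next record. A stand-alone sketch of that one-to-many expansion, with lists of strings standing in for records and their expanded rows (not the real IAdvMeasureFiller API):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.NoSuchElementException;

    public class ExpandingIteratorSketch implements Iterator<String> {
        private final Iterator<List<String>> records; // each record expands to many rows
        private List<String> current;
        private int rowIndex;

        ExpandingIteratorSketch(Iterator<List<String>> records) {
            this.records = records;
        }

        public boolean hasNext() {
            // drain the rows of the current record before fetching the next one
            while (current == null || rowIndex >= current.size()) {
                if (!records.hasNext())
                    return false;
                current = records.next();
                rowIndex = 0;
            }
            return true;
        }

        public String next() {
            if (!hasNext())
                throw new NoSuchElementException();
            return current.get(rowIndex++);
        }

        public static void main(String[] args) {
            Iterator<List<String>> recs = Arrays.asList(
                    Arrays.asList("top1", "top2", "top3"), // one record, three rows
                    Arrays.asList("top1")).iterator();
            new ExpandingIteratorSketch(recs).forEachRemaining(System.out::println);
        }
    }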

http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
deleted file mode 100644
index 9890ae9..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Set;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.dict.BuildInFunctionTransformer;
-import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRangePlanner;
-import org.apache.kylin.gridtable.GTScanRequest;
-import org.apache.kylin.gridtable.IGTScanner;
-import org.apache.kylin.gridtable.ScannerWorker;
-import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.storage.StorageContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class CubeSegmentScanner implements IGTScanner {
-
-    private static final Logger logger = LoggerFactory.getLogger(CubeSegmentScanner.class);
-
-    final CubeSegment cubeSeg;
-    final ScannerWorker scanner;
-    final Cuboid cuboid;
-
-    final GTScanRequest scanRequest;
-
-    public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
-            Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context) {
-        this.cuboid = cuboid;
-        this.cubeSeg = cubeSeg;
-
-        // translate FunctionTupleFilter to IN clause
-        ITupleFilterTransformer translator = new BuildInFunctionTransformer(cubeSeg.getDimensionEncodingMap());
-        filter = translator.transform(filter);
-
-        String plannerName = KylinConfig.getInstanceFromEnv().getQueryStorageVisitPlanner();
-        GTScanRangePlanner scanRangePlanner;
-        try {
-            scanRangePlanner = (GTScanRangePlanner) Class.forName(plannerName).getConstructor(CubeSegment.class, Cuboid.class, TupleFilter.class, Set.class, Set.class, Collection.class).newInstance(cubeSeg, cuboid, filter, dimensions, groups, metrics);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        scanRequest = scanRangePlanner.planScanRequest();
-        if (scanRequest != null) {
-            scanRequest.setAllowPreAggregation(!context.isExactAggregation());
-            scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
-            if (context.isLimitEnabled())
-                scanRequest.setRowLimit(context.getLimit());
-        }
-        scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest);
-    }
-
-    @Override
-    public Iterator<GTRecord> iterator() {
-        return scanner.iterator();
-    }
-
-    @Override
-    public void close() throws IOException {
-        scanner.close();
-    }
-
-    @Override
-    public GTInfo getInfo() {
-        return scanRequest == null ? null : scanRequest.getInfo();
-    }
-
-    @Override
-    public int getScannedRowCount() {
-        return scanner.getScannedRowCount();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index cec4e2f..f9c9a2b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -18,358 +18,27 @@
 
 package org.apache.kylin.storage.hbase.cube.v2;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.metadata.filter.ColumnTupleFilter;
-import org.apache.kylin.metadata.filter.CompareTupleFilter;
-import org.apache.kylin.metadata.filter.LogicalTupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter;
-import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.realization.SQLDigest;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.IStorageQuery;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.translate.DerivedFilterTranslator;
+import org.apache.kylin.storage.gtrecord.GTCubeStorageQueryBase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-public class CubeStorageQuery implements IStorageQuery {
+public class CubeStorageQuery extends GTCubeStorageQueryBase {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
 
-    private final CubeInstance cubeInstance;
-    private final CubeDesc cubeDesc;
-
     public CubeStorageQuery(CubeInstance cube) {
-        this.cubeInstance = cube;
-        this.cubeDesc = cube.getDescriptor();
+        super(cube);
     }
 
     @Override
-    public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        // allow custom measures hack
-        notifyBeforeStorageQuery(sqlDigest);
-
-        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TupleFilter filter = sqlDigest.filter;
-
-        // build dimension & metrics
-        Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
-        Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
-        buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
-
-        // all dimensions = groups + other(like filter) dimensions
-        Set<TblColRef> otherDims = Sets.newHashSet(dimensions);
-        otherDims.removeAll(groups);
-
-        // expand derived (xxxD means contains host columns only, derived columns were translated)
-        Set<TblColRef> derivedPostAggregation = Sets.newHashSet();
-        Set<TblColRef> groupsD = expandDerived(groups, derivedPostAggregation);
-        Set<TblColRef> otherDimsD = expandDerived(otherDims, derivedPostAggregation);
-        otherDimsD.removeAll(groupsD);
-
-        // identify cuboid
-        Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
-        dimensionsD.addAll(groupsD);
-        dimensionsD.addAll(otherDimsD);
-        Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics);
-        context.setCuboid(cuboid);
-
-        // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
-        Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
-        boolean isExactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
-        context.setExactAggregation(isExactAggregation);
-
-        // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
-        TupleFilter filterD = translateDerived(filter, groupsD);
-
-        setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
-        setLimit(filter, context);
-
-        List<CubeSegmentScanner> scanners = Lists.newArrayList();
-        for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
-            CubeSegmentScanner scanner;
-            if (cubeSeg.getInputRecords() == 0) {
-                logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg);
-            }
-            scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context);
-            scanners.add(scanner);
-        }
-
-        if (scanners.isEmpty())
-            return ITupleIterator.EMPTY_TUPLE_ITERATOR;
-
-        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
-    }
-
-    private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
-        for (FunctionDesc func : sqlDigest.aggregations) {
-            if (!func.isDimensionAsMetric()) {
-                // use the FunctionDesc from cube desc as much as possible, that has more info such as HLLC precision
-                metrics.add(findAggrFuncFromCubeDesc(func));
-            }
-        }
-
-        for (TblColRef column : sqlDigest.allColumns) {
-            // skip measure columns
-            if (sqlDigest.metricColumns.contains(column) && !(sqlDigest.groupbyColumns.contains(column) || sqlDigest.filterColumns.contains(column))) {
-                continue;
-            }
-
-            dimensions.add(column);
-        }
-    }
-
-    private FunctionDesc findAggrFuncFromCubeDesc(FunctionDesc aggrFunc) {
-        for (MeasureDesc measure : cubeDesc.getMeasures()) {
-            if (measure.getFunction().equals(aggrFunc))
-                return measure.getFunction();
-        }
-        return aggrFunc;
-    }
-
-    private Set<TblColRef> expandDerived(Collection<TblColRef> cols, Set<TblColRef> derivedPostAggregation) {
-        Set<TblColRef> expanded = Sets.newHashSet();
-        for (TblColRef col : cols) {
-            if (cubeDesc.hasHostColumn(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                for (TblColRef hostCol : hostInfo.columns) {
-                    expanded.add(hostCol);
-                    if (hostInfo.isOneToOne == false)
-                        derivedPostAggregation.add(hostCol);
-                }
-            } else {
-                expanded.add(col);
-            }
-        }
-        return expanded;
-    }
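
For readers skimming the diff: expandDerived() above swaps each derived column for its host columns and, when the derivation is not one-to-one, also records the hosts as requiring post-aggregation. Below is a minimal, self-contained sketch of the same idea, simplified to one host column per derived column; all names (expand, derivedToHost, nonOneToOne) are illustrative placeholders, not Kylin APIs.

    import java.util.*;

    class ExpandDerivedSketch {
        // Swap each derived column for its host column; when the derivation is
        // not one-to-one, flag the host column for post-aggregation.
        static Set<String> expand(Collection<String> cols, Map<String, String> derivedToHost,
                                  Set<String> nonOneToOne, Set<String> postAgg) {
            Set<String> expanded = new HashSet<>();
            for (String col : cols) {
                String host = derivedToHost.get(col);
                if (host != null) {
                    expanded.add(host);
                    if (nonOneToOne.contains(col))
                        postAgg.add(host); // host value cannot stand in 1:1 for the derived value
                } else {
                    expanded.add(col);     // plain dimension, keep as-is
                }
            }
            return expanded;
        }
    }
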
-
-    @SuppressWarnings("unchecked")
-    private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
-        Collection<? extends TupleFilter> toCheck;
-        if (filter instanceof CompareTupleFilter) {
-            toCheck = Collections.singleton(filter);
-        } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
-            toCheck = filter.getChildren();
+    protected String getGTStorage() {
+        if ("scan".equalsIgnoreCase(BackdoorToggles.getHbaseCubeQueryProtocol())) {
+            return "org.apache.kylin.storage.hbase.cube.v2.CubeHBaseScanRPC";
         } else {
-            return (Set<TblColRef>) Collections.EMPTY_SET;
-        }
-
-        Set<TblColRef> result = Sets.newHashSet();
-        for (TupleFilter f : toCheck) {
-            if (f instanceof CompareTupleFilter) {
-                CompareTupleFilter compFilter = (CompareTupleFilter) f;
-                // is COL=const ?
-                if (compFilter.getOperator() == FilterOperatorEnum.EQ && compFilter.getValues().size() == 1 && compFilter.getColumn() != null) {
-                    result.add(compFilter.getColumn());
-                }
-            }
-        }
-
-        // expand derived
-        Set<TblColRef> resultD = Sets.newHashSet();
-        for (TblColRef col : result) {
-            if (cubeDesc.isExtendedColumn(col)) {
-                throw new CubeDesc.CannotFilterExtendedColumnException(col);
-            }
-            if (cubeDesc.isDerived(col)) {
-                DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-                if (hostInfo.isOneToOne) {
-                    for (TblColRef hostCol : hostInfo.columns) {
-                        resultD.add(hostCol);
-                    }
-                }
-                // if not one-to-one, the derived column will be pruned
-            } else {
-                resultD.add(col);
-            }
+            return KylinConfig.getInstanceFromEnv().getDefaultIGTStorage();
         }
-        return resultD;
     }
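
The getGTStorage() hook above picks between the debug scan RPC and the configured default IGTStorage by returning a fully qualified class name rather than an instance. A name like this is typically materialized through reflection; here is a minimal sketch of that pattern, with hypothetical Storage/openStorage names (only the Class.forName / getConstructor / newInstance chain is the point, not any particular Kylin API).

    import java.lang.reflect.Constructor;

    class ReflectiveFactorySketch {
        // Hypothetical stand-in for a pluggable storage interface.
        interface Storage { void scan(); }

        // Resolve a fully qualified class name into an instance, assuming the
        // implementation exposes a public constructor taking a String argument.
        static Storage openStorage(String className, String arg) throws Exception {
            Class<?> clz = Class.forName(className);
            Constructor<?> ctor = clz.getConstructor(String.class);
            return (Storage) ctor.newInstance(arg);
        }
    }
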
-
-    private boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
-        boolean exact = true;
-
-        if (cuboid.requirePostAggregation()) {
-            exact = false;
-            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
-        }
-
-        // derived aggregation is bad, unless expanded columns are already in group by
-        if (groups.containsAll(derivedPostAggregation) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
-        }
-
-        // other columns (from filter) are bad, unless they are ensured to have a single value
-        if (singleValuesD.containsAll(othersD) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because some column not on group by: " + othersD //
-                    + " (single value column: " + singleValuesD + ")");
-        }
-
-        // for partitioned cube, the partition column must belong to group by or has single value
-        PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc();
-        if (partDesc.isPartitioned()) {
-            TblColRef col = partDesc.getPartitionDateColumnRef();
-            if (!groups.contains(col) && !singleValuesD.contains(col)) {
-                exact = false;
-                logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by");
-            }
-        }
-
-        if (exact) {
-            logger.info("exactAggregation is true");
-        }
-        return exact;
-    }
-
-    @SuppressWarnings("unchecked")
-    private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null)
-            return filter;
-
-        if (filter instanceof CompareTupleFilter) {
-            return translateDerivedInCompare((CompareTupleFilter) filter, collector);
-        }
-
-        List<TupleFilter> children = (List<TupleFilter>) filter.getChildren();
-        List<TupleFilter> newChildren = Lists.newArrayListWithCapacity(children.size());
-        boolean modified = false;
-        for (TupleFilter child : children) {
-            TupleFilter translated = translateDerived(child, collector);
-            newChildren.add(translated);
-            if (child != translated)
-                modified = true;
-        }
-        if (modified) {
-            filter = replaceChildren(filter, newChildren);
-        }
-        return filter;
-    }
-
-    private TupleFilter replaceChildren(TupleFilter filter, List<TupleFilter> newChildren) {
-        if (filter instanceof LogicalTupleFilter) {
-            LogicalTupleFilter r = new LogicalTupleFilter(filter.getOperator());
-            r.addChildren(newChildren);
-            return r;
-        } else
-            throw new IllegalStateException("Cannot replaceChildren on " + filter);
-    }
-
-    private TupleFilter translateDerivedInCompare(CompareTupleFilter compf, Set<TblColRef> collector) {
-        if (compf.getColumn() == null || compf.getValues().isEmpty())
-            return compf;
-
-        TblColRef derived = compf.getColumn();
-        if (cubeDesc.isExtendedColumn(derived)) {
-            throw new CubeDesc.CannotFilterExtendedColumnException(derived);
-        }
-        if (cubeDesc.isDerived(derived) == false)
-            return compf;
-
-        DeriveInfo hostInfo = cubeDesc.getHostInfo(derived);
-        CubeManager cubeMgr = CubeManager.getInstance(this.cubeInstance.getConfig());
-        CubeSegment seg = cubeInstance.getLatestReadySegment();
-        LookupStringTable lookup = cubeMgr.getLookupTable(seg, hostInfo.dimension);
-        Pair<TupleFilter, Boolean> translated = DerivedFilterTranslator.translate(lookup, hostInfo, compf);
-        TupleFilter translatedFilter = translated.getFirst();
-        boolean loosened = translated.getSecond();
-        if (loosened) {
-            collectColumnsRecursively(translatedFilter, collector);
-        }
-        return translatedFilter;
-    }
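
The compare-filter translation above consults the lookup table of the latest READY segment and may loosen the condition; when it does, the affected columns flow into the group-by collector so the query engine can re-filter after aggregation. A toy sketch of the core rewrite idea follows; a HashMap stands in for LookupStringTable, all names are illustrative, and the real DerivedFilterTranslator is considerably more involved.

    import java.util.*;

    class DerivedFilterSketch {
        // Rewrite "derivedCol = wanted" into the set of host-column values whose
        // lookup row carries the wanted derived value (i.e. "host IN (...)").
        static Set<String> hostValuesFor(Map<String, String> hostToDerived, String wanted) {
            Set<String> hosts = new HashSet<>();
            for (Map.Entry<String, String> row : hostToDerived.entrySet()) {
                if (wanted.equals(row.getValue()))
                    hosts.add(row.getKey());
            }
            return hosts; // if this set grows huge, a translator may loosen the filter instead
        }
    }
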
-
-    private void collectColumnsRecursively(TupleFilter filter, Set<TblColRef> collector) {
-        if (filter == null)
-            return;
-
-        if (filter instanceof ColumnTupleFilter) {
-            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
-        }
-        for (TupleFilter child : filter.getChildren()) {
-            collectColumnsRecursively(child, collector);
-        }
-    }
-
-    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
-        if (cubeDesc.isExtendedColumn(col)) {
-            throw new CubeDesc.CannotFilterExtendedColumnException(col);
-        }
-        if (cubeDesc.isDerived(col)) {
-            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-            for (TblColRef h : hostInfo.columns)
-                collector.add(h);
-        } else {
-            collector.add(col);
-        }
-    }
-
-    private void setThreshold(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
-        boolean hasMemHungryMeasure = false;
-        for (FunctionDesc func : metrics) {
-            hasMemHungryMeasure |= func.getMeasureType().isMemoryHungry();
-        }
-
-        // need to limit the memory usage for memory-hungry measures
-        if (hasMemHungryMeasure == false) {
-            return;
-        }
-
-        int rowSizeEst = dimensions.size() * 3;
-        for (FunctionDesc func : metrics) {
-            // FIXME getStorageBytesEstimate() is not appropriate as here we want size in memory (not in storage)
-            rowSizeEst += func.getReturnDataType().getStorageBytesEstimate();
-        }
-
-        long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst;
-        if (rowEst > 0) {
-            logger.info("Memory budget is set to " + rowEst + " rows");
-            context.setThreshold((int) rowEst);
-        } else {
-            logger.info("Memory budget is not set.");
-        }
-    }
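
To make the arithmetic in setThreshold() concrete, here is a worked example with made-up numbers; only the 3-bytes-per-dimension constant and the budget/rowSize division are taken from the code above, everything else is illustrative.

    class BudgetSketch {
        public static void main(String[] args) {
            int dims = 10;                            // 10 dimensions -> 10 * 3 = 30 bytes
            int hllcBytes = 16 * 1024;                // e.g. one memory-hungry HLLC measure
            int rowSizeEst = dims * 3 + hllcBytes;    // 16414 bytes per row
            long memBudget = 3L * 1024 * 1024 * 1024; // assume a 3 GB query memory budget
            long rowEst = memBudget / rowSizeEst;     // ~196k rows become the threshold
            System.out.println(rowEst);
        }
    }
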
-
-    private void setLimit(TupleFilter filter, StorageContext context) {
-        boolean goodAggr = context.isExactAggregation();
-        boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
-        boolean goodSort = context.hasSort() == false;
-        if (goodAggr && goodFilter && goodSort) {
-            logger.info("Enable limit " + context.getLimit());
-            context.enableLimit();
-        }
-    }
-
-    private void notifyBeforeStorageQuery(SQLDigest sqlDigest) {
-        for (MeasureDesc measure : cubeDesc.getMeasures()) {
-            MeasureType<?> measureType = measure.getFunction().getMeasureType();
-            measureType.adjustSqlDigest(measure, sqlDigest);
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
deleted file mode 100644
index a7346af..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeTupleConverter.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.kylin.common.util.Array;
-import org.apache.kylin.common.util.Dictionary;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.gridtable.CuboidToGridTableMapping;
-import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
-import org.apache.kylin.dict.lookup.LookupStringTable;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.MeasureType;
-import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-/**
- * Converts a GTRecord into a Tuple.
- */
-public class CubeTupleConverter {
-
-    final CubeSegment cubeSeg;
-    final Cuboid cuboid;
-    final TupleInfo tupleInfo;
-    final List<IDerivedColumnFiller> derivedColFillers;
-
-    final int[] gtColIdx;
-    final int[] tupleIdx;
-    final Object[] gtValues;
-    final MeasureType<?>[] measureTypes;
-    
-    final List<IAdvMeasureFiller> advMeasureFillers;
-    final List<Integer> advMeasureIndexInGTValues;
-    
-    final int nSelectedDims;
-
-    public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
-            Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
-        this.cubeSeg = cubeSeg;
-        this.cuboid = cuboid;
-        this.tupleInfo = returnTupleInfo;
-        this.derivedColFillers = Lists.newArrayList();
-
-        List<TblColRef> cuboidDims = cuboid.getColumns();
-        CuboidToGridTableMapping mapping = cuboid.getCuboidToGridTableMapping();
-
-        nSelectedDims = selectedDimensions.size();
-        gtColIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
-        tupleIdx = new int[selectedDimensions.size() + selectedMetrics.size()];
-        gtValues = new Object[selectedDimensions.size() + selectedMetrics.size()];
-
-        // measure types don't have this many entries, but the aligned length makes programming easier
-        measureTypes = new MeasureType[selectedDimensions.size() + selectedMetrics.size()];
-        
-        advMeasureFillers = Lists.newArrayListWithCapacity(1);
-        advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
-        
-        int iii = 0;
-
-        // pre-calculate dimension index mapping to tuple
-        for (TblColRef dim : selectedDimensions) {
-            int i = mapping.getIndexOf(dim);
-            gtColIdx[iii] = i;
-            tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
-            iii++;
-        }
-
-        for (FunctionDesc metric : selectedMetrics) {
-            int i = mapping.getIndexOf(metric);
-            gtColIdx[iii] = i;
-            
-            if (metric.needRewrite()) {
-                String rewriteFieldName = metric.getRewriteFieldName();
-                tupleIdx[iii] = tupleInfo.hasField(rewriteFieldName) ? tupleInfo.getFieldIndex(rewriteFieldName) : -1;
-            }
-            // a non-rewrite metric (like SUM, or a dimension playing as a metric) behaves like a dimension column
-            else {
-                TblColRef col = metric.getParameter().getColRefs().get(0);
-                tupleIdx[iii] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
-            }
-            
-            MeasureType<?> measureType = metric.getMeasureType();
-            if (measureType.needAdvancedTupleFilling()) {
-                Map<TblColRef, Dictionary<String>> dictionaryMap = buildDictionaryMap(measureType.getColumnsNeedDictionary(metric));
-                advMeasureFillers.add(measureType.getAdvancedTupleFiller(metric, returnTupleInfo, dictionaryMap));
-                advMeasureIndexInGTValues.add(iii);
-            } else {
-                measureTypes[iii] = measureType;
-            }
-                
-            iii++;
-        }
-
-        // prepare derived columns and filler
-        Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedInfo = cuboid.getCubeDesc().getHostToDerivedInfo(cuboidDims, null);
-        for (Entry<Array<TblColRef>, List<DeriveInfo>> entry : hostToDerivedInfo.entrySet()) {
-            TblColRef[] hostCols = entry.getKey().data;
-            for (DeriveInfo deriveInfo : entry.getValue()) {
-                IDerivedColumnFiller filler = newDerivedColumnFiller(hostCols, deriveInfo);
-                if (filler != null) {
-                    derivedColFillers.add(filler);
-                }
-            }
-        }
-    }
-
-    // load only needed dictionaries
-    private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
-        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
-        for (TblColRef col : columnsNeedDictionary) {
-            result.put(col, cubeSeg.getDictionary(col));
-        }
-        return result;
-    }
-
-    public List<IAdvMeasureFiller> translateResult(GTRecord record, Tuple tuple) {
-
-        record.getValues(gtColIdx, gtValues);
-
-        // dimensions
-        for (int i = 0; i < nSelectedDims; i++) {
-            int ti = tupleIdx[i];
-            if (ti >= 0) {
-                tuple.setDimensionValue(ti, toString(gtValues[i]));
-            }
-        }
-
-        // measures
-        for (int i = nSelectedDims; i < gtColIdx.length; i++) {
-            int ti = tupleIdx[i];
-            if (ti >= 0 && measureTypes[i] != null) {
-                measureTypes[i].fillTupleSimply(tuple, ti, gtValues[i]);
-            }
-        }
-
-        // derived
-        for (IDerivedColumnFiller filler : derivedColFillers) {
-            filler.fillDerivedColumns(gtValues, tuple);
-        }
-        
-        // advanced measure filling, due to possible row splitting, will complete at the caller side
-        if (advMeasureFillers.isEmpty()) {
-            return null;
-        } else {
-            for (int i = 0; i < advMeasureFillers.size(); i++) {
-                Object measureValue = gtValues[advMeasureIndexInGTValues.get(i)];
-                advMeasureFillers.get(i).reload(measureValue);
-            }
-            return advMeasureFillers;
-        }
-    }
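
The constructor above precomputes two parallel index arrays, gtColIdx (where each selected column sits in the GTRecord) and tupleIdx (where it lands in the output tuple, or -1 if the query didn't ask for it), plus a reusable gtValues buffer. The copy loops in translateResult() then reduce to the pattern below; types are toy stand-ins and copySelected is a hypothetical name.

    class ParallelIndexSketch {
        // Copy selected source slots into their destination slots; a destination
        // index of -1 means the column was not requested and is skipped.
        static void copySelected(Object[] source, int[] srcIdx, int[] dstIdx, Object[] dest) {
            for (int i = 0; i < srcIdx.length; i++) {
                if (dstIdx[i] >= 0)
                    dest[dstIdx[i]] = source[srcIdx[i]];
            }
        }
    }
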
-
-    private interface IDerivedColumnFiller {
-        public void fillDerivedColumns(Object[] gtValues, Tuple tuple);
-    }
-
-    private IDerivedColumnFiller newDerivedColumnFiller(TblColRef[] hostCols, final DeriveInfo deriveInfo) {
-        boolean allHostsPresent = true;
-        final int[] hostTmpIdx = new int[hostCols.length];
-        for (int i = 0; i < hostCols.length; i++) {
-            hostTmpIdx[i] = indexOnTheGTValues(hostCols[i]);
-            allHostsPresent = allHostsPresent && hostTmpIdx[i] >= 0;
-        }
-
-        boolean needCopyDerived = false;
-        final int[] derivedTupleIdx = new int[deriveInfo.columns.length];
-        for (int i = 0; i < deriveInfo.columns.length; i++) {
-            TblColRef col = deriveInfo.columns[i];
-            derivedTupleIdx[i] = tupleInfo.hasColumn(col) ? tupleInfo.getColumnIndex(col) : -1;
-            needCopyDerived = needCopyDerived || derivedTupleIdx[i] >= 0;
-        }
-
-        if ((allHostsPresent && needCopyDerived) == false)
-            return null;
-
-        switch (deriveInfo.type) {
-        case LOOKUP:
-            return new IDerivedColumnFiller() {
-                CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-                LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
-                int[] derivedColIdx = initDerivedColIdx();
-                Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
-
-                private int[] initDerivedColIdx() {
-                    int[] idx = new int[deriveInfo.columns.length];
-                    for (int i = 0; i < idx.length; i++) {
-                        idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
-                    }
-                    return idx;
-                }
-
-                @Override
-                public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
-                    for (int i = 0; i < hostTmpIdx.length; i++) {
-                        lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
-                    }
-
-                    String[] lookupRow = lookupTable.getRow(lookupKey);
-
-                    if (lookupRow != null) {
-                        for (int i = 0; i < derivedTupleIdx.length; i++) {
-                            if (derivedTupleIdx[i] >= 0) {
-                                String value = lookupRow[derivedColIdx[i]];
-                                tuple.setDimensionValue(derivedTupleIdx[i], value);
-                            }
-                        }
-                    } else {
-                        for (int i = 0; i < derivedTupleIdx.length; i++) {
-                            if (derivedTupleIdx[i] >= 0) {
-                                tuple.setDimensionValue(derivedTupleIdx[i], null);
-                            }
-                        }
-                    }
-                }
-            };
-        case PK_FK:
-            return new IDerivedColumnFiller() {
-                @Override
-                public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
-                    // composite keys are split, so copying [0] alone is enough; see CubeDesc.initDimensionColumns()
-                    tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
-                }
-            };
-        default:
-            throw new IllegalArgumentException();
-        }
-    }
-
-    private int indexOnTheGTValues(TblColRef col) {
-        List<TblColRef> cuboidDims = cuboid.getColumns();
-        int cuboidIdx = cuboidDims.indexOf(col);
-        for (int i = 0; i < gtColIdx.length; i++) {
-            if (gtColIdx[i] == cuboidIdx)
-                return i;
-        }
-        return -1;
-    }
-
-    private static String toString(Object o) {
-        return o == null ? null : o.toString();
-    }
-}
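
As a companion to the LOOKUP filler above: the host column values form the key into the lookup table, and the matched row supplies the derived values, or nulls when no row matches. A toy sketch with a HashMap standing in for LookupStringTable; all names are illustrative.

    import java.util.*;

    class LookupFillerSketch {
        // Return the derived value for a host key, or null when the lookup
        // table has no matching row (mirroring the null handling above).
        static String fillDerived(Map<List<String>, String[]> lookup,
                                  List<String> hostKey, int derivedIdx) {
            String[] row = lookup.get(hostKey);
            return row == null ? null : row[derivedIdx];
        }
    }
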

http://git-wip-us.apache.org/repos/asf/kylin/blob/4fd74fc6/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
deleted file mode 100644
index f8b055c..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.cube.v2;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.NoSuchElementException;
-import java.util.Set;
-
-import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.apache.kylin.metadata.tuple.ITuple;
-import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.Tuple;
-import org.apache.kylin.metadata.tuple.TupleInfo;
-import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.exception.ScanOutOfLimitException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class SequentialCubeTupleIterator implements ITupleIterator {
-
-    private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class);
-
-    protected final Cuboid cuboid;
-    protected final Set<TblColRef> selectedDimensions;
-    protected final Set<FunctionDesc> selectedMetrics;
-    protected final TupleInfo tupleInfo;
-    protected final Tuple tuple;
-    protected final Iterator<CubeSegmentScanner> scannerIterator;
-    protected final StorageContext context;
-
-    protected CubeSegmentScanner curScanner;
-    protected Iterator<GTRecord> curRecordIterator;
-    protected CubeTupleConverter curTupleConverter;
-    protected Tuple next;
-    
-    private List<IAdvMeasureFiller> advMeasureFillers;
-    private int advMeasureRowsRemaining;
-    private int advMeasureRowIndex;
-
-    private int scanCount;
-    private int scanCountDelta;
-
-    public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
-            Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
-        this.cuboid = cuboid;
-        this.selectedDimensions = selectedDimensions;
-        this.selectedMetrics = selectedMetrics;
-        this.tupleInfo = returnTupleInfo;
-        this.tuple = new Tuple(returnTupleInfo);
-        this.scannerIterator = scanners.iterator();
-        this.context = context;
-    }
-
-    @Override
-    public boolean hasNext() {
-        if (next != null)
-            return true;
-        
-        if (hitLimitAndThreshold())
-            return false;
-        
-        // consume any remaining rows from the advanced measure fillers
-        if (advMeasureRowsRemaining > 0) {
-            for (IAdvMeasureFiller filler : advMeasureFillers) {
-                filler.fillTuple(tuple, advMeasureRowIndex);
-            }
-            advMeasureRowIndex++;
-            advMeasureRowsRemaining--;
-            next = tuple;
-            return true;
-        }
-
-        // get the next GTRecord
-        if (curScanner == null) {
-            if (scannerIterator.hasNext()) {
-                curScanner = scannerIterator.next();
-                curRecordIterator = curScanner.iterator();
-                if (curRecordIterator.hasNext()) {
-                    // if the segment does not have any tuples, don't bother to create a converter
-                    curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
-                }
-            } else {
-                return false;
-            }
-        }
-        if (curRecordIterator.hasNext() == false) {
-            close(curScanner);
-            curScanner = null;
-            curRecordIterator = null;
-            curTupleConverter = null;
-            return hasNext();
-        }
-
-        // now we have a GTRecord
-        GTRecord curRecord = curRecordIterator.next();
-        
-        // translate into tuple
-        advMeasureFillers = curTupleConverter.translateResult(curRecord, tuple);
-
-        // the simple case
-        if (advMeasureFillers == null) {
-            next = tuple;
-            return true;
-        }
-        
-        // advanced measure filling, like TopN, will produce multiple tuples out of one record
-        advMeasureRowsRemaining = -1;
-        for (IAdvMeasureFiller filler : advMeasureFillers) {
-            if (advMeasureRowsRemaining < 0)
-                advMeasureRowsRemaining = filler.getNumOfRows();
-            if (advMeasureRowsRemaining != filler.getNumOfRows())
-                throw new IllegalStateException();
-        }
-        if (advMeasureRowsRemaining < 0)
-            throw new IllegalStateException();
-        
-        advMeasureRowIndex = 0;
-        return hasNext();
-    }
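
hasNext() above treats an advanced measure such as TopN as a row generator: each GTRecord reload()s the fillers, every filler must report the same getNumOfRows(), and the iterator then drains them one row per tuple. A toy filler honoring that contract is sketched below; a real filler decodes a measure object rather than a comma-separated string, and fillTuple(Tuple, int) writes into a Tuple instead of returning a String.

    class ToyAdvFillerSketch {
        private String[] rows;

        // Called once per GTRecord with the raw measure value.
        void reload(Object measureValue) { rows = ((String) measureValue).split(","); }

        // Every filler for the same record must agree on this count.
        int getNumOfRows() { return rows.length; }

        // Returns a String only to keep the sketch self-contained.
        String fillRow(int index) { return rows[index]; }
    }
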
-    
-
-    private boolean hitLimitAndThreshold() {
-        // check limit
-        if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) {
-            return true;
-        }
-        // check threshold
-        if (scanCount >= context.getThreshold()) {
-            throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add a filter condition (e.g. a WHERE clause) to narrow down the backend scan range.");
-        }
-        return false;
-    }
-
-    @Override
-    public ITuple next() {
-        // fetch next record
-        if (next == null) {
-            hasNext();
-            if (next == null)
-                throw new NoSuchElementException();
-        }
-
-        scanCount++;
-        if (++scanCountDelta >= 1000)
-            flushScanCountDelta();
-
-        ITuple result = next;
-        next = null;
-        return result;
-    }
-
-    @Override
-    public void remove() {
-        throw new UnsupportedOperationException();
-    }
-
-    @Override
-    public void close() {
-        // the hasNext() loop may exit early because of limit, threshold, etc.;
-        // close all remaining segment scanners
-        flushScanCountDelta();
-
-        if (curScanner != null)
-            close(curScanner);
-
-        while (scannerIterator.hasNext()) {
-            close(scannerIterator.next());
-        }
-    }
-
-    protected void close(CubeSegmentScanner scanner) {
-        try {
-            scanner.close();
-        } catch (IOException e) {
-            logger.error("Exception when close CubeScanner", e);
-        }
-    }
-
-    public int getScanCount() {
-        return scanCount;
-    }
-
-    private void flushScanCountDelta() {
-        context.increaseTotalScanCount(scanCountDelta);
-        scanCountDelta = 0;
-    }
-
-}
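
A closing note on flushScanCountDelta(): the iterator counts scanned rows in a local delta and pushes it to the StorageContext only every 1000 rows, plus once more on close, which keeps per-row bookkeeping cheap. The same batching pattern in isolation, with a toy counter (BatchedCounterSketch is a hypothetical name, not a Kylin class):

    import java.util.concurrent.atomic.AtomicLong;

    class BatchedCounterSketch {
        private final AtomicLong shared = new AtomicLong();
        private int delta;

        void increment() {
            if (++delta >= 1000)   // amortize updates to the shared counter
                flush();
        }

        void flush() {             // call on close() to publish the tail
            shared.addAndGet(delta);
            delta = 0;
        }

        long total() { return shared.get(); }
    }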