You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/08/10 04:56:57 UTC

[2/2] kylin git commit: KYLIN-1936 improve enable limit logic (exactAggregation is too strict)

KYLIN-1936 improve enable limit logic (exactAggregation is too strict)


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/28e94230
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/28e94230
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/28e94230

Branch: refs/heads/master
Commit: 28e942306d545ff3b5f604e8225bacad650ad8ac
Parents: c67891d
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Aug 5 12:50:27 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Aug 10 12:56:32 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   6 +-
 .../inmemcubing/AbstractInMemCubeBuilder.java   |   3 +-
 .../cube/inmemcubing/DoggedCubeBuilder.java     |   4 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      |   9 +-
 .../kylin/gridtable/GTAggregateScanner.java     |  38 ++++-
 .../java/org/apache/kylin/gridtable/GTInfo.java |   2 +-
 .../org/apache/kylin/gridtable/GTRecord.java    |  33 +++-
 .../GTScanExceedThresholdException.java         |  26 +++
 .../kylin/gridtable/GTScanRangePlanner.java     |   2 +-
 .../apache/kylin/gridtable/GTScanRequest.java   |  92 +++++-----
 .../kylin/gridtable/GTScanRequestBuilder.java   |  97 +++++++++++
 .../kylin/gridtable/GTScanTimeoutException.java |  26 +++
 .../gridtable/benchmark/GTScannerBenchmark.java |   5 +-
 .../benchmark/GTScannerBenchmark2.java          |   5 +-
 .../inmemcubing/ConcurrentDiskStoreTest.java    |   4 +-
 .../cube/inmemcubing/MemDiskStoreTest.java      |   4 +-
 .../gridtable/AggregationCacheSpillTest.java    |  12 +-
 .../kylin/gridtable/DictGridTableTest.java      |  10 +-
 .../kylin/gridtable/SimpleGridTableTest.java    |   4 +-
 .../org/apache/kylin/metadata/tuple/ITuple.java |   2 +-
 .../org/apache/kylin/metadata/tuple/Tuple.java  |   5 +
 .../apache/kylin/storage/StorageContext.java    |   4 +
 .../storage/gtrecord/CubeSegmentScanner.java    |   8 +-
 .../storage/gtrecord/CubeTupleConverter.java    | 148 +++++++++++-----
 .../gtrecord/FetchSourceAwareIterator.java      |  24 +++
 .../gtrecord/GTCubeStorageQueryBase.java        |  47 +++++-
 .../storage/gtrecord/IFetchSourceAware.java     |  25 +++
 .../kylin/storage/gtrecord/PeekingImpl.java     |  73 ++++++++
 .../gtrecord/SegmentCubeTupleIterator.java      | 162 ++++++++++++++++++
 .../gtrecord/SequentialCubeTupleIterator.java   | 151 ++++-------------
 .../storage/gtrecord/SortedIteratorMerger.java  | 100 +++++++++++
 .../gtrecord/SortedIteratorMergerWithLimit.java | 143 ++++++++++++++++
 .../gtrecord/SortedIteratorMergerTest.java      |  97 +++++++++++
 .../SortedIteratorMergerWithLimitTest.java      | 127 ++++++++++++++
 .../apache/kylin/query/HackedDbUnitAssert.java  | 169 +++++++++++++++++++
 .../apache/kylin/query/ITKylinQueryTest.java    |  12 +-
 .../org/apache/kylin/query/KylinTestBase.java   |  72 ++++++++
 .../hbase/cube/HBaseScannerBenchmark.java       |   6 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 165 +-----------------
 .../hbase/cube/v2/ExpectedSizeIterator.java     | 116 +++++++++++++
 .../storage/hbase/cube/v2/GTBlobScatter.java    | 150 ++++++++++++++++
 .../coprocessor/endpoint/CubeVisitService.java  |  70 ++++----
 42 files changed, 1811 insertions(+), 447 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f51dce6..eb4102a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -23,9 +23,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.SortedSet;
+import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -470,6 +470,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Float.parseFloat(getOptional("kylin.hbase.hfile.size.gb", "2.0"));
     }
 
+    public int getStoragePushDownLimitMax() {
+        return Integer.parseInt(getOptional("kylin.query.pushdown.limit.max", "10000"));
+    }
+
     public int getScanThreshold() {
         return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index e385ab9..d417d11 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -25,6 +25,7 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -83,7 +84,7 @@ abstract public class AbstractInMemCubeBuilder {
 
     protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
         long startTime = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
         IGTScanner scanner = gridTable.scan(req);
         for (GTRecord record : scanner) {
             output.write(cuboidId, record);

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 606c820..15f2241 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -35,7 +35,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -399,7 +399,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                 if (cuboidIterator.hasNext()) {
                     CuboidResult cuboid = cuboidIterator.next();
                     currentCuboidId = cuboid.cuboidId;
-                    scanner = cuboid.table.scan(new GTScanRequest(cuboid.table.getInfo(), null, null, null));
+                    scanner = cuboid.table.scan(new GTScanRequestBuilder().setInfo(cuboid.table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
                     recordIterator = scanner.iterator();
                 } else {
                     return false;

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e12e815..36d1296 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -31,8 +31,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
@@ -42,6 +42,7 @@ import org.apache.kylin.gridtable.GTBuilder;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.measure.topn.Counter;
@@ -329,8 +330,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
 
         Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
-        GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
-        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(baseCuboid.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(dimensionMetricsBitSet.getFirst()).setAggrMetrics(dimensionMetricsBitSet.getSecond()).setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
+        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, Long.MAX_VALUE);
         aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
 
         int count = 0;
@@ -397,7 +398,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
         GTInfo info = gridTable.getInfo();
-        GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(aggregationColumns).setAggrMetrics(measureColumns).setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
         GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
 
         // for child cuboid, some measures don't need aggregation.

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index cb23af4..ccf4895 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -63,24 +63,28 @@ public class GTAggregateScanner implements IGTScanner {
     final IGTScanner inputScanner;
     final AggregationCache aggrCache;
     final long spillThreshold;
+    final int storagePushDownLimit;//default to be Int.MAX
+    final long deadline;
 
     private int aggregatedRowCount = 0;
     private MemoryWaterLevel memTracker;
     private boolean[] aggrMask;
 
-    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
+    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, long deadline) {
         if (!req.hasAggregation())
             throw new IllegalStateException();
 
         this.info = inputScanner.getInfo();
-        this.dimensions = req.getColumns().andNot(req.getAggrMetrics());
+        this.dimensions = req.getDimensions();
         this.groupBy = req.getAggrGroupBy();
         this.metrics = req.getAggrMetrics();
         this.metricsAggrFuncs = req.getAggrMetricsFuncs();
         this.inputScanner = inputScanner;
         this.aggrCache = new AggregationCache();
-        this.spillThreshold = (long) (req.getAggrCacheGB() * MemoryBudgetController.ONE_GB);
+        this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
         this.aggrMask = new boolean[metricsAggrFuncs.length];
+        this.storagePushDownLimit = req.getStoragePushDownLimit();
+        this.deadline = deadline;
 
         Arrays.fill(aggrMask, true);
     }
@@ -133,8 +137,25 @@ public class GTAggregateScanner implements IGTScanner {
     public Iterator<GTRecord> iterator() {
         long count = 0;
         for (GTRecord r : inputScanner) {
+
             count++;
-            aggrCache.aggregate(r);
+
+            if (getNumOfSpills() == 0) {
+                //check limit
+                boolean ret = aggrCache.aggregate(r, storagePushDownLimit);
+
+                if (!ret) {
+                    logger.info("abort reading inputScanner because storage push down limit is hit");
+                    break;//limit is hit
+                }
+            } else {//else if dumps is not empty, it means a lot of row need aggregated, so it's less likely that limit clause is helping 
+                aggrCache.aggregate(r, Integer.MAX_VALUE);
+            }
+
+            //check deadline
+            if (count % 10000 == 1 && System.currentTimeMillis() > deadline) {
+                throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + count);
+            }
         }
         logger.info("GTAggregateScanner input rows: " + count);
         return aggrCache.iterator();
@@ -241,7 +262,7 @@ public class GTAggregateScanner implements IGTScanner {
             return result;
         }
 
-        void aggregate(GTRecord r) {
+        boolean aggregate(GTRecord r, int stopForLimit) {
             if (++aggregatedRowCount % 100000 == 0) {
                 if (memTracker != null) {
                     memTracker.markHigh();
@@ -257,6 +278,12 @@ public class GTAggregateScanner implements IGTScanner {
             final byte[] key = createKey(r);
             MeasureAggregator[] aggrs = aggBufMap.get(key);
             if (aggrs == null) {
+
+                //for storage push down limit
+                if (aggBufMap.size() >= stopForLimit) {
+                    return false;
+                }
+
                 aggrs = newAggregators();
                 aggBufMap.put(key, aggrs);
             }
@@ -267,6 +294,7 @@ public class GTAggregateScanner implements IGTScanner {
                     aggrs[i].aggregate(metrics);
                 }
             }
+            return true;
         }
 
         private void spillBuffMap() throws RuntimeException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index 673d22e..21da4ea 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -151,7 +151,7 @@ public class GTInfo {
         if (!expected.equals(ref))
             throw new IllegalArgumentException();
     }
-
+    
     void validate() {
         if (codeSystem == null)
             throw new IllegalStateException();

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 37f42c7..4d26029 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -20,6 +20,7 @@ package org.apache.kylin.gridtable;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.kylin.common.util.ByteArray;
@@ -27,7 +28,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
 
 import com.google.common.base.Preconditions;
 
-public class GTRecord implements Comparable<GTRecord> {
+public class GTRecord implements Comparable<GTRecord>, Cloneable {
 
     final transient GTInfo info;
     final ByteArray[] cols;
@@ -54,6 +55,11 @@ public class GTRecord implements Comparable<GTRecord> {
         }
     }
 
+    @Override
+    public Object clone() {
+        return new GTRecord(this);
+    }
+
     public GTInfo getInfo() {
         return info;
     }
@@ -189,12 +195,33 @@ public class GTRecord implements Comparable<GTRecord> {
 
     @Override
     public int compareTo(GTRecord o) {
+        return compareToInternal(o, info.colAll);
+    }
+
+    public int compareToOnPrimaryKey(GTRecord o) {
+        return compareToInternal(o, info.primaryKey);
+    }
+
+    public static Comparator<GTRecord> getPrimaryKeyComparator() {
+        return new Comparator<GTRecord>() {
+            @Override
+            public int compare(GTRecord o1, GTRecord o2) {
+                if (o1 == null || o2 == null) {
+                    throw new IllegalStateException("Cannot handle null");
+                }
+
+                return o1.compareToOnPrimaryKey(o2);
+            }
+        };
+    }
+
+    private int compareToInternal(GTRecord o, ImmutableBitSet participateCols) {
         assert this.info == o.info; // reference equal for performance
         IGTComparator comparator = info.codeSystem.getComparator();
 
         int comp = 0;
-        for (int i = 0; i < info.colAll.trueBitCount(); i++) {
-            int c = info.colAll.trueBitAt(i);
+        for (int i = 0; i < participateCols.trueBitCount(); i++) {
+            int c = participateCols.trueBitAt(i);
             comp = comparator.compare(cols[c], o.cols[c]);
             if (comp != 0)
                 return comp;

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
new file mode 100644
index 0000000..dd57e90
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public class GTScanExceedThresholdException extends RuntimeException {
+
+    public GTScanExceedThresholdException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index 17d27f9..b8f7e0e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -165,7 +165,7 @@ public class GTScanRangePlanner {
         GTScanRequest scanRequest;
         List<GTScanRange> scanRanges = this.planScanRanges();
         if (scanRanges != null && scanRanges.size() != 0) {
-            scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter);
+            scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).createGTScanRequest();
         } else {
             scanRequest = null;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 0ce6b4c..e2bac3d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -57,20 +57,14 @@ public class GTScanRequest {
     private String[] aggrMetricsFuncs;
 
     // hint to storage behavior
-    private boolean allowPreAggregation = true;
-    private double aggrCacheGB = 0; // 0 means no row/memory limit; positive means memory limit in GB; negative means row limit
-
-    public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet columns, TupleFilter filterPushDown) {
-        this(info, ranges, columns, null, null, null, filterPushDown, true, 0);
-    }
-
-    public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
-            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
-        this(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, true, 0);
-    }
-
-    public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
-            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) {
+    private boolean allowStorageAggregation;
+    private double aggCacheMemThreshold;
+    private int storageScanRowNumThreshold;
+    private int storagePushDownLimit;
+
+    GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
+            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, //
+            double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit) {
         this.info = info;
         if (ranges == null) {
             this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
@@ -84,8 +78,10 @@ public class GTScanRequest {
         this.aggrMetrics = aggrMetrics;
         this.aggrMetricsFuncs = aggrMetricsFuncs;
 
-        this.allowPreAggregation = allowPreAggregation;
-        this.aggrCacheGB = aggrCacheGB;
+        this.allowStorageAggregation = allowStorageAggregation;
+        this.aggCacheMemThreshold = aggCacheMemThreshold;
+        this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+        this.storagePushDownLimit = storagePushDownLimit;
 
         validate(info);
     }
@@ -143,7 +139,7 @@ public class GTScanRequest {
     }
 
     public IGTScanner decorateScanner(IGTScanner scanner) throws IOException {
-        return decorateScanner(scanner, true, true);
+        return decorateScanner(scanner, true, true, Long.MAX_VALUE);
     }
 
     /**
@@ -152,7 +148,7 @@ public class GTScanRequest {
      * <p/>
      * Refer to CoprocessorBehavior for explanation
      */
-    public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException {
+    public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr, long deadline) throws IOException {
         IGTScanner result = scanner;
         if (!doFilter) { //Skip reading this section if you're not profiling! 
             int scanned = lookAndForget(result);
@@ -169,11 +165,11 @@ public class GTScanRequest {
                 return new EmptyGTScanner(scanned);
             }
 
-            if (!this.allowPreAggregation) {
+            if (!this.isAllowStorageAggregation()) {
                 logger.info("pre aggregation is not beneficial, skip it");
             } else if (this.hasAggregation()) {
                 logger.info("pre aggregating results before returning");
-                result = new GTAggregateScanner(result, this);
+                result = new GTAggregateScanner(result, this, deadline);
             } else {
                 logger.info("has no aggregation, skip it");
             }
@@ -235,6 +231,10 @@ public class GTScanRequest {
         return filterPushDown;
     }
 
+    public ImmutableBitSet getDimensions() {
+        return this.getColumns().andNot(this.getAggrMetrics());
+    }
+
     public ImmutableBitSet getAggrGroupBy() {
         return aggrGroupBy;
     }
@@ -247,34 +247,41 @@ public class GTScanRequest {
         return aggrMetricsFuncs;
     }
 
-    public boolean isAllowPreAggregation() {
-        return allowPreAggregation;
+    public boolean isAllowStorageAggregation() {
+        return allowStorageAggregation;
     }
 
-    public void setAllowPreAggregation(boolean allowPreAggregation) {
-        this.allowPreAggregation = allowPreAggregation;
+    public void setAllowStorageAggregation(boolean allowStorageAggregation) {
+        this.allowStorageAggregation = allowStorageAggregation;
     }
 
-    public double getAggrCacheGB() {
-        if (aggrCacheGB < 0)
+    public double getAggCacheMemThreshold() {
+        if (aggCacheMemThreshold < 0)
             return 0;
         else
-            return aggrCacheGB;
+            return aggCacheMemThreshold;
     }
 
-    public void setAggrCacheGB(double gb) {
-        this.aggrCacheGB = gb;
+    public void setAggCacheMemThreshold(double gb) {
+        this.aggCacheMemThreshold = gb;
     }
 
-    public int getRowLimit() {
-        if (aggrCacheGB < 0)
-            return (int) -aggrCacheGB;
-        else
-            return 0;
+    public int getStorageScanRowNumThreshold() {
+        return storageScanRowNumThreshold;
+    }
+
+    public void setStorageScanRowNumThreshold(int storageScanRowNumThreshold) {
+        logger.info("storageScanRowNumThreshold is set to " + storageScanRowNumThreshold);
+        this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+    }
+
+    public int getStoragePushDownLimit() {
+        return this.storagePushDownLimit;
     }
 
-    public void setRowLimit(int limit) {
-        aggrCacheGB = -limit;
+    public void setStoragePushDownLimit(int limit) {
+        logger.info("storagePushDownLimit is set to " + storagePushDownLimit);
+        this.storagePushDownLimit = limit;
     }
 
     public List<Integer> getRequiredMeasures() {
@@ -323,8 +330,10 @@ public class GTScanRequest {
             ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out);
             ImmutableBitSet.serializer.serialize(value.aggrMetrics, out);
             BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out);
-            BytesUtil.writeVInt(value.allowPreAggregation ? 1 : 0, out);
-            out.putDouble(value.aggrCacheGB);
+            BytesUtil.writeVInt(value.allowStorageAggregation ? 1 : 0, out);
+            out.putDouble(value.aggCacheMemThreshold);
+            BytesUtil.writeVInt(value.storageScanRowNumThreshold, out);
+            BytesUtil.writeVInt(value.storagePushDownLimit, out);
         }
 
         @Override
@@ -353,8 +362,13 @@ public class GTScanRequest {
             String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in);
             boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1);
             double sAggrCacheGB = in.getDouble();
+            int storageScanRowNumThreshold = BytesUtil.readVInt(in);
+            int storagePushDownLimit = BytesUtil.readVInt(in);
 
-            return new GTScanRequest(sInfo, sRanges, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB);
+            return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).//
+            setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).//
+            setFilterPushDown(sGTFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).//
+            setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).createGTScanRequest();
         }
 
         private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
new file mode 100644
index 0000000..49ec759
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.filter.TupleFilter;
+
+public class GTScanRequestBuilder {
+    private GTInfo info;
+    private List<GTScanRange> ranges;
+    private TupleFilter filterPushDown;
+    private ImmutableBitSet dimensions;
+    private ImmutableBitSet aggrGroupBy = null;
+    private ImmutableBitSet aggrMetrics = null;
+    private String[] aggrMetricsFuncs = null;
+    private boolean allowStorageAggregation = true;
+    private double aggCacheMemThreshold = 0;
+    private int storageScanRowNumThreshold = Integer.MAX_VALUE;// storage should terminate itself when $storageScanRowNumThreshold cuboid rows are scanned, and throw exception.   
+    private int storagePushDownLimit = Integer.MAX_VALUE;// storage can quit working when $toragePushDownLimit aggregated rows are produced. 
+
+    public GTScanRequestBuilder setInfo(GTInfo info) {
+        this.info = info;
+        return this;
+    }
+
+    public GTScanRequestBuilder setRanges(List<GTScanRange> ranges) {
+        this.ranges = ranges;
+        return this;
+    }
+
+    public GTScanRequestBuilder setFilterPushDown(TupleFilter filterPushDown) {
+        this.filterPushDown = filterPushDown;
+        return this;
+    }
+
+    public GTScanRequestBuilder setDimensions(ImmutableBitSet dimensions) {
+        this.dimensions = dimensions;
+        return this;
+    }
+
+    public GTScanRequestBuilder setAggrGroupBy(ImmutableBitSet aggrGroupBy) {
+        this.aggrGroupBy = aggrGroupBy;
+        return this;
+    }
+
+    public GTScanRequestBuilder setAggrMetrics(ImmutableBitSet aggrMetrics) {
+        this.aggrMetrics = aggrMetrics;
+        return this;
+    }
+
+    public GTScanRequestBuilder setAggrMetricsFuncs(String[] aggrMetricsFuncs) {
+        this.aggrMetricsFuncs = aggrMetricsFuncs;
+        return this;
+    }
+
+    public GTScanRequestBuilder setAllowStorageAggregation(boolean allowStorageAggregation) {
+        this.allowStorageAggregation = allowStorageAggregation;
+        return this;
+    }
+
+    public GTScanRequestBuilder setAggCacheMemThreshold(double aggCacheMemThreshold) {
+        this.aggCacheMemThreshold = aggCacheMemThreshold;
+        return this;
+    }
+
+    public GTScanRequestBuilder setStorageScanRowNumThreshold(int storageScanRowNumThreshold) {
+        this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+        return this;
+    }
+
+    public GTScanRequestBuilder setStoragePushDownLimit(int storagePushDownLimit) {
+        this.storagePushDownLimit = storagePushDownLimit;
+        return this;
+    }
+
+    public GTScanRequest createGTScanRequest() {
+        return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
new file mode 100644
index 0000000..e92dae3
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public class GTScanTimeoutException extends RuntimeException {
+
+    public GTScanTimeoutException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
index 680ba33..589f37c 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
@@ -29,6 +29,7 @@ import org.apache.kylin.gridtable.GTInfo.Builder;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTSampleCodeSystem;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -110,7 +111,7 @@ public class GTScannerBenchmark {
     @SuppressWarnings("unused")
     private void testAggregate(ImmutableBitSet groupBy) throws IOException {
         long t = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(dimensions).setAggrGroupBy(groupBy).setAggrMetrics(metrics).setAggrMetricsFuncs(aggrFuncs).setFilterPushDown(null).createGTScanRequest();
         IGTScanner scanner = req.decorateScanner(gen.generate(N));
 
         long count = 0;
@@ -154,7 +155,7 @@ public class GTScannerBenchmark {
     @SuppressWarnings("unused")
     private void testFilter(TupleFilter filter) throws IOException {
         long t = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, info.getAllColumns(), filter);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(info.getAllColumns()).setFilterPushDown(filter).createGTScanRequest();
         IGTScanner scanner = req.decorateScanner(gen.generate(N));
 
         long count = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
index ae86e46..40a5e01 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
@@ -31,6 +31,7 @@ import org.apache.kylin.gridtable.GTInfo.Builder;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTSampleCodeSystem;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.benchmark.SortedGTRecordGenerator.Randomizer;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
@@ -132,7 +133,7 @@ public class GTScannerBenchmark2 {
     @SuppressWarnings("unused")
     private void testAggregate(ImmutableBitSet groupBy) throws IOException {
         long t = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(dimensions).setAggrGroupBy(groupBy).setAggrMetrics(metrics).setAggrMetricsFuncs(aggrFuncs).setFilterPushDown(null).createGTScanRequest();
         IGTScanner scanner = req.decorateScanner(gen.generate(N));
 
         long count = 0;
@@ -176,7 +177,7 @@ public class GTScannerBenchmark2 {
     @SuppressWarnings("unused")
     private void testFilter(TupleFilter filter) throws IOException {
         long t = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, info.getAllColumns(), filter);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(info.getAllColumns()).setFilterPushDown(filter).createGTScanRequest();
         IGTScanner scanner = req.decorateScanner(gen.generate(N));
 
         long count = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
index ceaf80d..6355f4a 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
@@ -26,7 +26,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.gridtable.GTBuilder;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.UnitTestSupport;
@@ -84,7 +84,7 @@ public class ConcurrentDiskStoreTest extends LocalFileMetadataTestCase {
             t[i] = new Thread() {
                 public void run() {
                     try {
-                        IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo(), null, null, null));
+                        IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
                         int i = 0;
                         for (GTRecord r : scanner) {
                             assertEquals(data.get(i++), r);

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
index 807a6e3..06ade1c 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
@@ -27,7 +27,7 @@ import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.gridtable.GTBuilder;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.UnitTestSupport;
@@ -100,7 +100,7 @@ public class MemDiskStoreTest extends LocalFileMetadataTestCase {
         }
         builder.close();
 
-        IGTScanner scanner = table.scan(new GTScanRequest(info, null, null, null));
+        IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
         int i = 0;
         for (GTRecord r : scanner) {
             assertEquals(data.get(i++), r);

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
index 4160f86..b5f6de7 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -84,10 +84,10 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
             }
         };
 
-        GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
-        scanRequest.setAggrCacheGB(0.5);
+        GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(0, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest();
+        scanRequest.setAggCacheMemThreshold(0.5);
 
-        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE);
 
         int count = 0;
         for (GTRecord record : scanner) {
@@ -127,10 +127,10 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
         };
 
         // all-in-mem testcase
-        GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
-        scanRequest.setAggrCacheGB(0.5);
+        GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(1, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest();
+        scanRequest.setAggCacheMemThreshold(0.5);
 
-        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE);
 
         int count = 0;
         for (GTRecord record : scanner) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index af39e21..74338af 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -256,7 +256,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void verifyFirstRow() throws IOException {
-        doScanAndVerify(table, new GTScanRequest(table.getInfo(), null, null, null), "[1421193600000, 30, Yang, 10, 10.5]", //
+        doScanAndVerify(table, new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest(), "[1421193600000, 30, Yang, 10, 10.5]", //
                 "[1421193600000, 30, Luke, 10, 10.5]", //
                 "[1421280000000, 20, Dong, 10, 10.5]", //
                 "[1421280000000, 20, Jason, 10, 10.5]", //
@@ -276,7 +276,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
 
         Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
-        Assert.assertEquals(origin.getAggrCacheGB(), sGTScanRequest.getAggrCacheGB(), 0.01);
+        Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
         return sGTScanRequest;
     }
 
@@ -289,7 +289,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
         LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
 
-        GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest();
 
         // note the unEvaluatable column 1 in filter is added to group by
         assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
@@ -305,7 +305,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
         LogicalTupleFilter filter = and(fComp1, fComp2);
 
-        GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest();
         // note the evaluatable column 1 in filter is added to returned columns but not in group by
         assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
 
@@ -334,7 +334,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
     @SuppressWarnings("unused")
     private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter) throws IOException {
         long start = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, null, filter);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(filter).createGTScanRequest();
         IGTScanner scanner = table.scan(req);
         int i = 0;
         for (GTRecord r : scanner) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 2abe928..fd571d0 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -90,7 +90,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
     }
 
     private IGTScanner scan(GridTable table) throws IOException {
-        GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
         IGTScanner scanner = table.scan(req);
         for (GTRecord r : scanner) {
             Object[] v = r.getValues();
@@ -106,7 +106,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
     }
 
     private IGTScanner scanAndAggregate(GridTable table) throws IOException {
-        GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0, 2)).setAggrMetrics(setOf(3, 4)).setAggrMetricsFuncs(new String[]{"count", "sum"}).setFilterPushDown(null).createGTScanRequest();
         IGTScanner scanner = table.scan(req);
         int i = 0;
         for (GTRecord r : scanner) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
index 7d401ec..742cfba 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
  * 
  * @author yangli9
  */
-public interface ITuple extends IEvaluatableTuple {
+public interface ITuple extends IEvaluatableTuple, Cloneable {
 
     List<String> getAllFields();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
index 14d717e..54e5786 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
@@ -55,6 +55,11 @@ public class Tuple implements ITuple {
     }
 
     @Override
+    public Object clone() {
+        return makeCopy();
+    }
+
+    @Override
     public ITuple makeCopy() {
         Tuple ret = new Tuple(this.info);
         for (int i = 0; i < this.values.length; ++i) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 90a2e43..acb4960 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -104,6 +104,10 @@ public class StorageContext {
         return this.enableLimit;
     }
 
+    public int getStoragePushDownLimit() {
+        return this.isLimitEnabled() ? this.getOffset() + this.getLimit() : Integer.MAX_VALUE;
+    }
+    
     public void markSort() {
         this.hasSort = true;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 9ca53f9..83ee6c7 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -69,10 +69,10 @@ public class CubeSegmentScanner implements IGTScanner {
         }
         scanRequest = scanRangePlanner.planScanRequest();
         if (scanRequest != null) {
-            scanRequest.setAllowPreAggregation(context.isNeedStorageAggregation());
-            scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
-            if (context.isLimitEnabled())
-                scanRequest.setRowLimit(context.getLimit());
+            scanRequest.setAllowStorageAggregation(context.isNeedStorageAggregation());
+            scanRequest.setAggCacheMemThreshold(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+            scanRequest.setStorageScanRowNumThreshold(context.getThreshold());//TODO: devide by shard number?
+            scanRequest.setStoragePushDownLimit(context.getStoragePushDownLimit());
         }
         scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 68556d6..0f96e3c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -18,10 +18,11 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Map.Entry;
 
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.Dictionary;
@@ -36,9 +37,11 @@ import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -62,8 +65,10 @@ public class CubeTupleConverter {
 
     final int nSelectedDims;
 
+    final int[] dimensionIndexOnTuple;
+
     public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
-            Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+                              Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
         this.cubeSeg = cubeSeg;
         this.cuboid = cuboid;
         this.tupleInfo = returnTupleInfo;
@@ -83,6 +88,20 @@ public class CubeTupleConverter {
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
 
+        // dimensionIndexOnTuple is for SQL with limit
+        List<Integer> temp = Lists.newArrayList();
+        for (TblColRef dim : cuboid.getColumns()) {
+            if (tupleInfo.hasColumn(dim)) {
+                temp.add(tupleInfo.getColumnIndex(dim));
+            }
+        }
+        dimensionIndexOnTuple = new int[temp.size()];
+        for (int i = 0; i < temp.size(); i++) {
+            dimensionIndexOnTuple[i] = temp.get(i);
+        }
+        
+        ////////////
+
         int iii = 0;
 
         // pre-calculate dimension index mapping to tuple
@@ -90,6 +109,11 @@ public class CubeTupleConverter {
             int i = mapping.getIndexOf(dim);
             gtColIdx[iii] = i;
             tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
+
+            //            if (tupleIdx[iii] == -1) {
+            //                throw new IllegalStateException("dim not used in tuple:" + dim);
+            //            }
+
             iii++;
         }
 
@@ -131,6 +155,44 @@ public class CubeTupleConverter {
         }
     }
 
+    public Comparator<ITuple> getTupleDimensionComparator() {
+        return new Comparator<ITuple>() {
+            @Override
+            public int compare(ITuple o1, ITuple o2) {
+                Preconditions.checkNotNull(o1);
+                Preconditions.checkNotNull(o2);
+                for (int i = 0; i < dimensionIndexOnTuple.length; i++) {
+                    int index = dimensionIndexOnTuple[i];
+
+                    if (index == -1) {
+                        //TODO: 
+                        continue;
+                    }
+
+                    Comparable a = (Comparable) o1.getAllValues()[index];
+                    Comparable b = (Comparable) o2.getAllValues()[index];
+
+                    if (a == null && b == null) {
+                        continue;
+                    } else if (a == null) {
+                        return 1;
+                    } else if (b == null) {
+                        return -1;
+                    } else {
+                        int temp = a.compareTo(b);
+                        if (temp != 0) {
+                            return temp;
+                        } else {
+                            continue;
+                        }
+                    }
+                }
+
+                return 0;
+            }
+        };
+    }
+
     // load only needed dictionaries
     private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
         Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
@@ -201,55 +263,55 @@ public class CubeTupleConverter {
             return null;
 
         switch (deriveInfo.type) {
-        case LOOKUP:
-            return new IDerivedColumnFiller() {
-                CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-                LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
-                int[] derivedColIdx = initDerivedColIdx();
-                Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
-
-                private int[] initDerivedColIdx() {
-                    int[] idx = new int[deriveInfo.columns.length];
-                    for (int i = 0; i < idx.length; i++) {
-                        idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+            case LOOKUP:
+                return new IDerivedColumnFiller() {
+                    CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
+                    LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
+                    int[] derivedColIdx = initDerivedColIdx();
+                    Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
+
+                    private int[] initDerivedColIdx() {
+                        int[] idx = new int[deriveInfo.columns.length];
+                        for (int i = 0; i < idx.length; i++) {
+                            idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+                        }
+                        return idx;
                     }
-                    return idx;
-                }
 
-                @Override
-                public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
-                    for (int i = 0; i < hostTmpIdx.length; i++) {
-                        lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
-                    }
+                    @Override
+                    public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+                        for (int i = 0; i < hostTmpIdx.length; i++) {
+                            lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
+                        }
 
-                    String[] lookupRow = lookupTable.getRow(lookupKey);
+                        String[] lookupRow = lookupTable.getRow(lookupKey);
 
-                    if (lookupRow != null) {
-                        for (int i = 0; i < derivedTupleIdx.length; i++) {
-                            if (derivedTupleIdx[i] >= 0) {
-                                String value = lookupRow[derivedColIdx[i]];
-                                tuple.setDimensionValue(derivedTupleIdx[i], value);
+                        if (lookupRow != null) {
+                            for (int i = 0; i < derivedTupleIdx.length; i++) {
+                                if (derivedTupleIdx[i] >= 0) {
+                                    String value = lookupRow[derivedColIdx[i]];
+                                    tuple.setDimensionValue(derivedTupleIdx[i], value);
+                                }
                             }
-                        }
-                    } else {
-                        for (int i = 0; i < derivedTupleIdx.length; i++) {
-                            if (derivedTupleIdx[i] >= 0) {
-                                tuple.setDimensionValue(derivedTupleIdx[i], null);
+                        } else {
+                            for (int i = 0; i < derivedTupleIdx.length; i++) {
+                                if (derivedTupleIdx[i] >= 0) {
+                                    tuple.setDimensionValue(derivedTupleIdx[i], null);
+                                }
                             }
                         }
                     }
-                }
-            };
-        case PK_FK:
-            return new IDerivedColumnFiller() {
-                @Override
-                public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
-                    // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
-                    tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
-                }
-            };
-        default:
-            throw new IllegalArgumentException();
+                };
+            case PK_FK:
+                return new IDerivedColumnFiller() {
+                    @Override
+                    public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+                        // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
+                        tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
+                    }
+                };
+            default:
+                throw new IllegalArgumentException();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
new file mode 100644
index 0000000..cb83819
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Iterator;
+
+interface FetchSourceAwareIterator<F> extends IFetchSourceAware<F>, Iterator<F> {
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 7acf186..ae5240b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -101,13 +101,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
         boolean exactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
         context.setExactAggregation(exactAggregation);
-        context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD, exactAggregation));
 
         // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
         TupleFilter filterD = translateDerived(filter, groupsD);
 
+        context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD, exactAggregation));
+        enableStoragePushDownLimit(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context);
         setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
-        setLimit(filter, context);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
         for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
@@ -227,6 +227,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         return !isExactAggregation;
     }
 
+    //exact aggregation was introduced back when we had some measures (like holistic distinct count) that is sensitive
+    //to post aggregation. Now that we don't have such measure any more, isExactAggregation should be useless (at least in v2 storage and above)
     public boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
         boolean exact = true;
 
@@ -372,11 +374,44 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
     }
 
-    private void setLimit(TupleFilter filter, StorageContext context) {
-        boolean goodAggr = context.isExactAggregation();
+    private void enableStoragePushDownLimit(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) {
+        boolean possible = true;
+
         boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
-        boolean goodSort = context.hasSort() == false;
-        if (goodAggr && goodFilter && goodSort) {
+        if (!goodFilter) {
+            possible = false;
+            logger.info("Storage limit push down is impossible because the filter is unevaluatable");
+        }
+
+        boolean goodSort = !context.hasSort();
+        if (!goodSort) {
+            possible = false;
+            logger.info("Storage limit push down is impossible because the query has order by");
+        }
+
+        // derived aggregation is bad, unless expanded columns are already in group by
+        if (!groups.containsAll(derivedPostAggregation)) {
+            possible = false;
+            logger.info("Storage limit push down is impossible because derived column require post aggregation: " + derivedPostAggregation);
+        }
+
+        //if groupsD is clustered at "head" of the rowkey, then limit push down is possible
+        int size = groupsD.size();
+        if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) {
+            possible = false;
+            logger.info("Storage limit push down is impossible because groupD is not clustered at head, groupsD: " + groupsD //
+                    + " with cuboid columns: " + cuboid.getColumns());
+        }
+
+        //if exists measures like max(cal_dt), then it's not a perfect cuboid match, cannot apply limit
+        for (FunctionDesc functionDesc : functionDescs) {
+            if (functionDesc.isDimensionAsMetric()) {
+                possible = false;
+                logger.info("Storage limit push down is impossible because {} isDimensionAsMetric ", functionDesc);
+            }
+        }
+
+        if (possible) {
             logger.info("Enable limit " + context.getLimit());
             context.enableLimit();
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
new file mode 100644
index 0000000..d51ca4c
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Iterator;
+
+public interface IFetchSourceAware<E> {
+    public Iterator<? extends E> getFetchSource();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
new file mode 100644
index 0000000..96a232f
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Iterator;
+
+import com.google.common.collect.PeekingIterator;
+
+/**
+ * copied from guava, change iterator access modifier to public
+ * 
+ * Implementation of PeekingIterator that avoids peeking unless necessary.
+ */
+class PeekingImpl<E> implements PeekingIterator<E> {
+
+    public final Iterator<? extends E> iterator;
+    private boolean hasPeeked;
+    private E peekedElement;
+
+    public PeekingImpl(Iterator<? extends E> iterator) {
+        this.iterator = checkNotNull(iterator);
+    }
+
+    @Override
+    public boolean hasNext() {
+        return hasPeeked || iterator.hasNext();
+    }
+
+    @Override
+    public E next() {
+        if (!hasPeeked) {
+            return iterator.next();
+        }
+        E result = peekedElement;
+        hasPeeked = false;
+        peekedElement = null;
+        return result;
+    }
+
+    @Override
+    public void remove() {
+        checkState(!hasPeeked, "Can't remove after you've peeked at next");
+        iterator.remove();
+    }
+
+    @Override
+    public E peek() {
+        if (!hasPeeked) {
+            peekedElement = iterator.next();
+            hasPeeked = true;
+        }
+        return peekedElement;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
new file mode 100644
index 0000000..61267ae
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.storage.StorageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SegmentCubeTupleIterator implements ITupleIterator {
+
+    private static final Logger logger = LoggerFactory.getLogger(SegmentCubeTupleIterator.class);
+
+    protected final CubeSegmentScanner scanner;
+    protected final Cuboid cuboid;
+    protected final Set<TblColRef> selectedDimensions;
+    protected final Set<FunctionDesc> selectedMetrics;
+    protected final TupleInfo tupleInfo;
+    protected final Tuple tuple;
+    protected final StorageContext context;
+
+    protected Iterator<GTRecord> gtItr;
+    protected CubeTupleConverter cubeTupleConverter;
+    protected Tuple next;
+
+    private List<IAdvMeasureFiller> advMeasureFillers;
+    private int advMeasureRowsRemaining;
+    private int advMeasureRowIndex;
+
+    public SegmentCubeTupleIterator(CubeSegmentScanner scanner, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+            Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
+        this.scanner = scanner;
+        this.cuboid = cuboid;
+        this.selectedDimensions = selectedDimensions;
+        this.selectedMetrics = selectedMetrics;
+        this.tupleInfo = returnTupleInfo;
+        this.tuple = new Tuple(returnTupleInfo);
+        this.context = context;
+        this.gtItr = getGTItr(scanner);
+        this.cubeTupleConverter = new CubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+    }
+
+    private Iterator<GTRecord> getGTItr(CubeSegmentScanner scanner) {
+        return scanner.iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (next != null)
+            return true;
+
+        // consume any left rows from advanced measure filler
+        if (advMeasureRowsRemaining > 0) {
+            for (IAdvMeasureFiller filler : advMeasureFillers) {
+                filler.fillTuple(tuple, advMeasureRowIndex);
+            }
+            advMeasureRowIndex++;
+            advMeasureRowsRemaining--;
+            next = tuple;
+            return true;
+        }
+
+        // now we have a GTRecord
+        if (!gtItr.hasNext()) {
+            return false;
+        }
+        GTRecord curRecord = gtItr.next();
+
+        Preconditions.checkNotNull(cubeTupleConverter);
+
+        // translate into tuple
+        advMeasureFillers = cubeTupleConverter.translateResult(curRecord, tuple);
+
+        // the simple case
+        if (advMeasureFillers == null) {
+            next = tuple;
+            return true;
+        }
+
+        // advanced measure filling, like TopN, will produce multiple tuples out of one record
+        advMeasureRowsRemaining = -1;
+        for (IAdvMeasureFiller filler : advMeasureFillers) {
+            if (advMeasureRowsRemaining < 0)
+                advMeasureRowsRemaining = filler.getNumOfRows();
+            if (advMeasureRowsRemaining != filler.getNumOfRows())
+                throw new IllegalStateException();
+        }
+        if (advMeasureRowsRemaining < 0)
+            throw new IllegalStateException();
+
+        advMeasureRowIndex = 0;
+        return hasNext();
+    }
+
+    @Override
+    public ITuple next() {
+        // fetch next record
+        if (next == null) {
+            hasNext();
+            if (next == null)
+                throw new NoSuchElementException();
+        }
+
+        ITuple result = next;
+        next = null;
+        return result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() {
+        close(scanner);
+    }
+
+    protected void close(CubeSegmentScanner scanner) {
+        try {
+            scanner.close();
+        } catch (IOException e) {
+            logger.error("Exception when close CubeScanner", e);
+        }
+    }
+
+    public CubeTupleConverter getCubeTupleConverter() {
+        return cubeTupleConverter;
+    }
+}