You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2016/08/10 04:56:56 UTC

[1/2] kylin git commit: KYLIN-1936 improve enable limit logic (exactAggregation is too strict)

Repository: kylin
Updated Branches:
  refs/heads/master c67891d26 -> 28e942306


http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
index 3c992d2..ff7fb2b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SequentialCubeTupleIterator.java
@@ -21,153 +21,77 @@ package org.apache.kylin.storage.gtrecord;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
-import java.util.NoSuchElementException;
 import java.util.Set;
 
+import javax.annotation.Nullable;
+
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
-import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.storage.StorageContext;
-import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
 public class SequentialCubeTupleIterator implements ITupleIterator {
 
     private static final Logger logger = LoggerFactory.getLogger(SequentialCubeTupleIterator.class);
 
-    protected final Cuboid cuboid;
-    protected final Set<TblColRef> selectedDimensions;
-    protected final Set<FunctionDesc> selectedMetrics;
-    protected final TupleInfo tupleInfo;
-    protected final Tuple tuple;
-    protected final Iterator<CubeSegmentScanner> scannerIterator;
-    protected final StorageContext context;
-
-    protected CubeSegmentScanner curScanner;
-    protected Iterator<GTRecord> curRecordIterator;
-    protected CubeTupleConverter curTupleConverter;
-    protected Tuple next;
-
-    private List<IAdvMeasureFiller> advMeasureFillers;
-    private int advMeasureRowsRemaining;
-    private int advMeasureRowIndex;
+    protected List<CubeSegmentScanner> scanners;
+    protected List<SegmentCubeTupleIterator> segmentCubeTupleIterators;
+    protected Iterator<ITuple> tupleIterator;
+    protected final int storagePushDownLimit;
+    protected StorageContext context;
 
     private int scanCount;
     private int scanCountDelta;
 
     public SequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
             Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
-        this.cuboid = cuboid;
-        this.selectedDimensions = selectedDimensions;
-        this.selectedMetrics = selectedMetrics;
-        this.tupleInfo = returnTupleInfo;
-        this.tuple = new Tuple(returnTupleInfo);
-        this.scannerIterator = scanners.iterator();
         this.context = context;
-    }
+        this.scanners = scanners;
 
-    @Override
-    public boolean hasNext() {
-        if (next != null)
-            return true;
-
-        if (hitLimitAndThreshold())
-            return false;
-
-        // consume any left rows from advanced measure filler
-        if (advMeasureRowsRemaining > 0) {
-            for (IAdvMeasureFiller filler : advMeasureFillers) {
-                filler.fillTuple(tuple, advMeasureRowIndex);
-            }
-            advMeasureRowIndex++;
-            advMeasureRowsRemaining--;
-            next = tuple;
-            return true;
+        segmentCubeTupleIterators = Lists.newArrayList();
+        for (CubeSegmentScanner scanner : scanners) {
+            segmentCubeTupleIterators.add(new SegmentCubeTupleIterator(scanner, cuboid, selectedDimensions, selectedMetrics, returnTupleInfo, context));
         }
-
-        // get the next GTRecord
-        if (curScanner == null) {
-            if (scannerIterator.hasNext()) {
-                curScanner = scannerIterator.next();
-                curRecordIterator = curScanner.iterator();
-                if (curRecordIterator.hasNext()) {
-                    //if the segment does not has any tuples, don't bother to create a converter
-                    curTupleConverter = new CubeTupleConverter(curScanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+        
+        this.storagePushDownLimit = context.getStoragePushDownLimit();
+        if (storagePushDownLimit > KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) {
+            //normal case
+            tupleIterator = Iterators.concat(segmentCubeTupleIterators.iterator());
+        } else {
+            //query with limit
+            Iterator<Iterator<ITuple>> transformed = Iterators.transform(segmentCubeTupleIterators.iterator(), new Function<SegmentCubeTupleIterator, Iterator<ITuple>>() {
+                @Nullable
+                @Override
+                public Iterator<ITuple> apply(@Nullable SegmentCubeTupleIterator input) {
+                    return input;
                 }
-            } else {
-                return false;
-            }
-        }
-        if (curRecordIterator.hasNext() == false) {
-            close(curScanner);
-            curScanner = null;
-            curRecordIterator = null;
-            curTupleConverter = null;
-            return hasNext();
+            });
+            tupleIterator = new SortedIteratorMergerWithLimit<ITuple>(transformed, storagePushDownLimit, segmentCubeTupleIterators.get(0).getCubeTupleConverter().getTupleDimensionComparator()).getIterator();
         }
-
-        // now we have a GTRecord
-        GTRecord curRecord = curRecordIterator.next();
-
-        // translate into tuple
-        advMeasureFillers = curTupleConverter.translateResult(curRecord, tuple);
-
-        // the simple case
-        if (advMeasureFillers == null) {
-            next = tuple;
-            return true;
-        }
-
-        // advanced measure filling, like TopN, will produce multiple tuples out of one record
-        advMeasureRowsRemaining = -1;
-        for (IAdvMeasureFiller filler : advMeasureFillers) {
-            if (advMeasureRowsRemaining < 0)
-                advMeasureRowsRemaining = filler.getNumOfRows();
-            if (advMeasureRowsRemaining != filler.getNumOfRows())
-                throw new IllegalStateException();
-        }
-        if (advMeasureRowsRemaining < 0)
-            throw new IllegalStateException();
-
-        advMeasureRowIndex = 0;
-        return hasNext();
     }
 
-    private boolean hitLimitAndThreshold() {
-        // check limit
-        if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) {
-            return true;
-        }
-        // check threshold
-        if (scanCount >= context.getThreshold()) {
-            throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause.");
-        }
-        return false;
+    @Override
+    public boolean hasNext() {
+        return tupleIterator.hasNext();
     }
 
     @Override
     public ITuple next() {
-        // fetch next record
-        if (next == null) {
-            hasNext();
-            if (next == null)
-                throw new NoSuchElementException();
-        }
-
         scanCount++;
         if (++scanCountDelta >= 1000)
             flushScanCountDelta();
 
-        ITuple result = next;
-        next = null;
-        return result;
+        return tupleIterator.next();
     }
 
     @Override
@@ -181,11 +105,8 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
         // close all the remaining segmentIterator
         flushScanCountDelta();
 
-        if (curScanner != null)
-            close(curScanner);
-
-        while (scannerIterator.hasNext()) {
-            close(scannerIterator.next());
+        for (SegmentCubeTupleIterator iterator : segmentCubeTupleIterators) {
+            iterator.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
new file mode 100644
index 0000000..d5aa9d0
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMerger.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * a merger that utilizes the sorted nature of input iterators
+ */
+public class SortedIteratorMerger<E> {
+
+    private Iterator<Iterator<E>> shardSubsets;
+    private Comparator<E> comparator;
+
+    public SortedIteratorMerger(Iterator<Iterator<E>> shardSubsets, Comparator<E> comparator) {
+        this.shardSubsets = shardSubsets;
+        this.comparator = comparator;
+    }
+
+    public Iterator<E> getIterator() {
+        final PriorityQueue<PeekingImpl<E>> heap = new PriorityQueue<PeekingImpl<E>>(11, new Comparator<PeekingImpl<E>>() {
+            @Override
+            public int compare(PeekingImpl<E> o1, PeekingImpl<E> o2) {
+                return comparator.compare(o1.peek(), o2.peek());
+            }
+        });
+
+        while (shardSubsets.hasNext()) {
+            Iterator<E> iterator = shardSubsets.next();
+            PeekingImpl<E> peekingIterator = new PeekingImpl<>(iterator);
+            if (peekingIterator.hasNext()) {
+                heap.offer(peekingIterator);
+            }
+        }
+
+        return getIteratorInternal(heap);
+    }
+
+    protected Iterator<E> getIteratorInternal(PriorityQueue<PeekingImpl<E>> heap) {
+        return new MergedIterator<E>(heap, comparator);
+    }
+
+    private static class MergedIterator<E> implements Iterator<E> {
+
+        private final PriorityQueue<PeekingImpl<E>> heap;
+        private final Comparator<E> comparator;
+
+        public MergedIterator(PriorityQueue<PeekingImpl<E>> heap, Comparator<E> comparator) {
+            this.heap = heap;
+            this.comparator = comparator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !heap.isEmpty();
+        }
+
+        @Override
+        public E next() {
+            PeekingImpl<E> poll = heap.poll();
+            E current = poll.next();
+            if (poll.hasNext()) {
+
+                //TODO: remove this check when validated
+                Preconditions.checkState(comparator.compare(current, poll.peek()) < 0, "Not sorted! current: " + current + " Next: " + poll.peek());
+
+                heap.offer(poll);
+            }
+            return current;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+     
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java
new file mode 100644
index 0000000..0e40150
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimit.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The limit here corresponds to the limit in SQL:
+ * if the SQL ends with "limit N", then each shard will return its N "smallest" records.
+ * The query server side will then use a heap to pick the right records.
+ * 
+ * There are two usages of SortedIteratorMergerWithLimit in Kylin:
+ * one at the GTRecord level and the other at the Tuple level.
+ * The first deals with cuboid shards within the same segment,
+ * and the second deals with multiple segments.
+ * 
+ * Let's use a single segment as an example:
+ * suppose we have "limit 2" in the SQL and three shards in the segment;
+ * the first returns (1,2), the second returns (1,3) and the third returns (2,3).
+ * Each subset is guaranteed to be sorted (that's why it's called "SortedIterator Merger").
+ * SortedIteratorMergerWithLimit will merge these three subsets and return (1,1,2,2): the limit counts distinct keys, so duplicates of an accepted key are all returned.
+ * 
+ */
+public class SortedIteratorMergerWithLimit<E extends Cloneable> extends SortedIteratorMerger<E> {
+    private int limit;
+    private Comparator<E> comparator;
+
+    public SortedIteratorMergerWithLimit(Iterator<Iterator<E>> shardSubsets, int limit, Comparator<E> comparator) {
+        super(shardSubsets, comparator);
+        this.limit = limit;
+        this.comparator = comparator;
+    }
+
+    protected Iterator<E> getIteratorInternal(PriorityQueue<PeekingImpl<E>> heap) {
+        return new MergedIteratorWithLimit<E>(heap, limit, comparator);
+    }
+
+    static class MergedIteratorWithLimit<E extends Cloneable> implements Iterator<E> {
+
+        private final PriorityQueue<PeekingImpl<E>> heap;
+        private final Comparator<E> comparator;
+
+        private boolean nextFetched = false;
+        private E fetched = null;
+        private E last = null;
+
+        private int limit;
+        private int limitProgress = 0;
+
+        private PeekingImpl<E> lastSource = null;
+
+        public MergedIteratorWithLimit(PriorityQueue<PeekingImpl<E>> heap, int limit, Comparator<E> comparator) {
+            this.heap = heap;
+            this.limit = limit;
+            this.comparator = comparator;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (nextFetched) {
+                return true;
+            }
+
+            if (lastSource != null && lastSource.hasNext()) {
+                if (lastSource.hasNext()) {
+                    heap.offer(lastSource);
+                } else {
+                    lastSource = null;
+                }
+            }
+
+            if (!heap.isEmpty()) {
+                PeekingImpl<E> first = heap.poll();
+                E current = first.next();
+                try {
+                    current = (E) current.getClass().getMethod("clone").invoke(current);
+                } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
+                    throw new RuntimeException(e);
+                }
+
+                lastSource = first;
+
+                Preconditions.checkState(current != null);
+
+                if (last == null || comparator.compare(current, last) != 0) {
+                    if (++limitProgress > limit) {
+                        return false;
+                    }
+                }
+                nextFetched = true;
+                fetched = current;
+
+                return true;
+            } else {
+                return false;
+            }
+        }
+
+        @Override
+        public E next() {
+            if (!nextFetched) {
+                throw new IllegalStateException("Should hasNext() before next()");
+            }
+
+            //TODO: remove this check when validated
+            if (last != null) {
+                Preconditions.checkState(comparator.compare(last, fetched) <= 0, "Not sorted! last: " + last + " fetched: " + fetched);
+            }
+
+            last = fetched;
+            nextFetched = false;
+
+            return fetched;
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java
new file mode 100644
index 0000000..f09844a
--- /dev/null
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SortedIteratorMergerTest {
+
+    private Comparator<Integer> getComp() {
+        return new Comparator<Integer>() {
+            @Override
+            public int compare(Integer o1, Integer o2) {
+                return o1 - o2;
+            }
+        };
+    }
+
+    @Test
+    public void basic1() {
+
+        List<Integer> a = Lists.newArrayList(1, 2, 3);
+        List<Integer> b = Lists.newArrayList(1, 2, 3);
+        List<Integer> c = Lists.newArrayList(1, 2, 5);
+        List<Iterator<Integer>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMerger<Integer> merger = new SortedIteratorMerger<Integer>(input.iterator(), getComp());
+        Iterator<Integer> iterator = merger.getIterator();
+        List<Integer> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        Assert.assertEquals(Lists.newArrayList(1, 1, 1, 2, 2, 2, 3, 3, 5), result);
+    }
+
+    @Test
+    public void basic2() {
+
+        List<Integer> a = Lists.newArrayList(2);
+        List<Integer> b = Lists.newArrayList();
+        List<Integer> c = Lists.newArrayList(1, 2, 5);
+        List<Iterator<Integer>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMerger<Integer> merger = new SortedIteratorMerger<Integer>(input.iterator(), getComp());
+        Iterator<Integer> iterator = merger.getIterator();
+        List<Integer> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        Assert.assertEquals(Lists.newArrayList(1, 2, 2, 5), result);
+    }
+
+    @Test
+    public void basic3() {
+
+        List<Integer> a = Lists.newArrayList();
+        List<Integer> b = Lists.newArrayList();
+        List<Integer> c = Lists.newArrayList();
+        List<Iterator<Integer>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMerger<Integer> merger = new SortedIteratorMerger<Integer>(input.iterator(), getComp());
+        Iterator<Integer> iterator = merger.getIterator();
+        List<Integer> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        Assert.assertEquals(Lists.newArrayList(), result);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
new file mode 100644
index 0000000..1627b4f
--- /dev/null
+++ b/core-storage/src/test/java/org/apache/kylin/storage/gtrecord/SortedIteratorMergerWithLimitTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class SortedIteratorMergerWithLimitTest {
+    class CloneableInteger implements Cloneable {
+        int value;
+
+        public CloneableInteger(int value) {
+            this.value = value;
+        }
+
+        @Override
+        public Object clone() {
+            return new CloneableInteger(value);
+        }
+
+        @Override
+        public int hashCode() {
+            return value;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            CloneableInteger that = (CloneableInteger) o;
+
+            return value == that.value;
+
+        }
+    }
+
+    private Comparator<CloneableInteger> getComp() {
+        return new Comparator<CloneableInteger>() {
+            @Override
+            public int compare(CloneableInteger o1, CloneableInteger o2) {
+                return o1.value - o2.value;
+            }
+        };
+    }
+
+    @Test
+    public void basic1() {
+
+        List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(3));
+        List<CloneableInteger> b = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(3));
+        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5));
+        List<Iterator<CloneableInteger>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
+        Iterator<CloneableInteger> iterator = merger.getIterator();
+        List<CloneableInteger> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(1), new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(3), new CloneableInteger(3)), result);
+    }
+
+    @Test
+    public void basic2() {
+
+        List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(2));
+        List<CloneableInteger> b = Lists.newArrayList();
+        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5));
+        List<Iterator<CloneableInteger>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
+        Iterator<CloneableInteger> iterator = merger.getIterator();
+        List<CloneableInteger> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+        Assert.assertEquals(Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(2), new CloneableInteger(5)), result);
+    }
+
+    @Test(expected = IllegalStateException.class)
+    public void basic3() {
+
+        List<CloneableInteger> a = Lists.newArrayList(new CloneableInteger(2), new CloneableInteger(1));
+        List<CloneableInteger> b = Lists.newArrayList();
+        List<CloneableInteger> c = Lists.newArrayList(new CloneableInteger(1), new CloneableInteger(2), new CloneableInteger(5));
+        List<Iterator<CloneableInteger>> input = Lists.newArrayList();
+        input.add(a.iterator());
+        input.add(b.iterator());
+        input.add(c.iterator());
+        SortedIteratorMergerWithLimit<CloneableInteger> merger = new SortedIteratorMergerWithLimit<CloneableInteger>(input.iterator(), 3, getComp());
+        Iterator<CloneableInteger> iterator = merger.getIterator();
+        List<CloneableInteger> result = Lists.newArrayList();
+        while (iterator.hasNext()) {
+            result.add(iterator.next());
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java b/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java
new file mode 100644
index 0000000..c295430
--- /dev/null
+++ b/kylin-it/src/test/java/org/apache/kylin/query/HackedDbUnitAssert.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.query;
+
+import org.dbunit.DatabaseUnitException;
+import org.dbunit.assertion.DbUnitAssert;
+import org.dbunit.assertion.FailureHandler;
+import org.dbunit.dataset.Column;
+import org.dbunit.dataset.Columns;
+import org.dbunit.dataset.DataSetException;
+import org.dbunit.dataset.ITable;
+import org.dbunit.dataset.ITableMetaData;
+import org.dbunit.dataset.datatype.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A relaxed {@link DbUnitAssert} used to check the result of SQL that carries a LIMIT clause.
+ *
+ * <p>Dirty hack: instead of requiring both tables to be identical, it only requires that every
+ * row of the actual table can be found somewhere in the expected table. The row count is not
+ * compared and rows are not compared position by position, so the actual table may contain
+ * fewer rows than the expected table, in any order. A matched expected row is not consumed, so
+ * several identical actual rows may all match the same expected row.
+ */
+public class HackedDbUnitAssert extends DbUnitAssert {
+    private static final Logger logger = LoggerFactory.getLogger(HackedDbUnitAssert.class);
+
+    /**
+     * Asserts that the columns of both tables agree and that every row of {@code actualTable}
+     * is present in {@code expectedTable}.
+     *
+     * @param expectedTable  the reference table
+     * @param actualTable    the table whose rows must all occur in {@code expectedTable}
+     * @param failureHandler used to build the column-mismatch error; when {@code null} the default handler is used
+     * @throws DatabaseUnitException if reading table data or metadata fails
+     * @throws IllegalStateException if a row of {@code actualTable} is missing from {@code expectedTable}
+     */
+    @Override
+    public void assertEquals(ITable expectedTable, ITable actualTable, FailureHandler failureHandler) throws DatabaseUnitException {
+        logger.trace("assertEquals(expectedTable, actualTable, failureHandler) - start");
+        logger.debug("assertEquals: expectedTable={}", expectedTable);
+        logger.debug("assertEquals: actualTable={}", actualTable);
+        logger.debug("assertEquals: failureHandler={}", failureHandler);
+
+        // Do not continue if same instance
+        if (expectedTable == actualTable) {
+            logger.debug("The given tables reference the same object. Will return immediately. (Table={})", expectedTable);
+            return;
+        }
+
+        if (failureHandler == null) {
+            logger.debug("FailureHandler is null. Using default implementation");
+            failureHandler = getDefaultFailureHandler();
+        }
+
+        ITableMetaData expectedMetaData = expectedTable.getTableMetaData();
+        ITableMetaData actualMetaData = actualTable.getTableMetaData();
+        String expectedTableName = expectedMetaData.getTableName();
+
+        // The row count is intentionally NOT verified here: the actual table may hold only a
+        // subset of the expected rows.
+
+        // If both tables are empty it is not necessary to compare columns, as such a comparison
+        // can fail if the column metadata is different (which can occur when comparing empty
+        // tables).
+        if (expectedTable.getRowCount() == 0 && actualTable.getRowCount() == 0) {
+            logger.debug("Tables are empty, hence equals.");
+            return;
+        }
+
+        // Put the columns into the same order
+        Column[] expectedColumns = Columns.getSortedColumns(expectedMetaData);
+        Column[] actualColumns = Columns.getSortedColumns(actualMetaData);
+
+        // Verify columns
+        Columns.ColumnDiff columnDiff = Columns.getColumnDiff(expectedMetaData, actualMetaData);
+        if (columnDiff.hasDifference()) {
+            String message = columnDiff.getMessage();
+            Error error = failureHandler.createFailure(message, Columns.getColumnNamesAsString(expectedColumns), Columns.getColumnNamesAsString(actualColumns));
+            logger.error(error.toString());
+            throw error;
+        }
+
+        // Get the datatypes to be used for comparing the sorted columns
+        ComparisonColumn[] comparisonCols = getComparisonColumns(expectedTableName, expectedColumns, actualColumns, failureHandler);
+
+        // Finally compare the data
+        compareData(expectedTable, actualTable, comparisonCols, failureHandler);
+    }
+
+    /**
+     * Checks that each row of {@code actualTable} has at least one matching row in
+     * {@code expectedTable}.
+     *
+     * @throws NullPointerException  if any parameter is {@code null}
+     * @throws IllegalStateException if some actual row has no match; the message names the row index
+     * @throws DataSetException      if reading a table value fails
+     */
+    @Override
+    protected void compareData(ITable expectedTable, ITable actualTable, ComparisonColumn[] comparisonCols, FailureHandler failureHandler) throws DataSetException {
+        logger.debug("compareData(expectedTable={}, actualTable={}, " + "comparisonCols={}, failureHandler={}) - start", new Object[] { expectedTable, actualTable, comparisonCols, failureHandler });
+
+        if (expectedTable == null) {
+            throw new NullPointerException("The parameter 'expectedTable' must not be null");
+        }
+        if (actualTable == null) {
+            throw new NullPointerException("The parameter 'actualTable' must not be null");
+        }
+        if (comparisonCols == null) {
+            throw new NullPointerException("The parameter 'comparisonCols' must not be null");
+        }
+        if (failureHandler == null) {
+            throw new NullPointerException("The parameter 'failureHandler' must not be null");
+        }
+
+        for (int index = 0; index < actualTable.getRowCount(); index++) {
+            if (!findRowInExpectedTable(expectedTable, actualTable, comparisonCols, failureHandler, index)) {
+                throw new IllegalStateException("Row " + index + " of the actual table has no matching row in the expected table");
+            }
+        }
+
+    }
+
+    /**
+     * Searches {@code expectedTable} for a row equal to row {@code index} of {@code actualTable}.
+     *
+     * @return {@code true} as soon as one expected row matches, {@code false} if none does
+     */
+    private boolean findRowInExpectedTable(ITable expectedTable, ITable actualTable, ComparisonColumn[] comparisonCols, FailureHandler failureHandler, int index) throws DataSetException {
+        for (int i = 0; i < expectedTable.getRowCount(); i++) {
+            if (rowsMatch(expectedTable, i, actualTable, index, comparisonCols)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Compares one expected row with one actual row over all comparison columns. Columns for
+     * which {@code skipCompare} returns true are ignored; the rows match when no compared
+     * column differs, which also holds when the last column (or every column) is skipped.
+     */
+    private boolean rowsMatch(ITable expectedTable, int expectedRow, ITable actualTable, int actualRow, ComparisonColumn[] comparisonCols) throws DataSetException {
+        for (ComparisonColumn compareColumn : comparisonCols) {
+            String columnName = compareColumn.getColumnName();
+            DataType dataType = compareColumn.getDataType();
+
+            Object expectedValue = expectedTable.getValue(expectedRow, columnName);
+            Object actualValue = actualTable.getValue(actualRow, columnName);
+
+            if (skipCompare(columnName, expectedValue, actualValue)) {
+                if (logger.isTraceEnabled()) {
+                    logger.trace("ignoring comparison " + expectedValue + "=" + actualValue + " on column " + columnName);
+                }
+                continue;
+            }
+
+            // first differing column rules this expected row out
+            if (dataType.compare(expectedValue, actualValue) != 0) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
index a6e7956..2c428ec 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java
@@ -35,6 +35,7 @@ import org.apache.kylin.query.enumerator.OLAPQuery;
 import org.apache.kylin.query.relnode.OLAPContext;
 import org.apache.kylin.query.routing.Candidate;
 import org.apache.kylin.query.schema.OLAPSchemaFactory;
+import org.apache.kylin.storage.hbase.HBaseStorage;
 import org.apache.kylin.storage.hbase.cube.v1.coprocessor.observer.ObserverEnabler;
 import org.dbunit.database.DatabaseConnection;
 import org.dbunit.database.IDatabaseConnection;
@@ -123,7 +124,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testSingleExecuteQuery() throws Exception {
 
-        String queryFileName = getQueryFolderPrefix() + "src/test/resources/query/sql_tableau/query20.sql";
+        String queryFileName = getQueryFolderPrefix() + "src/test/resources/query/temp/query01.sql";
 
         File sqlFile = new File(queryFileName);
         String sql = getTextFromFile(sqlFile);
@@ -187,7 +188,7 @@ public class ITKylinQueryTest extends KylinTestBase {
     @Test
     public void testPreciselyDistinctCountQuery() throws Exception {
         if ("left".equalsIgnoreCase(joinType)) {
-            execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_distinct_precisely", null, true);
+            execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/temp", null, true);
         }
     }
 
@@ -257,6 +258,13 @@ public class ITKylinQueryTest extends KylinTestBase {
     }
 
     @Test
+    public void testLimitCorrectness() throws Exception {
+        if (HBaseStorage.overwriteStorageQuery == null) {//v1 query engine will not work
+            execLimitAndValidate(getQueryFolderPrefix() + "src/test/resources/query/sql");
+        }
+    }
+
+    @Test
     public void testTopNQuery() throws Exception {
         if ("left".equalsIgnoreCase(joinType)) {
             this.execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_topn", null, true);

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
----------------------------------------------------------------------
diff --git a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
index 0511971..4e59815 100644
--- a/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
+++ b/kylin-it/src/test/java/org/apache/kylin/query/KylinTestBase.java
@@ -34,6 +34,7 @@ import java.sql.Statement;
 import java.sql.Timestamp;
 import java.sql.Types;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -62,6 +63,33 @@ import com.google.common.io.Files;
  */
 public class KylinTestBase {
 
+    /**
+     * Wraps an {@code Object[]} so that equality and hashing are based on the array's elements
+     * rather than on the array's identity.
+     */
+    class ObjectArray {
+        Object[] data;
+
+        public ObjectArray(Object[] data) {
+            this.data = data;
+        }
+
+        /**
+         * Two wrappers are equal when they are of the same class and their arrays hold
+         * pairwise-equal elements in the same order (element-wise, as done by
+         * {@link Arrays#equals(Object[], Object[])}; nested arrays are not compared deeply).
+         */
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            boolean sameClass = o != null && o.getClass() == getClass();
+            if (!sameClass) {
+                return false;
+            }
+            Object[] otherData = ((ObjectArray) o).data;
+            return Arrays.equals(this.data, otherData);
+        }
+
+        /** Hash derived from the array elements, consistent with {@link #equals(Object)}. */
+        @Override
+        public int hashCode() {
+            return Arrays.hashCode(this.data);
+        }
+    }
+
     // Hack for the different constant integer type between optiq (INTEGER) and
     // h2 (BIGINT)
     public static class TestH2DataTypeFactory extends H2DataTypeFactory {
@@ -357,6 +385,50 @@ public class KylinTestBase {
         }
     }
 
+    /**
+     * Runs every {@code .sql} file of a folder against Kylin with a LIMIT clause and checks the
+     * rows returned against the H2 result of the same query.
+     *
+     * <p>A query that does not already contain {@code "limit "} gets {@code " limit 5"} appended
+     * for the Kylin run only; H2 always executes the SQL as found in the file. The comparison is
+     * done by {@link HackedDbUnitAssert}, with the H2 table as the expected side and the Kylin
+     * table as the actual side.
+     *
+     * @param queryFolder folder containing the {@code .sql} files to execute
+     * @throws Exception if a query fails to execute or a comparison fails
+     */
+    protected void execLimitAndValidate(String queryFolder) throws Exception {
+        printInfo("---------- test folder: " + new File(queryFolder).getAbsolutePath());
+
+        int appendLimitQueries = 0;
+        List<File> sqlFiles = getFilesFromFolder(new File(queryFolder), ".sql");
+        for (File sqlFile : sqlFiles) {
+            String queryName = StringUtils.split(sqlFile.getName(), '.')[0];
+            String sql = getTextFromFile(sqlFile);
+
+            // Locale.ROOT keeps the keyword detection independent of the JVM's default locale
+            // (e.g. the Turkish dotless-i lower-casing of "LIMIT").
+            String sqlWithLimit;
+            if (sql.toLowerCase(java.util.Locale.ROOT).contains("limit ")) {
+                sqlWithLimit = sql;
+            } else {
+                sqlWithLimit = sql + " limit 5";
+                appendLimitQueries++;
+            }
+
+            // execute Kylin
+            printInfo("Query Result from Kylin - " + queryName + "  (" + queryFolder + ")");
+            IDatabaseConnection kylinConn = new DatabaseConnection(cubeConnection);
+            ITable kylinTable = executeQuery(kylinConn, queryName, sqlWithLimit, false);
+
+            // execute H2 with the SQL exactly as read from the file
+            printInfo("Query Result from H2 - " + queryName);
+            H2Connection h2Conn = new H2Connection(h2Connection, null);
+            h2Conn.getConfig().setProperty(DatabaseConfig.PROPERTY_DATATYPE_FACTORY, new TestH2DataTypeFactory());
+            ITable h2Table = executeQuery(h2Conn, queryName, sql, false);
+
+            try {
+                HackedDbUnitAssert hackedDbUnitAssert = new HackedDbUnitAssert();
+                hackedDbUnitAssert.assertEquals(h2Table, kylinTable);
+            } catch (Throwable t) {
+                // log which file failed before propagating the original failure
+                printInfo("execLimitAndValidate failed on: " + sqlFile.getAbsolutePath());
+                throw t;
+            }
+
+            compQueryCount++;
+            if (kylinTable.getRowCount() == 0) {
+                zeroResultQueries.add(sql);
+            }
+        }
+        printInfo("Queries appended with limit: " + appendLimitQueries);
+    }
+
     protected void execAndCompQuery(String queryFolder, String[] exclusiveQuerys, boolean needSort) throws Exception {
         printInfo("---------- test folder: " + new File(queryFolder).getAbsolutePath());
         Set<String> exclusiveSet = buildExclusiveSet(exclusiveQuerys);

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
index da8e7ce..3fdb92f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/HBaseScannerBenchmark.java
@@ -29,7 +29,7 @@ import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTInfo.Builder;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTSampleCodeSystem;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTWriter;
 import org.apache.kylin.gridtable.benchmark.SortedGTRecordGenerator;
@@ -109,7 +109,7 @@ public class HBaseScannerBenchmark {
     private void testScanRaw(String msg) throws IOException {
         long t = System.currentTimeMillis();
 
-        IGTScanner scan = simpleStore.scan(new GTScanRequest(info, null, null, null));
+        IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
         ResultScanner innerScanner = ((SimpleHBaseStore.Reader) scan).getHBaseScanner();
         int count = 0;
         for (Result r : innerScanner) {
@@ -125,7 +125,7 @@ public class HBaseScannerBenchmark {
     private void testScanRecords(String msg) throws IOException {
         long t = System.currentTimeMillis();
 
-        IGTScanner scan = simpleStore.scan(new GTScanRequest(info, null, null, null));
+        IGTScanner scan = simpleStore.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
         int count = 0;
         for (GTRecord rec : scan) {
             count++;

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
index 1cebdea..4c599d9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
@@ -21,20 +21,11 @@ package org.apache.kylin.storage.hbase.cube.v2;
 import java.io.IOException;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
-import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.zip.DataFormatException;
 
-import javax.annotation.Nullable;
-
-import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
@@ -52,7 +43,6 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.gridtable.GTInfo;
-import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRange;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
@@ -67,8 +57,6 @@ import org.apache.kylin.storage.hbase.cube.v2.coprocessor.endpoint.generated.Cub
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.HBaseZeroCopyByteString;
@@ -79,153 +67,6 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
 
     private static ExecutorService executorService = new LoggableCachedThreadPool();
 
-    static class ExpectedSizeIterator implements Iterator<byte[]> {
-
-        BlockingQueue<byte[]> queue;
-
-        int expectedSize;
-        int current = 0;
-        long timeout;
-        long timeoutTS;
-        volatile Throwable coprocException;
-
-        public ExpectedSizeIterator(int expectedSize) {
-            this.expectedSize = expectedSize;
-            this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
-
-            Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
-            this.timeout = hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5) * hconf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60000);
-            this.timeout = Math.max(this.timeout, 5 * 60000);
-            this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
-
-            if (BackdoorToggles.getQueryTimeout() != -1) {
-                this.timeout = BackdoorToggles.getQueryTimeout();
-            }
-
-            this.timeout *= 1.1; // allow for some delay
-
-            logger.info("Timeout for ExpectedSizeIterator is: " + this.timeout);
-
-            this.timeoutTS = System.currentTimeMillis() + this.timeout;
-        }
-
-        @Override
-        public boolean hasNext() {
-            return (current < expectedSize);
-        }
-
-        @Override
-        public byte[] next() {
-            if (current >= expectedSize) {
-                throw new IllegalStateException("Won't have more data");
-            }
-            try {
-                current++;
-                byte[] ret = null;
-
-                while (ret == null && coprocException == null && timeoutTS - System.currentTimeMillis() > 0) {
-                    ret = queue.poll(5000, TimeUnit.MILLISECONDS);
-                }
-
-                if (coprocException != null) {
-                    throw new RuntimeException("Error in coprocessor", coprocException);
-                } else if (ret == null) {
-                    throw new RuntimeException("Timeout visiting cube!");
-                } else {
-                    return ret;
-                }
-            } catch (InterruptedException e) {
-                throw new RuntimeException("Error when waiting queue", e);
-            }
-        }
-
-        @Override
-        public void remove() {
-            throw new NotImplementedException();
-        }
-
-        public void append(byte[] data) {
-            try {
-                queue.put(data);
-            } catch (InterruptedException e) {
-                throw new RuntimeException("error when waiting queue", e);
-            }
-        }
-
-        public long getTimeout() {
-            return timeout;
-        }
-
-        public void notifyCoprocException(Throwable ex) {
-            coprocException = ex;
-        }
-    }
-
-    static class EndpointResultsAsGTScanner implements IGTScanner {
-        private GTInfo info;
-        private Iterator<byte[]> blocks;
-        private ImmutableBitSet columns;
-        private long totalScannedCount;
-
-        public EndpointResultsAsGTScanner(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount) {
-            this.info = info;
-            this.blocks = blocks;
-            this.columns = columns;
-            this.totalScannedCount = totalScannedCount;
-        }
-
-        @Override
-        public GTInfo getInfo() {
-            return info;
-        }
-
-        @Override
-        public long getScannedRowCount() {
-            return totalScannedCount;
-        }
-
-        @Override
-        public void close() throws IOException {
-            //do nothing
-        }
-
-        @Override
-        public Iterator<GTRecord> iterator() {
-            return Iterators.concat(Iterators.transform(blocks, new Function<byte[], Iterator<GTRecord>>() {
-                @Nullable
-                @Override
-                public Iterator<GTRecord> apply(@Nullable final byte[] input) {
-
-                    return new Iterator<GTRecord>() {
-                        private ByteBuffer inputBuffer = null;
-                        private GTRecord oneRecord = null;
-
-                        @Override
-                        public boolean hasNext() {
-                            if (inputBuffer == null) {
-                                inputBuffer = ByteBuffer.wrap(input);
-                                oneRecord = new GTRecord(info);
-                            }
-
-                            return inputBuffer.position() < inputBuffer.limit();
-                        }
-
-                        @Override
-                        public GTRecord next() {
-                            oneRecord.loadColumns(columns, inputBuffer);
-                            return oneRecord;
-                        }
-
-                        @Override
-                        public void remove() {
-                            throw new UnsupportedOperationException();
-                        }
-                    };
-                }
-            }));
-        }
-    }
-
     public CubeHBaseEndpointRPC(CubeSegment cubeSeg, Cuboid cuboid, GTInfo fullGTInfo) {
         super(cubeSeg, cuboid, fullGTInfo);
     }
@@ -345,7 +186,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
         builder.setRowkeyPreambleSize(cubeSeg.getRowKeyPreambleSize());
         builder.setBehavior(toggle);
         builder.setStartTime(System.currentTimeMillis());
-        builder.setTimeout(epResultItr.getTimeout());
+        builder.setTimeout(epResultItr.getRpcTimeout());
         builder.setKylinProperties(kylinConfig.getConfigAsString());
 
         for (final Pair<byte[], byte[]> epRange : getEPKeyRanges(cuboidBaseShard, shardNum, totalShards)) {
@@ -407,7 +248,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
                     }
 
                     if (abnormalFinish[0]) {
-                        Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout, failing current query...");
+                        Throwable ex = new RuntimeException(logHeader + "The coprocessor thread stopped itself due to scan timeout or scan threshold(check region server log), failing current query...");
                         logger.error(logHeader + "Error when visiting cubes by endpoint", ex); // double log coz the query thread may already timeout
                         epResultItr.notifyCoprocException(ex);
                         return;
@@ -416,7 +257,7 @@ public class CubeHBaseEndpointRPC extends CubeHBaseRPC {
             });
         }
 
-        return new EndpointResultsAsGTScanner(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get());
+        return new GTBlobScatter(fullGTInfo, epResultItr, scanRequest.getColumns(), totalScannedCount.get(), scanRequest.getStoragePushDownLimit());
     }
 
     private String getStatsString(byte[] region, CubeVisitResponse result) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
new file mode 100644
index 0000000..4e0d15e
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/ExpectedSizeIterator.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.debug.BackdoorToggles;
+import org.apache.kylin.storage.hbase.HBaseConnection;
+
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Iterator over a fixed number of {@code byte[]} results that are handed in through
+ * {@link #append(byte[])} and consumed through {@link #next()}.
+ *
+ * <p>{@link #next()} blocks until a result is available, a failure has been reported through
+ * {@link #notifyCoprocException(Throwable)}, or the overall deadline computed in the
+ * constructor has passed.
+ */
+class ExpectedSizeIterator implements Iterator<byte[]> {
+
+    BlockingQueue<byte[]> queue;
+
+    int expectedSize;
+    int current = 0;
+    // HBase RPC timeout as read from the HBase configuration
+    long rpcTimeout;
+    // overall wait budget for this iterator, in milliseconds
+    long timeout;
+    // absolute deadline (System.currentTimeMillis() based) after which next() gives up
+    long timeoutTS;
+    volatile Throwable coprocException;
+
+    /**
+     * @param expectedSize number of results this iterator will hand out; also the queue capacity
+     */
+    public ExpectedSizeIterator(int expectedSize) {
+        this.expectedSize = expectedSize;
+        this.queue = new ArrayBlockingQueue<byte[]>(expectedSize);
+
+        Configuration hconf = HBaseConnection.getCurrentHBaseConfiguration();
+        this.rpcTimeout = hconf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+        this.timeout = this.rpcTimeout * hconf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+        CubeHBaseEndpointRPC.logger.info("rpc timeout is {} and after multiply retry times become {}", this.rpcTimeout, this.timeout);
+        // never wait less than five minutes, then scale by the configured multiplier
+        this.timeout = Math.max(this.timeout, 5 * 60000);
+        this.timeout *= KylinConfig.getInstanceFromEnv().getCubeVisitTimeoutTimes();
+
+        // an explicit per-query timeout (-1 means "not set") replaces the computed value
+        if (BackdoorToggles.getQueryTimeout() != -1) {
+            this.timeout = BackdoorToggles.getQueryTimeout();
+        }
+
+        this.timeout *= 1.1; // allow for some delay
+
+        CubeHBaseEndpointRPC.logger.info("Final Timeout for ExpectedSizeIterator is: {}", this.timeout);
+
+        this.timeoutTS = System.currentTimeMillis() + this.timeout;
+    }
+
+    /** @return {@code true} while fewer than {@code expectedSize} results have been requested */
+    @Override
+    public boolean hasNext() {
+        return (current < expectedSize);
+    }
+
+    /**
+     * Waits for the next result.
+     *
+     * @return the next result taken from the queue
+     * @throws IllegalStateException if all expected results were already requested
+     * @throws RuntimeException      if a coprocessor failure was reported, the deadline passed
+     *                               without a result, or the waiting thread was interrupted
+     */
+    @Override
+    public byte[] next() {
+        if (current >= expectedSize) {
+            throw new IllegalStateException("Won't have more data");
+        }
+        try {
+            current++;
+            byte[] ret = null;
+
+            // poll in 5 second slices so that a reported failure or the deadline is noticed
+            while (ret == null && coprocException == null && timeoutTS > System.currentTimeMillis()) {
+                ret = queue.poll(5000, TimeUnit.MILLISECONDS);
+            }
+
+            if (coprocException != null) {
+                throw new RuntimeException("Error in coprocessor", coprocException);
+            } else if (ret == null) {
+                throw new RuntimeException("Timeout visiting cube!");
+            } else {
+                return ret;
+            }
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so callers further up can still observe the interruption
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Error when waiting queue", e);
+        }
+    }
+
+    /** Not supported. */
+    @Override
+    public void remove() {
+        throw new NotImplementedException();
+    }
+
+    /**
+     * Hands one result to the iterator, blocking while the queue is full.
+     *
+     * @throws RuntimeException if the calling thread is interrupted while waiting
+     */
+    public void append(byte[] data) {
+        try {
+            queue.put(data);
+        } catch (InterruptedException e) {
+            // restore the interrupt flag so the producer thread can react to the interruption
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("error when waiting queue", e);
+        }
+    }
+
+    /** @return the HBase RPC timeout read from the HBase configuration at construction time */
+    public long getRpcTimeout() {
+        return this.rpcTimeout;
+    }
+
+    /** Records a failure; a pending or later {@link #next()} call will rethrow it wrapped in a RuntimeException. */
+    public void notifyCoprocException(Throwable ex) {
+        coprocException = ex;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
new file mode 100644
index 0000000..631510e
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/GTBlobScatter.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.hbase.cube.v2;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Iterator;
+
+import javax.annotation.Nullable;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.gridtable.GTInfo;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.storage.gtrecord.SortedIteratorMergerWithLimit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+
+/**
+ * Scatters the blobs returned from region servers into an iterable of {@link GTRecord}s.
+ * <p>
+ * Every element of {@code blocks} is one blob; it is decoded lazily, record by record, by
+ * {@link GTBlobScatterFunc}. NOTE(review): each blob presumably carries the rows of one
+ * shard/region - confirm against the caller.
+ */
+class GTBlobScatter implements IGTScanner {
+
+    private static final Logger logger = LoggerFactory.getLogger(GTBlobScatter.class);
+
+    private final GTInfo info;
+    private final Iterator<byte[]> blocks;
+    private final ImmutableBitSet columns;
+    private final long totalScannedCount;
+    private final int storagePushDownLimit;
+
+    /**
+     * @param info                 table info used to create the decoded records
+     * @param blocks               the blobs to decode, consumed lazily
+     * @param columns              the columns loaded from each blob row
+     * @param totalScannedCount    value reported by {@link #getScannedRowCount()}
+     * @param storagePushDownLimit limit handed to the merging iterator, see {@link #iterator()}
+     */
+    public GTBlobScatter(GTInfo info, Iterator<byte[]> blocks, ImmutableBitSet columns, long totalScannedCount, int storagePushDownLimit) {
+        this.info = info;
+        this.blocks = blocks;
+        this.columns = columns;
+        this.totalScannedCount = totalScannedCount;
+        this.storagePushDownLimit = storagePushDownLimit;
+    }
+
+    @Override
+    public GTInfo getInfo() {
+        return info;
+    }
+
+    @Override
+    public long getScannedRowCount() {
+        return totalScannedCount;
+    }
+
+    @Override
+    public void close() throws IOException {
+        //do nothing
+    }
+
+    /**
+     * Returns the records of all blobs.
+     * <p>
+     * When the push-down limit does not exceed the configured maximum, the per-blob iterators are
+     * handed to {@link SortedIteratorMergerWithLimit} together with the limit and the primary-key
+     * comparator; otherwise they are simply concatenated in blob order.
+     * <p>
+     * The underlying {@code blocks} iterator is consumed, so this method is meant to be called once.
+     */
+    @Override
+    public Iterator<GTRecord> iterator() {
+        Iterator<Iterator<GTRecord>> shardSubsets = Iterators.transform(blocks, new GTBlobScatterFunc());
+        if (storagePushDownLimit <= KylinConfig.getInstanceFromEnv().getStoragePushDownLimitMax()) {
+            return new SortedIteratorMergerWithLimit<GTRecord>(shardSubsets, storagePushDownLimit, GTRecord.getPrimaryKeyComparator()).getIterator();
+        } else {
+            return Iterators.concat(shardSubsets);
+        }
+    }
+
+    /**
+     * Turns one blob into a lazy iterator over the records encoded in it.
+     */
+    class GTBlobScatterFunc implements Function<byte[], Iterator<GTRecord>> {
+        @Nullable
+        @Override
+        public Iterator<GTRecord> apply(@Nullable final byte[] input) {
+
+            return new Iterator<GTRecord>() {
+                private ByteBuffer inputBuffer = null;
+                // A single record instance is reused for every next() call to avoid per-row allocation.
+                // Consumers that keep a returned record across calls must copy it first.
+                private GTRecord reusedRecord = null;
+
+                @Override
+                public boolean hasNext() {
+                    // the blob is wrapped on first use, so an iterator that is never read costs nothing
+                    if (inputBuffer == null) {
+                        inputBuffer = ByteBuffer.wrap(input);
+                        reusedRecord = new GTRecord(info);
+                    }
+
+                    return inputBuffer.position() < inputBuffer.limit();
+                }
+
+                @Override
+                public GTRecord next() {
+                    // going through hasNext() guarantees initialization and honours the Iterator contract at the end
+                    if (!hasNext()) {
+                        throw new java.util.NoSuchElementException();
+                    }
+                    reusedRecord.loadColumns(columns, inputBuffer);
+                    return reusedRecord;
+                }
+
+                @Override
+                public void remove() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 5b7a26a..cbccac6 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -48,7 +48,9 @@ import org.apache.kylin.common.util.CompressionUtils;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.dimension.DimensionEncoding;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.gridtable.GTScanExceedThresholdException;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanTimeoutException;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.IGTStore;
 import org.apache.kylin.measure.BufferedMeasureEncoder;
@@ -235,13 +237,12 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             }
 
             if (behavior.ordinal() < CoprocessorBehavior.SCAN_FILTER_AGGR_CHECKMEM.ordinal()) {
-                scanReq.setAggrCacheGB(0); // disable mem check if so told
+                scanReq.setAggCacheMemThreshold(0); // disable mem check if so told
             }
 
             final MutableBoolean scanNormalComplete = new MutableBoolean(true);
-            final long startTime = this.serviceStartTime;
-            final long timeout = request.getTimeout();
-            final int rowLimit = scanReq.getRowLimit();
+            final long deadline = request.getTimeout() + this.serviceStartTime;
+            final long storagePushDownLimit = scanReq.getStoragePushDownLimit();
 
             final CellListIterator cellListIterator = new CellListIterator() {
 
@@ -256,19 +257,13 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
                 @Override
                 public boolean hasNext() {
-                    if (rowLimit > 0 && rowLimit <= counter)
-                        return false;
 
-                    if (counter % 100000 == 1) {
-                        if (System.currentTimeMillis() - startTime > timeout) {
-                            scanNormalComplete.setValue(false);
-                            logger.error("scanner aborted because timeout");
-                            return false;
-                        }
+                    if (counter > scanReq.getStorageScanRowNumThreshold()) {
+                        throw new GTScanExceedThresholdException("Exceed scan threshold at " + counter);
                     }
 
                     if (counter % 100000 == 1) {
-                        logger.info("Scanned " + counter + " rows.");
+                        logger.info("Scanned " + counter + " rows from HBase.");
                     }
                     counter++;
                     return allCellLists.hasNext();
@@ -290,38 +285,47 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             IGTScanner rawScanner = store.scan(scanReq);
             IGTScanner finalScanner = scanReq.decorateScanner(rawScanner, //
                     behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER.ordinal(), //
-                    behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal());
+                    behavior.ordinal() >= CoprocessorBehavior.SCAN_FILTER_AGGR.ordinal(), deadline);
 
             ByteBuffer buffer = ByteBuffer.allocate(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);
 
             ByteArrayOutputStream outputStream = new ByteArrayOutputStream(BufferedMeasureEncoder.DEFAULT_BUFFER_SIZE);//ByteArrayOutputStream will auto grow
             int finalRowCount = 0;
-            for (GTRecord oneRecord : finalScanner) {
 
-                if (!scanNormalComplete.booleanValue()) {
-                    logger.error("aggregate iterator aborted because input iterator aborts");
-                    break;
-                }
+            try {
+                for (GTRecord oneRecord : finalScanner) {
 
-                if (finalRowCount % 100000 == 1) {
-                    if (System.currentTimeMillis() - startTime > timeout) {
-                        logger.error("aggregate iterator aborted because timeout");
+                    if (finalRowCount > storagePushDownLimit) {
+                        logger.info("The finalScanner aborted because storagePushDownLimit is satisfied");
                         break;
                     }
-                }
 
-                buffer.clear();
-                try {
-                    oneRecord.exportColumns(scanReq.getColumns(), buffer);
-                } catch (BufferOverflowException boe) {
-                    buffer = ByteBuffer.allocate(oneRecord.sizeOf(scanReq.getColumns()) * 2);
-                    oneRecord.exportColumns(scanReq.getColumns(), buffer);
-                }
+                    if (finalRowCount % 100000 == 1) {
+                        if (System.currentTimeMillis() > deadline) {
+                            throw new GTScanTimeoutException("finalScanner timeouts after scanned " + finalRowCount);
+                        }
+                    }
 
-                outputStream.write(buffer.array(), 0, buffer.position());
-                finalRowCount++;
+                    buffer.clear();
+                    try {
+                        oneRecord.exportColumns(scanReq.getColumns(), buffer);
+                    } catch (BufferOverflowException boe) {
+                        buffer = ByteBuffer.allocate(oneRecord.sizeOf(scanReq.getColumns()) * 2);
+                        oneRecord.exportColumns(scanReq.getColumns(), buffer);
+                    }
+
+                    outputStream.write(buffer.array(), 0, buffer.position());
+                    finalRowCount++;
+                }
+            } catch (GTScanTimeoutException e) {
+                scanNormalComplete.setValue(false);
+                logger.info("The cube visit did not finish normally because scan timeout", e);
+            } catch (GTScanExceedThresholdException e) {
+                scanNormalComplete.setValue(false);
+                logger.info("The cube visit did not finish normally because scan num exceeds threshold", e);
+            } finally {
+                finalScanner.close();
             }
-            finalScanner.close();
 
             appendProfileInfo(sb, "agg done");
 


[2/2] kylin git commit: KYLIN-1936 improve enable limit logic (exactAggregation is too strict)

Posted by ma...@apache.org.
KYLIN-1936 improve enable limit logic (exactAggregation is too strict)


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/28e94230
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/28e94230
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/28e94230

Branch: refs/heads/master
Commit: 28e942306d545ff3b5f604e8225bacad650ad8ac
Parents: c67891d
Author: Hongbin Ma <ma...@apache.org>
Authored: Fri Aug 5 12:50:27 2016 +0800
Committer: Hongbin Ma <ma...@apache.org>
Committed: Wed Aug 10 12:56:32 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |   6 +-
 .../inmemcubing/AbstractInMemCubeBuilder.java   |   3 +-
 .../cube/inmemcubing/DoggedCubeBuilder.java     |   4 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      |   9 +-
 .../kylin/gridtable/GTAggregateScanner.java     |  38 ++++-
 .../java/org/apache/kylin/gridtable/GTInfo.java |   2 +-
 .../org/apache/kylin/gridtable/GTRecord.java    |  33 +++-
 .../GTScanExceedThresholdException.java         |  26 +++
 .../kylin/gridtable/GTScanRangePlanner.java     |   2 +-
 .../apache/kylin/gridtable/GTScanRequest.java   |  92 +++++-----
 .../kylin/gridtable/GTScanRequestBuilder.java   |  97 +++++++++++
 .../kylin/gridtable/GTScanTimeoutException.java |  26 +++
 .../gridtable/benchmark/GTScannerBenchmark.java |   5 +-
 .../benchmark/GTScannerBenchmark2.java          |   5 +-
 .../inmemcubing/ConcurrentDiskStoreTest.java    |   4 +-
 .../cube/inmemcubing/MemDiskStoreTest.java      |   4 +-
 .../gridtable/AggregationCacheSpillTest.java    |  12 +-
 .../kylin/gridtable/DictGridTableTest.java      |  10 +-
 .../kylin/gridtable/SimpleGridTableTest.java    |   4 +-
 .../org/apache/kylin/metadata/tuple/ITuple.java |   2 +-
 .../org/apache/kylin/metadata/tuple/Tuple.java  |   5 +
 .../apache/kylin/storage/StorageContext.java    |   4 +
 .../storage/gtrecord/CubeSegmentScanner.java    |   8 +-
 .../storage/gtrecord/CubeTupleConverter.java    | 148 +++++++++++-----
 .../gtrecord/FetchSourceAwareIterator.java      |  24 +++
 .../gtrecord/GTCubeStorageQueryBase.java        |  47 +++++-
 .../storage/gtrecord/IFetchSourceAware.java     |  25 +++
 .../kylin/storage/gtrecord/PeekingImpl.java     |  73 ++++++++
 .../gtrecord/SegmentCubeTupleIterator.java      | 162 ++++++++++++++++++
 .../gtrecord/SequentialCubeTupleIterator.java   | 151 ++++-------------
 .../storage/gtrecord/SortedIteratorMerger.java  | 100 +++++++++++
 .../gtrecord/SortedIteratorMergerWithLimit.java | 143 ++++++++++++++++
 .../gtrecord/SortedIteratorMergerTest.java      |  97 +++++++++++
 .../SortedIteratorMergerWithLimitTest.java      | 127 ++++++++++++++
 .../apache/kylin/query/HackedDbUnitAssert.java  | 169 +++++++++++++++++++
 .../apache/kylin/query/ITKylinQueryTest.java    |  12 +-
 .../org/apache/kylin/query/KylinTestBase.java   |  72 ++++++++
 .../hbase/cube/HBaseScannerBenchmark.java       |   6 +-
 .../hbase/cube/v2/CubeHBaseEndpointRPC.java     | 165 +-----------------
 .../hbase/cube/v2/ExpectedSizeIterator.java     | 116 +++++++++++++
 .../storage/hbase/cube/v2/GTBlobScatter.java    | 150 ++++++++++++++++
 .../coprocessor/endpoint/CubeVisitService.java  |  70 ++++----
 42 files changed, 1811 insertions(+), 447 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f51dce6..eb4102a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -23,9 +23,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.SortedSet;
+import java.util.Map.Entry;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -470,6 +470,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Float.parseFloat(getOptional("kylin.hbase.hfile.size.gb", "2.0"));
     }
 
+    public int getStoragePushDownLimitMax() { // reads "kylin.query.pushdown.limit.max"; "10000" is passed as getOptional's second argument, presumably the default when unset
+        return Integer.parseInt(getOptional("kylin.query.pushdown.limit.max", "10000"));
+    }
+
     public int getScanThreshold() {
         return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index e385ab9..d417d11 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -25,6 +25,7 @@ import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -83,7 +84,7 @@ abstract public class AbstractInMemCubeBuilder {
 
     protected void outputCuboid(long cuboidId, GridTable gridTable, ICuboidWriter output) throws IOException {
         long startTime = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(gridTable.getInfo(), null, null, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(gridTable.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
         IGTScanner scanner = gridTable.scan(req);
         for (GTRecord record : scanner) {
             output.write(cuboidId, record);

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 606c820..15f2241 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -35,7 +35,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -399,7 +399,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                 if (cuboidIterator.hasNext()) {
                     CuboidResult cuboid = cuboidIterator.next();
                     currentCuboidId = cuboid.cuboidId;
-                    scanner = cuboid.table.scan(new GTScanRequest(cuboid.table.getInfo(), null, null, null));
+                    scanner = cuboid.table.scan(new GTScanRequestBuilder().setInfo(cuboid.table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
                     recordIterator = scanner.iterator();
                 } else {
                     return false;

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index e12e815..36d1296 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -31,8 +31,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
-import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
@@ -42,6 +42,7 @@ import org.apache.kylin.gridtable.GTBuilder;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.measure.topn.Counter;
@@ -329,8 +330,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
 
         Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
-        GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
-        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(baseCuboid.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(dimensionMetricsBitSet.getFirst()).setAggrMetrics(dimensionMetricsBitSet.getSecond()).setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
+        GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req, Long.MAX_VALUE);
         aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
 
         int count = 0;
@@ -397,7 +398,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
         GTInfo info = gridTable.getInfo();
-        GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(aggregationColumns).setAggrMetrics(measureColumns).setAggrMetricsFuncs(metricsAggrFuncs).setFilterPushDown(null).createGTScanRequest();
         GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
 
         // for child cuboid, some measures don't need aggregation.

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index cb23af4..ccf4895 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -63,24 +63,28 @@ public class GTAggregateScanner implements IGTScanner {
     final IGTScanner inputScanner;
     final AggregationCache aggrCache;
     final long spillThreshold;
+    final int storagePushDownLimit;//default to be Int.MAX
+    final long deadline;
 
     private int aggregatedRowCount = 0;
     private MemoryWaterLevel memTracker;
     private boolean[] aggrMask;
 
-    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req) {
+    public GTAggregateScanner(IGTScanner inputScanner, GTScanRequest req, long deadline) {
         if (!req.hasAggregation())
             throw new IllegalStateException();
 
         this.info = inputScanner.getInfo();
-        this.dimensions = req.getColumns().andNot(req.getAggrMetrics());
+        this.dimensions = req.getDimensions();
         this.groupBy = req.getAggrGroupBy();
         this.metrics = req.getAggrMetrics();
         this.metricsAggrFuncs = req.getAggrMetricsFuncs();
         this.inputScanner = inputScanner;
         this.aggrCache = new AggregationCache();
-        this.spillThreshold = (long) (req.getAggrCacheGB() * MemoryBudgetController.ONE_GB);
+        this.spillThreshold = (long) (req.getAggCacheMemThreshold() * MemoryBudgetController.ONE_GB);
         this.aggrMask = new boolean[metricsAggrFuncs.length];
+        this.storagePushDownLimit = req.getStoragePushDownLimit();
+        this.deadline = deadline;
 
         Arrays.fill(aggrMask, true);
     }
@@ -133,8 +137,25 @@ public class GTAggregateScanner implements IGTScanner {
     public Iterator<GTRecord> iterator() {
         long count = 0;
         for (GTRecord r : inputScanner) {
+
             count++;
-            aggrCache.aggregate(r);
+
+            if (getNumOfSpills() == 0) {
+                //check limit
+                boolean ret = aggrCache.aggregate(r, storagePushDownLimit);
+
+                if (!ret) {
+                    logger.info("abort reading inputScanner because storage push down limit is hit");
+                    break;//limit is hit
+                }
+            } else {//else if dumps is not empty, it means a lot of row need aggregated, so it's less likely that limit clause is helping 
+                aggrCache.aggregate(r, Integer.MAX_VALUE);
+            }
+
+            //check deadline
+            if (count % 10000 == 1 && System.currentTimeMillis() > deadline) {
+                throw new GTScanTimeoutException("Timeout in GTAggregateScanner with scanned count " + count);
+            }
         }
         logger.info("GTAggregateScanner input rows: " + count);
         return aggrCache.iterator();
@@ -241,7 +262,7 @@ public class GTAggregateScanner implements IGTScanner {
             return result;
         }
 
-        void aggregate(GTRecord r) {
+        boolean aggregate(GTRecord r, int stopForLimit) {
             if (++aggregatedRowCount % 100000 == 0) {
                 if (memTracker != null) {
                     memTracker.markHigh();
@@ -257,6 +278,12 @@ public class GTAggregateScanner implements IGTScanner {
             final byte[] key = createKey(r);
             MeasureAggregator[] aggrs = aggBufMap.get(key);
             if (aggrs == null) {
+
+                //for storage push down limit
+                if (aggBufMap.size() >= stopForLimit) {
+                    return false;
+                }
+
                 aggrs = newAggregators();
                 aggBufMap.put(key, aggrs);
             }
@@ -267,6 +294,7 @@ public class GTAggregateScanner implements IGTScanner {
                     aggrs[i].aggregate(metrics);
                 }
             }
+            return true;
         }
 
         private void spillBuffMap() throws RuntimeException {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index 673d22e..21da4ea 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -151,7 +151,7 @@ public class GTInfo {
         if (!expected.equals(ref))
             throw new IllegalArgumentException();
     }
-
+    
     void validate() {
         if (codeSystem == null)
             throw new IllegalStateException();

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
index 37f42c7..4d26029 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTRecord.java
@@ -20,6 +20,7 @@ package org.apache.kylin.gridtable;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.List;
 
 import org.apache.kylin.common.util.ByteArray;
@@ -27,7 +28,7 @@ import org.apache.kylin.common.util.ImmutableBitSet;
 
 import com.google.common.base.Preconditions;
 
-public class GTRecord implements Comparable<GTRecord> {
+public class GTRecord implements Comparable<GTRecord>, Cloneable {
 
     final transient GTInfo info;
     final ByteArray[] cols;
@@ -54,6 +55,11 @@ public class GTRecord implements Comparable<GTRecord> {
         }
     }
 
+    @Override
+    public Object clone() { // copies through the GTRecord(GTRecord) constructor rather than Object.clone()
+        return new GTRecord(this);
+    }
+
     public GTInfo getInfo() {
         return info;
     }
@@ -189,12 +195,33 @@ public class GTRecord implements Comparable<GTRecord> {
 
     @Override
     public int compareTo(GTRecord o) {
+        return compareToInternal(o, info.colAll);
+    }
+
+    public int compareToOnPrimaryKey(GTRecord o) {
+        return compareToInternal(o, info.primaryKey);
+    }
+
+    public static Comparator<GTRecord> getPrimaryKeyComparator() {
+        return new Comparator<GTRecord>() {
+            @Override
+            public int compare(GTRecord o1, GTRecord o2) {
+                if (o1 == null || o2 == null) {
+                    throw new IllegalStateException("Cannot handle null");
+                }
+
+                return o1.compareToOnPrimaryKey(o2);
+            }
+        };
+    }
+
+    private int compareToInternal(GTRecord o, ImmutableBitSet participateCols) {
         assert this.info == o.info; // reference equal for performance
         IGTComparator comparator = info.codeSystem.getComparator();
 
         int comp = 0;
-        for (int i = 0; i < info.colAll.trueBitCount(); i++) {
-            int c = info.colAll.trueBitAt(i);
+        for (int i = 0; i < participateCols.trueBitCount(); i++) {
+            int c = participateCols.trueBitAt(i);
             comp = comparator.compare(cols[c], o.cols[c]);
             if (comp != 0)
                 return comp;

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
new file mode 100644
index 0000000..dd57e90
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanExceedThresholdException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public class GTScanExceedThresholdException extends RuntimeException {
+
+    public GTScanExceedThresholdException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index 17d27f9..b8f7e0e 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -165,7 +165,7 @@ public class GTScanRangePlanner {
         GTScanRequest scanRequest;
         List<GTScanRange> scanRanges = this.planScanRanges();
         if (scanRanges != null && scanRanges.size() != 0) {
-            scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter);
+            scanRequest = new GTScanRequestBuilder().setInfo(gtInfo).setRanges(scanRanges).setDimensions(gtDimensions).setAggrGroupBy(gtAggrGroups).setAggrMetrics(gtAggrMetrics).setAggrMetricsFuncs(gtAggrFuncs).setFilterPushDown(gtFilter).createGTScanRequest();
         } else {
             scanRequest = null;
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 0ce6b4c..e2bac3d 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -57,20 +57,14 @@ public class GTScanRequest {
     private String[] aggrMetricsFuncs;
 
     // hint to storage behavior
-    private boolean allowPreAggregation = true;
-    private double aggrCacheGB = 0; // 0 means no row/memory limit; positive means memory limit in GB; negative means row limit
-
-    public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet columns, TupleFilter filterPushDown) {
-        this(info, ranges, columns, null, null, null, filterPushDown, true, 0);
-    }
-
-    public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
-            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
-        this(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, true, 0);
-    }
-
-    public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
-            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) {
+    private boolean allowStorageAggregation;
+    private double aggCacheMemThreshold;
+    private int storageScanRowNumThreshold;
+    private int storagePushDownLimit;
+
+    GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
+            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowStorageAggregation, //
+            double aggCacheMemThreshold, int storageScanRowNumThreshold, int storagePushDownLimit) {
         this.info = info;
         if (ranges == null) {
             this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
@@ -84,8 +78,10 @@ public class GTScanRequest {
         this.aggrMetrics = aggrMetrics;
         this.aggrMetricsFuncs = aggrMetricsFuncs;
 
-        this.allowPreAggregation = allowPreAggregation;
-        this.aggrCacheGB = aggrCacheGB;
+        this.allowStorageAggregation = allowStorageAggregation;
+        this.aggCacheMemThreshold = aggCacheMemThreshold;
+        this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+        this.storagePushDownLimit = storagePushDownLimit;
 
         validate(info);
     }
@@ -143,7 +139,7 @@ public class GTScanRequest {
     }
 
     public IGTScanner decorateScanner(IGTScanner scanner) throws IOException {
-        return decorateScanner(scanner, true, true);
+        return decorateScanner(scanner, true, true, Long.MAX_VALUE);
     }
 
     /**
@@ -152,7 +148,7 @@ public class GTScanRequest {
      * <p/>
      * Refer to CoprocessorBehavior for explanation
      */
-    public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr) throws IOException {
+    public IGTScanner decorateScanner(IGTScanner scanner, boolean doFilter, boolean doAggr, long deadline) throws IOException {
         IGTScanner result = scanner;
         if (!doFilter) { //Skip reading this section if you're not profiling! 
             int scanned = lookAndForget(result);
@@ -169,11 +165,11 @@ public class GTScanRequest {
                 return new EmptyGTScanner(scanned);
             }
 
-            if (!this.allowPreAggregation) {
+            if (!this.isAllowStorageAggregation()) {
                 logger.info("pre aggregation is not beneficial, skip it");
             } else if (this.hasAggregation()) {
                 logger.info("pre aggregating results before returning");
-                result = new GTAggregateScanner(result, this);
+                result = new GTAggregateScanner(result, this, deadline);
             } else {
                 logger.info("has no aggregation, skip it");
             }
@@ -235,6 +231,10 @@ public class GTScanRequest {
         return filterPushDown;
     }
 
+    public ImmutableBitSet getDimensions() {
+        return this.getColumns().andNot(this.getAggrMetrics());
+    }
+
     public ImmutableBitSet getAggrGroupBy() {
         return aggrGroupBy;
     }
@@ -247,34 +247,41 @@ public class GTScanRequest {
         return aggrMetricsFuncs;
     }
 
-    public boolean isAllowPreAggregation() {
-        return allowPreAggregation;
+    public boolean isAllowStorageAggregation() {
+        return allowStorageAggregation;
     }
 
-    public void setAllowPreAggregation(boolean allowPreAggregation) {
-        this.allowPreAggregation = allowPreAggregation;
+    public void setAllowStorageAggregation(boolean allowStorageAggregation) {
+        this.allowStorageAggregation = allowStorageAggregation;
     }
 
-    public double getAggrCacheGB() {
-        if (aggrCacheGB < 0)
+    public double getAggCacheMemThreshold() {
+        if (aggCacheMemThreshold < 0)
             return 0;
         else
-            return aggrCacheGB;
+            return aggCacheMemThreshold;
     }
 
-    public void setAggrCacheGB(double gb) {
-        this.aggrCacheGB = gb;
+    public void setAggCacheMemThreshold(double gb) {
+        this.aggCacheMemThreshold = gb;
     }
 
-    public int getRowLimit() {
-        if (aggrCacheGB < 0)
-            return (int) -aggrCacheGB;
-        else
-            return 0;
+    public int getStorageScanRowNumThreshold() {
+        return storageScanRowNumThreshold;
+    }
+
+    public void setStorageScanRowNumThreshold(int storageScanRowNumThreshold) {
+        logger.info("storageScanRowNumThreshold is set to " + storageScanRowNumThreshold);
+        this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+    }
+
+    public int getStoragePushDownLimit() {
+        return this.storagePushDownLimit;
     }
 
-    public void setRowLimit(int limit) {
-        aggrCacheGB = -limit;
+    public void setStoragePushDownLimit(int limit) { // limit: number of aggregated rows after which storage may stop producing results
+        logger.info("storagePushDownLimit is set to " + limit); // log the incoming value; the field still holds the previous limit here
+        this.storagePushDownLimit = limit;
     }
 
     public List<Integer> getRequiredMeasures() {
@@ -323,8 +330,10 @@ public class GTScanRequest {
             ImmutableBitSet.serializer.serialize(value.aggrGroupBy, out);
             ImmutableBitSet.serializer.serialize(value.aggrMetrics, out);
             BytesUtil.writeAsciiStringArray(value.aggrMetricsFuncs, out);
-            BytesUtil.writeVInt(value.allowPreAggregation ? 1 : 0, out);
-            out.putDouble(value.aggrCacheGB);
+            BytesUtil.writeVInt(value.allowStorageAggregation ? 1 : 0, out);
+            out.putDouble(value.aggCacheMemThreshold);
+            BytesUtil.writeVInt(value.storageScanRowNumThreshold, out);
+            BytesUtil.writeVInt(value.storagePushDownLimit, out);
         }
 
         @Override
@@ -353,8 +362,13 @@ public class GTScanRequest {
             String[] sAggrMetricFuncs = BytesUtil.readAsciiStringArray(in);
             boolean sAllowPreAggr = (BytesUtil.readVInt(in) == 1);
             double sAggrCacheGB = in.getDouble();
+            int storageScanRowNumThreshold = BytesUtil.readVInt(in);
+            int storagePushDownLimit = BytesUtil.readVInt(in);
 
-            return new GTScanRequest(sInfo, sRanges, sColumns, sAggGroupBy, sAggrMetrics, sAggrMetricFuncs, sGTFilter, sAllowPreAggr, sAggrCacheGB);
+            return new GTScanRequestBuilder().setInfo(sInfo).setRanges(sRanges).setDimensions(sColumns).//
+            setAggrGroupBy(sAggGroupBy).setAggrMetrics(sAggrMetrics).setAggrMetricsFuncs(sAggrMetricFuncs).//
+            setFilterPushDown(sGTFilter).setAllowStorageAggregation(sAllowPreAggr).setAggCacheMemThreshold(sAggrCacheGB).//
+            setStorageScanRowNumThreshold(storageScanRowNumThreshold).setStoragePushDownLimit(storagePushDownLimit).createGTScanRequest();
         }
 
         private void serializeGTRecord(GTRecord gtRecord, ByteBuffer out) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
new file mode 100644
index 0000000..49ec759
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequestBuilder.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+import java.util.List;
+
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.filter.TupleFilter;
+
+public class GTScanRequestBuilder { // fluent builder: each setter stores one GTScanRequest constructor argument and returns this builder
+    private GTInfo info;
+    private List<GTScanRange> ranges;
+    private TupleFilter filterPushDown;
+    private ImmutableBitSet dimensions;
+    private ImmutableBitSet aggrGroupBy = null;
+    private ImmutableBitSet aggrMetrics = null;
+    private String[] aggrMetricsFuncs = null;
+    private boolean allowStorageAggregation = true;
+    private double aggCacheMemThreshold = 0;
+    private int storageScanRowNumThreshold = Integer.MAX_VALUE;// storage should terminate itself and throw an exception once $storageScanRowNumThreshold cuboid rows have been scanned
+    private int storagePushDownLimit = Integer.MAX_VALUE;// storage can stop working once $storagePushDownLimit aggregated rows have been produced
+
+    public GTScanRequestBuilder setInfo(GTInfo info) {
+        this.info = info;
+        return this;
+    }
+
+    public GTScanRequestBuilder setRanges(List<GTScanRange> ranges) {
+        this.ranges = ranges;
+        return this;
+    }
+
+    public GTScanRequestBuilder setFilterPushDown(TupleFilter filterPushDown) {
+        this.filterPushDown = filterPushDown;
+        return this;
+    }
+
+    public GTScanRequestBuilder setDimensions(ImmutableBitSet dimensions) {
+        this.dimensions = dimensions;
+        return this;
+    }
+
+    public GTScanRequestBuilder setAggrGroupBy(ImmutableBitSet aggrGroupBy) {
+        this.aggrGroupBy = aggrGroupBy;
+        return this;
+    }
+
+    public GTScanRequestBuilder setAggrMetrics(ImmutableBitSet aggrMetrics) {
+        this.aggrMetrics = aggrMetrics;
+        return this;
+    }
+
+    public GTScanRequestBuilder setAggrMetricsFuncs(String[] aggrMetricsFuncs) {
+        this.aggrMetricsFuncs = aggrMetricsFuncs;
+        return this;
+    }
+
+    public GTScanRequestBuilder setAllowStorageAggregation(boolean allowStorageAggregation) {
+        this.allowStorageAggregation = allowStorageAggregation;
+        return this;
+    }
+
+    public GTScanRequestBuilder setAggCacheMemThreshold(double aggCacheMemThreshold) {
+        this.aggCacheMemThreshold = aggCacheMemThreshold;
+        return this;
+    }
+
+    public GTScanRequestBuilder setStorageScanRowNumThreshold(int storageScanRowNumThreshold) {
+        this.storageScanRowNumThreshold = storageScanRowNumThreshold;
+        return this;
+    }
+
+    public GTScanRequestBuilder setStoragePushDownLimit(int storagePushDownLimit) {
+        this.storagePushDownLimit = storagePushDownLimit;
+        return this;
+    }
+
+    public GTScanRequest createGTScanRequest() { // passes every collected value (or its field default) to the GTScanRequest constructor
+        return new GTScanRequest(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, allowStorageAggregation, aggCacheMemThreshold, storageScanRowNumThreshold, storagePushDownLimit);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
new file mode 100644
index 0000000..e92dae3
--- /dev/null
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanTimeoutException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.gridtable;
+
+public class GTScanTimeoutException extends RuntimeException {
+
+    public GTScanTimeoutException(String message) {
+        super(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
index 680ba33..589f37c 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
@@ -29,6 +29,7 @@ import org.apache.kylin.gridtable.GTInfo.Builder;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTSampleCodeSystem;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
@@ -110,7 +111,7 @@ public class GTScannerBenchmark {
     @SuppressWarnings("unused")
     private void testAggregate(ImmutableBitSet groupBy) throws IOException {
         long t = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(dimensions).setAggrGroupBy(groupBy).setAggrMetrics(metrics).setAggrMetricsFuncs(aggrFuncs).setFilterPushDown(null).createGTScanRequest();
         IGTScanner scanner = req.decorateScanner(gen.generate(N));
 
         long count = 0;
@@ -154,7 +155,7 @@ public class GTScannerBenchmark {
     @SuppressWarnings("unused")
     private void testFilter(TupleFilter filter) throws IOException {
         long t = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, info.getAllColumns(), filter);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(info.getAllColumns()).setFilterPushDown(filter).createGTScanRequest();
         IGTScanner scanner = req.decorateScanner(gen.generate(N));
 
         long count = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
index ae86e46..40a5e01 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
@@ -31,6 +31,7 @@ import org.apache.kylin.gridtable.GTInfo.Builder;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTSampleCodeSystem;
 import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.benchmark.SortedGTRecordGenerator.Randomizer;
 import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter;
@@ -132,7 +133,7 @@ public class GTScannerBenchmark2 {
     @SuppressWarnings("unused")
     private void testAggregate(ImmutableBitSet groupBy) throws IOException {
         long t = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(dimensions).setAggrGroupBy(groupBy).setAggrMetrics(metrics).setAggrMetricsFuncs(aggrFuncs).setFilterPushDown(null).createGTScanRequest();
         IGTScanner scanner = req.decorateScanner(gen.generate(N));
 
         long count = 0;
@@ -176,7 +177,7 @@ public class GTScannerBenchmark2 {
     @SuppressWarnings("unused")
     private void testFilter(TupleFilter filter) throws IOException {
         long t = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, info.getAllColumns(), filter);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(info.getAllColumns()).setFilterPushDown(filter).createGTScanRequest();
         IGTScanner scanner = req.decorateScanner(gen.generate(N));
 
         long count = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
index ceaf80d..6355f4a 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/ConcurrentDiskStoreTest.java
@@ -26,7 +26,7 @@ import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.gridtable.GTBuilder;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.UnitTestSupport;
@@ -84,7 +84,7 @@ public class ConcurrentDiskStoreTest extends LocalFileMetadataTestCase {
             t[i] = new Thread() {
                 public void run() {
                     try {
-                        IGTScanner scanner = table.scan(new GTScanRequest(table.getInfo(), null, null, null));
+                        IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
                         int i = 0;
                         for (GTRecord r : scanner) {
                             assertEquals(data.get(i++), r);

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
index 807a6e3..06ade1c 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/MemDiskStoreTest.java
@@ -27,7 +27,7 @@ import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.gridtable.GTBuilder;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
-import org.apache.kylin.gridtable.GTScanRequest;
+import org.apache.kylin.gridtable.GTScanRequestBuilder;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.gridtable.UnitTestSupport;
@@ -100,7 +100,7 @@ public class MemDiskStoreTest extends LocalFileMetadataTestCase {
         }
         builder.close();
 
-        IGTScanner scanner = table.scan(new GTScanRequest(info, null, null, null));
+        IGTScanner scanner = table.scan(new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest());
         int i = 0;
         for (GTRecord r : scanner) {
             assertEquals(data.get(i++), r);

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
index 4160f86..b5f6de7 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -84,10 +84,10 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
             }
         };
 
-        GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
-        scanRequest.setAggrCacheGB(0.5);
+        GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(0, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest();
+        scanRequest.setAggCacheMemThreshold(0.5);
 
-        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE);
 
         int count = 0;
         for (GTRecord record : scanner) {
@@ -127,10 +127,10 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
         };
 
         // all-in-mem testcase
-        GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
-        scanRequest.setAggrCacheGB(0.5);
+        GTScanRequest scanRequest = new GTScanRequestBuilder().setInfo(INFO).setRanges(null).setDimensions(new ImmutableBitSet(0, 3)).setAggrGroupBy(new ImmutableBitSet(1, 3)).setAggrMetrics(new ImmutableBitSet(3, 6)).setAggrMetricsFuncs(new String[] { "SUM", "SUM", "COUNT_DISTINCT" }).setFilterPushDown(null).createGTScanRequest();
+        scanRequest.setAggCacheMemThreshold(0.5);
 
-        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
+        GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest, Long.MAX_VALUE);
 
         int count = 0;
         for (GTRecord record : scanner) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index af39e21..74338af 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -256,7 +256,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
 
     @Test
     public void verifyFirstRow() throws IOException {
-        doScanAndVerify(table, new GTScanRequest(table.getInfo(), null, null, null), "[1421193600000, 30, Yang, 10, 10.5]", //
+        doScanAndVerify(table, new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest(), "[1421193600000, 30, Yang, 10, 10.5]", //
                 "[1421193600000, 30, Luke, 10, 10.5]", //
                 "[1421280000000, 20, Dong, 10, 10.5]", //
                 "[1421280000000, 20, Jason, 10, 10.5]", //
@@ -276,7 +276,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         GTScanRequest sGTScanRequest = GTScanRequest.serializer.deserialize(buffer);
 
         Assert.assertArrayEquals(origin.getAggrMetricsFuncs(), sGTScanRequest.getAggrMetricsFuncs());
-        Assert.assertEquals(origin.getAggrCacheGB(), sGTScanRequest.getAggrCacheGB(), 0.01);
+        Assert.assertEquals(origin.getAggCacheMemThreshold(), sGTScanRequest.getAggCacheMemThreshold(), 0.01);
         return sGTScanRequest;
     }
 
@@ -289,7 +289,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
         LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
 
-        GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest();
 
         // note the unEvaluatable column 1 in filter is added to group by
         assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
@@ -305,7 +305,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
         LogicalTupleFilter filter = and(fComp1, fComp2);
 
-        GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0)).setAggrMetrics(setOf(3)).setAggrMetricsFuncs(new String[]{"sum"}).setFilterPushDown(filter).createGTScanRequest();
         // note the evaluatable column 1 in filter is added to returned columns but not in group by
         assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
 
@@ -334,7 +334,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
     @SuppressWarnings("unused")
     private void testFilterScannerPerfInner(GridTable table, GTInfo info, LogicalTupleFilter filter) throws IOException {
         long start = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, null, filter);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(info).setRanges(null).setDimensions(null).setFilterPushDown(filter).createGTScanRequest();
         IGTScanner scanner = table.scan(req);
         int i = 0;
         for (GTRecord r : scanner) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 2abe928..fd571d0 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -90,7 +90,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
     }
 
     private IGTScanner scan(GridTable table) throws IOException {
-        GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setFilterPushDown(null).createGTScanRequest();
         IGTScanner scanner = table.scan(req);
         for (GTRecord r : scanner) {
             Object[] v = r.getValues();
@@ -106,7 +106,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
     }
 
     private IGTScanner scanAndAggregate(GridTable table) throws IOException {
-        GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
+        GTScanRequest req = new GTScanRequestBuilder().setInfo(table.getInfo()).setRanges(null).setDimensions(null).setAggrGroupBy(setOf(0, 2)).setAggrMetrics(setOf(3, 4)).setAggrMetricsFuncs(new String[]{"count", "sum"}).setFilterPushDown(null).createGTScanRequest();
         IGTScanner scanner = table.scan(req);
         int i = 0;
         for (GTRecord r : scanner) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
index 7d401ec..742cfba 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/ITuple.java
@@ -28,7 +28,7 @@ import org.apache.kylin.metadata.model.TblColRef;
  * 
  * @author yangli9
  */
-public interface ITuple extends IEvaluatableTuple {
+public interface ITuple extends IEvaluatableTuple, Cloneable {
 
     List<String> getAllFields();
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
index 14d717e..54e5786 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/tuple/Tuple.java
@@ -55,6 +55,11 @@ public class Tuple implements ITuple {
     }
 
     @Override
+    public Object clone() {
+        return makeCopy();
+    }
+
+    @Override
     public ITuple makeCopy() {
         Tuple ret = new Tuple(this.info);
         for (int i = 0; i < this.values.length; ++i) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 90a2e43..acb4960 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -104,6 +104,10 @@ public class StorageContext {
         return this.enableLimit;
     }
 
+    public int getStoragePushDownLimit() {
+        return this.isLimitEnabled() ? this.getOffset() + this.getLimit() : Integer.MAX_VALUE;
+    }
+    
     public void markSort() {
         this.hasSort = true;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
index 9ca53f9..83ee6c7 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeSegmentScanner.java
@@ -69,10 +69,10 @@ public class CubeSegmentScanner implements IGTScanner {
         }
         scanRequest = scanRangePlanner.planScanRequest();
         if (scanRequest != null) {
-            scanRequest.setAllowPreAggregation(context.isNeedStorageAggregation());
-            scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
-            if (context.isLimitEnabled())
-                scanRequest.setRowLimit(context.getLimit());
+            scanRequest.setAllowStorageAggregation(context.isNeedStorageAggregation());
+            scanRequest.setAggCacheMemThreshold(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+            scanRequest.setStorageScanRowNumThreshold(context.getThreshold());//TODO: divide by shard number?
+            scanRequest.setStoragePushDownLimit(context.getStoragePushDownLimit());
         }
         scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest, gtStorage);
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 68556d6..0f96e3c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -18,10 +18,11 @@
 
 package org.apache.kylin.storage.gtrecord;
 
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
+import java.util.Map.Entry;
 
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.Dictionary;
@@ -36,9 +37,11 @@ import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
@@ -62,8 +65,10 @@ public class CubeTupleConverter {
 
     final int nSelectedDims;
 
+    final int[] dimensionIndexOnTuple;
+
     public CubeTupleConverter(CubeSegment cubeSeg, Cuboid cuboid, //
-            Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
+                              Set<TblColRef> selectedDimensions, Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo) {
         this.cubeSeg = cubeSeg;
         this.cuboid = cuboid;
         this.tupleInfo = returnTupleInfo;
@@ -83,6 +88,20 @@ public class CubeTupleConverter {
         advMeasureFillers = Lists.newArrayListWithCapacity(1);
         advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
 
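+        // dimensionIndexOnTuple holds the tuple column index of each cuboid dimension present in the tuple (in cuboid column order); it is for SQL with limit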
+        List<Integer> temp = Lists.newArrayList();
+        for (TblColRef dim : cuboid.getColumns()) {
+            if (tupleInfo.hasColumn(dim)) {
+                temp.add(tupleInfo.getColumnIndex(dim));
+            }
+        }
+        dimensionIndexOnTuple = new int[temp.size()];
+        for (int i = 0; i < temp.size(); i++) {
+            dimensionIndexOnTuple[i] = temp.get(i);
+        }
+        
+        ////////////
+
         int iii = 0;
 
         // pre-calculate dimension index mapping to tuple
@@ -90,6 +109,11 @@ public class CubeTupleConverter {
             int i = mapping.getIndexOf(dim);
             gtColIdx[iii] = i;
             tupleIdx[iii] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
+
+            //            if (tupleIdx[iii] == -1) {
+            //                throw new IllegalStateException("dim not used in tuple:" + dim);
+            //            }
+
             iii++;
         }
 
@@ -131,6 +155,44 @@ public class CubeTupleConverter {
         }
     }
 
+    public Comparator<ITuple> getTupleDimensionComparator() {
+        return new Comparator<ITuple>() {
+            @Override
+            public int compare(ITuple o1, ITuple o2) {
+                Preconditions.checkNotNull(o1);
+                Preconditions.checkNotNull(o2);
+                for (int i = 0; i < dimensionIndexOnTuple.length; i++) {
+                    int index = dimensionIndexOnTuple[i];
+
+                    if (index == -1) {
+                        //TODO: 
+                        continue;
+                    }
+
+                    Comparable a = (Comparable) o1.getAllValues()[index];
+                    Comparable b = (Comparable) o2.getAllValues()[index];
+
+                    if (a == null && b == null) {
+                        continue;
+                    } else if (a == null) {
+                        return 1;
+                    } else if (b == null) {
+                        return -1;
+                    } else {
+                        int temp = a.compareTo(b);
+                        if (temp != 0) {
+                            return temp;
+                        } else {
+                            continue;
+                        }
+                    }
+                }
+
+                return 0;
+            }
+        };
+    }
+
     // load only needed dictionaries
     private Map<TblColRef, Dictionary<String>> buildDictionaryMap(List<TblColRef> columnsNeedDictionary) {
         Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
@@ -201,55 +263,55 @@ public class CubeTupleConverter {
             return null;
 
         switch (deriveInfo.type) {
-        case LOOKUP:
-            return new IDerivedColumnFiller() {
-                CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
-                LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
-                int[] derivedColIdx = initDerivedColIdx();
-                Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
-
-                private int[] initDerivedColIdx() {
-                    int[] idx = new int[deriveInfo.columns.length];
-                    for (int i = 0; i < idx.length; i++) {
-                        idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+            case LOOKUP:
+                return new IDerivedColumnFiller() {
+                    CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
+                    LookupStringTable lookupTable = cubeMgr.getLookupTable(cubeSeg, deriveInfo.dimension);
+                    int[] derivedColIdx = initDerivedColIdx();
+                    Array<String> lookupKey = new Array<String>(new String[hostTmpIdx.length]);
+
+                    private int[] initDerivedColIdx() {
+                        int[] idx = new int[deriveInfo.columns.length];
+                        for (int i = 0; i < idx.length; i++) {
+                            idx[i] = deriveInfo.columns[i].getColumnDesc().getZeroBasedIndex();
+                        }
+                        return idx;
                     }
-                    return idx;
-                }
 
-                @Override
-                public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
-                    for (int i = 0; i < hostTmpIdx.length; i++) {
-                        lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
-                    }
+                    @Override
+                    public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+                        for (int i = 0; i < hostTmpIdx.length; i++) {
+                            lookupKey.data[i] = CubeTupleConverter.toString(gtValues[hostTmpIdx[i]]);
+                        }
 
-                    String[] lookupRow = lookupTable.getRow(lookupKey);
+                        String[] lookupRow = lookupTable.getRow(lookupKey);
 
-                    if (lookupRow != null) {
-                        for (int i = 0; i < derivedTupleIdx.length; i++) {
-                            if (derivedTupleIdx[i] >= 0) {
-                                String value = lookupRow[derivedColIdx[i]];
-                                tuple.setDimensionValue(derivedTupleIdx[i], value);
+                        if (lookupRow != null) {
+                            for (int i = 0; i < derivedTupleIdx.length; i++) {
+                                if (derivedTupleIdx[i] >= 0) {
+                                    String value = lookupRow[derivedColIdx[i]];
+                                    tuple.setDimensionValue(derivedTupleIdx[i], value);
+                                }
                             }
-                        }
-                    } else {
-                        for (int i = 0; i < derivedTupleIdx.length; i++) {
-                            if (derivedTupleIdx[i] >= 0) {
-                                tuple.setDimensionValue(derivedTupleIdx[i], null);
+                        } else {
+                            for (int i = 0; i < derivedTupleIdx.length; i++) {
+                                if (derivedTupleIdx[i] >= 0) {
+                                    tuple.setDimensionValue(derivedTupleIdx[i], null);
+                                }
                             }
                         }
                     }
-                }
-            };
-        case PK_FK:
-            return new IDerivedColumnFiller() {
-                @Override
-                public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
-                    // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
-                    tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
-                }
-            };
-        default:
-            throw new IllegalArgumentException();
+                };
+            case PK_FK:
+                return new IDerivedColumnFiller() {
+                    @Override
+                    public void fillDerivedColumns(Object[] gtValues, Tuple tuple) {
+                        // composite keys are split, so only copy [0] is enough, see CubeDesc.initDimensionColumns()
+                        tuple.setDimensionValue(derivedTupleIdx[0], CubeTupleConverter.toString(gtValues[hostTmpIdx[0]]));
+                    }
+                };
+            default:
+                throw new IllegalArgumentException();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
new file mode 100644
index 0000000..cb83819
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/FetchSourceAwareIterator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Iterator;
+
+interface FetchSourceAwareIterator<F> extends IFetchSourceAware<F>, Iterator<F> {
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 7acf186..ae5240b 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -101,13 +101,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
         boolean exactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
         context.setExactAggregation(exactAggregation);
-        context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD, exactAggregation));
 
         // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
         TupleFilter filterD = translateDerived(filter, groupsD);
 
+        context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD, exactAggregation));
+        enableStoragePushDownLimit(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context);
         setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
-        setLimit(filter, context);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
         for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
@@ -227,6 +227,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         return !isExactAggregation;
     }
 
+    //exact aggregation was introduced back when we had some measures (like holistic distinct count) that were sensitive
+    //to post aggregation. Now that we no longer have such measures, isExactAggregation should be useless (at least in v2 storage and above)
     public boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
         boolean exact = true;
 
@@ -372,11 +374,44 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
     }
 
-    private void setLimit(TupleFilter filter, StorageContext context) {
-        boolean goodAggr = context.isExactAggregation();
+    private void enableStoragePushDownLimit(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) {
+        boolean possible = true;
+
         boolean goodFilter = filter == null || (TupleFilter.isEvaluableRecursively(filter) && context.isCoprocessorEnabled());
-        boolean goodSort = context.hasSort() == false;
-        if (goodAggr && goodFilter && goodSort) {
+        if (!goodFilter) {
+            possible = false;
+            logger.info("Storage limit push down is impossible because the filter is unevaluatable");
+        }
+
+        boolean goodSort = !context.hasSort();
+        if (!goodSort) {
+            possible = false;
+            logger.info("Storage limit push down is impossible because the query has order by");
+        }
+
+        // derived aggregation is bad, unless expanded columns are already in group by
+        if (!groups.containsAll(derivedPostAggregation)) {
+            possible = false;
+            logger.info("Storage limit push down is impossible because derived column require post aggregation: " + derivedPostAggregation);
+        }
+
+        //if groupsD is clustered at "head" of the rowkey, then limit push down is possible
+        int size = groupsD.size();
+        if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) {
+            possible = false;
+            logger.info("Storage limit push down is impossible because groupD is not clustered at head, groupsD: " + groupsD //
+                    + " with cuboid columns: " + cuboid.getColumns());
+        }
+
+        //if there are measures like max(cal_dt), then it's not a perfect cuboid match, so the limit cannot be applied
+        for (FunctionDesc functionDesc : functionDescs) {
+            if (functionDesc.isDimensionAsMetric()) {
+                possible = false;
+                logger.info("Storage limit push down is impossible because {} isDimensionAsMetric ", functionDesc);
+            }
+        }
+
+        if (possible) {
             logger.info("Enable limit " + context.getLimit());
             context.enableLimit();
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
new file mode 100644
index 0000000..d51ca4c
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/IFetchSourceAware.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.util.Iterator;
+
+public interface IFetchSourceAware<E> {
+    public Iterator<? extends E> getFetchSource();
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
new file mode 100644
index 0000000..96a232f
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/PeekingImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import java.util.Iterator;
+
+import com.google.common.collect.PeekingIterator;
+
+/**
+ * Copied from Guava, with the access modifier of the wrapped iterator field changed to public.
+ * 
+ * Implementation of PeekingIterator that avoids peeking unless necessary.
+ */
+class PeekingImpl<E> implements PeekingIterator<E> {
+
+    public final Iterator<? extends E> iterator;
+    private boolean hasPeeked;
+    private E peekedElement;
+
+    public PeekingImpl(Iterator<? extends E> iterator) {
+        this.iterator = checkNotNull(iterator);
+    }
+
+    @Override
+    public boolean hasNext() {
+        return hasPeeked || iterator.hasNext();
+    }
+
+    @Override
+    public E next() {
+        if (!hasPeeked) {
+            return iterator.next();
+        }
+        E result = peekedElement;
+        hasPeeked = false;
+        peekedElement = null;
+        return result;
+    }
+
+    @Override
+    public void remove() {
+        checkState(!hasPeeked, "Can't remove after you've peeked at next");
+        iterator.remove();
+    }
+
+    @Override
+    public E peek() {
+        if (!hasPeeked) {
+            peekedElement = iterator.next();
+            hasPeeked = true;
+        }
+        return peekedElement;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/28e94230/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
new file mode 100644
index 0000000..61267ae
--- /dev/null
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/SegmentCubeTupleIterator.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *  
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *  
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.storage.gtrecord;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import org.apache.kylin.cube.cuboid.Cuboid;
+import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureType.IAdvMeasureFiller;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.tuple.ITuple;
+import org.apache.kylin.metadata.tuple.ITupleIterator;
+import org.apache.kylin.metadata.tuple.Tuple;
+import org.apache.kylin.metadata.tuple.TupleInfo;
+import org.apache.kylin.storage.StorageContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+public class SegmentCubeTupleIterator implements ITupleIterator {
+
+    private static final Logger logger = LoggerFactory.getLogger(SegmentCubeTupleIterator.class);
+
+    protected final CubeSegmentScanner scanner;
+    protected final Cuboid cuboid;
+    protected final Set<TblColRef> selectedDimensions;
+    protected final Set<FunctionDesc> selectedMetrics;
+    protected final TupleInfo tupleInfo;
+    protected final Tuple tuple;
+    protected final StorageContext context;
+
+    protected Iterator<GTRecord> gtItr;
+    protected CubeTupleConverter cubeTupleConverter;
+    protected Tuple next;
+
+    private List<IAdvMeasureFiller> advMeasureFillers;
+    private int advMeasureRowsRemaining;
+    private int advMeasureRowIndex;
+
+    public SegmentCubeTupleIterator(CubeSegmentScanner scanner, Cuboid cuboid, Set<TblColRef> selectedDimensions, //
+            Set<FunctionDesc> selectedMetrics, TupleInfo returnTupleInfo, StorageContext context) {
+        this.scanner = scanner;
+        this.cuboid = cuboid;
+        this.selectedDimensions = selectedDimensions;
+        this.selectedMetrics = selectedMetrics;
+        this.tupleInfo = returnTupleInfo;
+        this.tuple = new Tuple(returnTupleInfo);
+        this.context = context;
+        this.gtItr = getGTItr(scanner);
+        this.cubeTupleConverter = new CubeTupleConverter(scanner.cubeSeg, cuboid, selectedDimensions, selectedMetrics, tupleInfo);
+    }
+
+    private Iterator<GTRecord> getGTItr(CubeSegmentScanner scanner) {
+        return scanner.iterator();
+    }
+
+    @Override
+    public boolean hasNext() {
+        if (next != null)
+            return true;
+
+        // consume any left rows from advanced measure filler
+        if (advMeasureRowsRemaining > 0) {
+            for (IAdvMeasureFiller filler : advMeasureFillers) {
+                filler.fillTuple(tuple, advMeasureRowIndex);
+            }
+            advMeasureRowIndex++;
+            advMeasureRowsRemaining--;
+            next = tuple;
+            return true;
+        }
+
+        // now we have a GTRecord
+        if (!gtItr.hasNext()) {
+            return false;
+        }
+        GTRecord curRecord = gtItr.next();
+
+        Preconditions.checkNotNull(cubeTupleConverter);
+
+        // translate into tuple
+        advMeasureFillers = cubeTupleConverter.translateResult(curRecord, tuple);
+
+        // the simple case
+        if (advMeasureFillers == null) {
+            next = tuple;
+            return true;
+        }
+
+        // advanced measure filling, like TopN, will produce multiple tuples out of one record
+        advMeasureRowsRemaining = -1;
+        for (IAdvMeasureFiller filler : advMeasureFillers) {
+            if (advMeasureRowsRemaining < 0)
+                advMeasureRowsRemaining = filler.getNumOfRows();
+            if (advMeasureRowsRemaining != filler.getNumOfRows())
+                throw new IllegalStateException();
+        }
+        if (advMeasureRowsRemaining < 0)
+            throw new IllegalStateException();
+
+        advMeasureRowIndex = 0;
+        return hasNext();
+    }
+
+    @Override
+    public ITuple next() {
+        // fetch next record
+        if (next == null) {
+            hasNext();
+            if (next == null)
+                throw new NoSuchElementException();
+        }
+
+        ITuple result = next;
+        next = null;
+        return result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() {
+        close(scanner);
+    }
+
+    protected void close(CubeSegmentScanner scanner) {
+        try {
+            scanner.close();
+        } catch (IOException e) {
+            logger.error("Exception when close CubeScanner", e);
+        }
+    }
+
+    public CubeTupleConverter getCubeTupleConverter() {
+        return cubeTupleConverter;
+    }
+}