You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/06/24 07:05:11 UTC

[33/50] kylin git commit: KYLIN-1787 Enable limit and threshold for v2 storage

KYLIN-1787 Enable limit and threshold for v2 storage


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/48e16492
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/48e16492
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/48e16492

Branch: refs/heads/stream_m1
Commit: 48e164924944a6f945a9d1e20828967146201695
Parents: 97bfd57
Author: Li Yang <li...@apache.org>
Authored: Wed Jun 22 16:57:38 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Wed Jun 22 16:58:07 2016 +0800

----------------------------------------------------------------------
 .../cube/inmemcubing/InMemCubeBuilder.java      |  4 +-
 .../kylin/gridtable/GTAggregateScanner.java     |  3 --
 .../kylin/gridtable/GTScanRangePlanner.java     |  6 +--
 .../apache/kylin/gridtable/GTScanRequest.java   | 41 ++++++++++++++------
 .../gridtable/benchmark/GTScannerBenchmark.java |  2 +-
 .../benchmark/GTScannerBenchmark2.java          |  2 +-
 .../gridtable/AggregationCacheSpillTest.java    |  6 ++-
 .../kylin/gridtable/DictGridTableTest.java      |  4 +-
 .../kylin/gridtable/SimpleGridTableTest.java    |  2 +-
 .../hbase/cube/v2/CubeSegmentScanner.java       | 11 +++++-
 .../storage/hbase/cube/v2/CubeStorageQuery.java | 25 +++++++-----
 .../cube/v2/SequentialCubeTupleIterator.java    | 16 ++++++++
 .../coprocessor/endpoint/CubeVisitService.java  |  4 ++
 13 files changed, 88 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index c270d3f..fe83ccf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -329,7 +329,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
 
         Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
-        GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null, true, 0);
+        GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
         GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
         aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
 
@@ -397,7 +397,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
         GTInfo info = gridTable.getInfo();
-        GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null, true, 0);
+        GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
         GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
 
         // for child cuboid, some measures don't need aggregation.

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index d748297..53cc387 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -25,8 +25,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Comparator;
@@ -38,7 +36,6 @@ import java.util.SortedMap;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index 4f641e9..7173815 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -162,11 +162,11 @@ public class GTScanRangePlanner {
     }
 
 
-    public GTScanRequest planScanRequest(boolean allowPreAggregate) {
+    public GTScanRequest planScanRequest() {
         GTScanRequest scanRequest;
         List<GTScanRange> scanRanges = this.planScanRanges();
         if (scanRanges != null && scanRanges.size() != 0) {
-            scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+            scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter);
         } else {
             scanRequest = null;
         }
@@ -177,7 +177,7 @@ public class GTScanRangePlanner {
      * Overwrite this method to provide smarter storage visit plans
      * @return
      */
-    public List<GTScanRange> planScanRanges() {
+    List<GTScanRange> planScanRanges() {
         TupleFilter flatFilter = flattenToOrAndFilter(gtFilter);
 
         List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter);

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 55d84e6..593469a 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -52,21 +52,18 @@ public class GTScanRequest {
 
     // hint to storage behavior
     private boolean allowPreAggregation = true;
-    private double aggrCacheGB = 0; // no limit
+    private double aggrCacheGB = 0; // 0 means no row/memory limit; positive means memory limit in GB; negative means row limit
 
     public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet columns, TupleFilter filterPushDown) {
-        this.info = info;
-        if (ranges == null) {
-            this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
-        } else {
-            this.ranges = ranges;
-        }
-        this.columns = columns;
-        this.filterPushDown = filterPushDown;
-        validate(info);
+        this(info, ranges, columns, null, null, null, filterPushDown, true, 0);
     }
 
     public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
+            ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
+        this(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, true, 0);
+    }
+
+    private GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
             ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) {
         this.info = info;
         if (ranges == null) {
@@ -238,14 +235,36 @@ public class GTScanRequest {
         return aggrMetricsFuncs;
     }
 
+    public boolean isAllowPreAggregation() { // hint to storage: whether pre-aggregation is allowed
+        return allowPreAggregation;
+    }
+
+    public void setAllowPreAggregation(boolean allowPreAggregation) { // replaces the value given at construction (public constructors pass true)
+        this.allowPreAggregation = allowPreAggregation;
+    }
+
     public double getAggrCacheGB() {
-        return aggrCacheGB;
+        if (aggrCacheGB < 0) // a negative value encodes a row limit (see getRowLimit), so report 0 = no memory limit
+            return 0;
+        else
+            return aggrCacheGB;
     }
 
     public void setAggrCacheGB(double gb) {
         this.aggrCacheGB = gb;
     }
 
+    public int getRowLimit() { // returns 0 when no row limit is set
+        if (aggrCacheGB < 0) // the row limit is stored negated in aggrCacheGB
+            return (int) -aggrCacheGB;
+        else
+            return 0;
+    }
+
+    public void setRowLimit(int limit) { // stored negated in aggrCacheGB, so this replaces any memory budget set via setAggrCacheGB()
+        aggrCacheGB = -Math.max(0, limit); // clamp: a negative limit must not turn into a positive memory budget in GB
+    }
+
     @Override
     public String toString() {
         return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
index 19c4bea..0d96ac0 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
@@ -110,7 +110,7 @@ public class GTScannerBenchmark {
     @SuppressWarnings("unused")
     private void testAggregate(ImmutableBitSet groupBy) throws IOException {
         long t = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null, true, 10);
+        GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
         IGTScanner scanner = req.decorateScanner(gen.generate(N));
 
         long count = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
index e1e3881..8a376ba 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
@@ -132,7 +132,7 @@ public class GTScannerBenchmark2 {
     @SuppressWarnings("unused")
     private void testAggregate(ImmutableBitSet groupBy) throws IOException {
         long t = System.currentTimeMillis();
-        GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null, true, 10);
+        GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
         IGTScanner scanner = req.decorateScanner(gen.generate(N));
 
         long count = 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
index ff67c9c..fd891f5 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -83,7 +83,8 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
             }
         };
 
-        GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true, 0.5);
+        GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
+        scanRequest.setAggrCacheGB(0.5);
 
         GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
 
@@ -125,7 +126,8 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
         };
 
         // all-in-mem testcase
-        GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true, 0.5);
+        GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
+        scanRequest.setAggrCacheGB(0.5);
 
         GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index a328216..af39e21 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -289,7 +289,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
         LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
 
-        GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0);
+        GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
 
         // note the unEvaluatable column 1 in filter is added to group by
         assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
@@ -305,7 +305,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
         CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
         LogicalTupleFilter filter = and(fComp1, fComp2);
 
-        GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0);
+        GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
         // note the evaluatable column 1 in filter is added to returned columns but not in group by
         assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 1b8529f..d300787 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -104,7 +104,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
     }
 
     private IGTScanner scanAndAggregate(GridTable table) throws IOException {
-        GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null, true, 0);
+        GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
         IGTScanner scanner = table.scan(req);
         int i = 0;
         for (GTRecord r : scanner) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index 2b55ace..9890ae9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -37,6 +37,7 @@ import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,7 +52,7 @@ public class CubeSegmentScanner implements IGTScanner {
     final GTScanRequest scanRequest;
 
     public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
-            Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) {
+            Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context) {
         this.cuboid = cuboid;
         this.cubeSeg = cubeSeg;
 
@@ -66,7 +67,13 @@ public class CubeSegmentScanner implements IGTScanner {
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
-        scanRequest = scanRangePlanner.planScanRequest(allowPreAggregate);
+        scanRequest = scanRangePlanner.planScanRequest();
+        if (scanRequest != null) {
+            scanRequest.setAllowPreAggregation(!context.isExactAggregation());
+            scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+            if (context.isLimitEnabled())
+                scanRequest.setRowLimit(context.getLimit());
+        }
         scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index c8a5412..cec4e2f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -40,6 +40,7 @@ import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.realization.SQLDigest;
@@ -93,7 +94,7 @@ public class CubeStorageQuery implements IStorageQuery {
         Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
         dimensionsD.addAll(groupsD);
         dimensionsD.addAll(otherDimsD);
-        Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc,dimensionsD, metrics);
+        Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics);
         context.setCuboid(cuboid);
 
         // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
@@ -104,20 +105,16 @@ public class CubeStorageQuery implements IStorageQuery {
         // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
         TupleFilter filterD = translateDerived(filter, groupsD);
 
-        //actually even if the threshold is set, it will not be used in this query engine
         setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
-
         setLimit(filter, context);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
         for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
             CubeSegmentScanner scanner;
             if (cubeSeg.getInputRecords() == 0) {
-                logger.warn("cube segment {} input record is 0, " +
-                        "it may caused by kylin failed to the job counter " +
-                        "as the hadoop history server wasn't running", cubeSeg);
+                logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg);
             }
-            scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation);
+            scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context);
             scanners.add(scanner);
         }
 
@@ -170,8 +167,6 @@ public class CubeStorageQuery implements IStorageQuery {
         return expanded;
     }
 
-   
-
     @SuppressWarnings("unchecked")
     private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
         Collection<? extends TupleFilter> toCheck;
@@ -236,6 +231,16 @@ public class CubeStorageQuery implements IStorageQuery {
                     + " (single value column: " + singleValuesD + ")");
         }
 
+        // for partitioned cube, the partition column must belong to group by or has single value
+        PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc();
+        if (partDesc.isPartitioned()) {
+            TblColRef col = partDesc.getPartitionDateColumnRef();
+            if (!groups.contains(col) && !singleValuesD.contains(col)) {
+                exact = false;
+                logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by");
+            }
+        }
+
         if (exact) {
             logger.info("exactAggregation is true");
         }
@@ -343,7 +348,7 @@ public class CubeStorageQuery implements IStorageQuery {
 
         long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst;
         if (rowEst > 0) {
-            logger.info("Memory budget is set to: " + rowEst);
+            logger.info("Memory budget is set to " + rowEst + " rows");
             context.setThreshold((int) rowEst);
         } else {
             logger.info("Memory budget is not set.");

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
index bbc7370..f8b055c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
@@ -34,6 +34,7 @@ import org.apache.kylin.metadata.tuple.ITupleIterator;
 import org.apache.kylin.metadata.tuple.Tuple;
 import org.apache.kylin.metadata.tuple.TupleInfo;
 import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.exception.ScanOutOfLimitException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -77,6 +78,9 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
         if (next != null)
             return true;
         
+        if (hitLimitAndThreshold())
+            return false;
+        
         // consume any left rows from advanced measure filler
         if (advMeasureRowsRemaining > 0) {
             for (IAdvMeasureFiller filler : advMeasureFillers) {
@@ -137,6 +141,18 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
     }
     
 
+    private boolean hitLimitAndThreshold() { // true = stop iterating (limit reached); throws once the scan threshold is reached
+        // check limit; sum in long so that limit + offset cannot wrap around int and end the scan immediately
+        if (context.isLimitEnabled() && scanCount >= (long) context.getLimit() + context.getOffset()) {
+            return true;
+        }
+        // check threshold
+        if (scanCount >= context.getThreshold()) {
+            throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause.");
+        }
+        return false;
+    }
+
     @Override
     public ITuple next() {
         // fetch next record

http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 3827aa9..d320dc5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -243,6 +243,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
             final MutableBoolean scanNormalComplete = new MutableBoolean(true);
             final long startTime = this.serviceStartTime;
             final long timeout = request.getTimeout();
+            final int rowLimit = scanReq.getRowLimit();
 
             final CellListIterator cellListIterator = new CellListIterator() {
 
@@ -257,6 +258,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
 
                 @Override
                 public boolean hasNext() {
+                    if (rowLimit > 0 && rowLimit <= counter)
+                        return false;
+                    
                     if (counter % 1000 == 1) {
                         if (System.currentTimeMillis() - startTime > timeout) {
                             scanNormalComplete.setValue(false);