You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this text version.
Posted to commits@kylin.apache.org by sh...@apache.org on 2016/06/24 07:05:11 UTC
[33/50] kylin git commit: KYLIN-1787 Enable limit and threshold for v2 storage
KYLIN-1787 Enable limit and threshold for v2 storage
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/48e16492
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/48e16492
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/48e16492
Branch: refs/heads/stream_m1
Commit: 48e164924944a6f945a9d1e20828967146201695
Parents: 97bfd57
Author: Li Yang <li...@apache.org>
Authored: Wed Jun 22 16:57:38 2016 +0800
Committer: Li Yang <li...@apache.org>
Committed: Wed Jun 22 16:58:07 2016 +0800
----------------------------------------------------------------------
.../cube/inmemcubing/InMemCubeBuilder.java | 4 +-
.../kylin/gridtable/GTAggregateScanner.java | 3 --
.../kylin/gridtable/GTScanRangePlanner.java | 6 +--
.../apache/kylin/gridtable/GTScanRequest.java | 41 ++++++++++++++------
.../gridtable/benchmark/GTScannerBenchmark.java | 2 +-
.../benchmark/GTScannerBenchmark2.java | 2 +-
.../gridtable/AggregationCacheSpillTest.java | 6 ++-
.../kylin/gridtable/DictGridTableTest.java | 4 +-
.../kylin/gridtable/SimpleGridTableTest.java | 2 +-
.../hbase/cube/v2/CubeSegmentScanner.java | 11 +++++-
.../storage/hbase/cube/v2/CubeStorageQuery.java | 25 +++++++-----
.../cube/v2/SequentialCubeTupleIterator.java | 16 ++++++++
.../coprocessor/endpoint/CubeVisitService.java | 4 ++
13 files changed, 88 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index c270d3f..fe83ccf 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -329,7 +329,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
IGTScanner baseInput = new InputConverter(baseCuboid.getInfo(), input);
Pair<ImmutableBitSet, ImmutableBitSet> dimensionMetricsBitSet = InMemCubeBuilderUtils.getDimensionAndMetricColumnBitSet(baseCuboidId, measureCount);
- GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null, true, 0);
+ GTScanRequest req = new GTScanRequest(baseCuboid.getInfo(), null, null, dimensionMetricsBitSet.getFirst(), dimensionMetricsBitSet.getSecond(), metricsAggrFuncs, null);
GTAggregateScanner aggregationScanner = new GTAggregateScanner(baseInput, req);
aggregationScanner.trackMemoryLevel(baseCuboidMemTracker);
@@ -397,7 +397,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
private GTAggregateScanner prepareGTAggregationScanner(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
GTInfo info = gridTable.getInfo();
- GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null, true, 0);
+ GTScanRequest req = new GTScanRequest(info, null, null, aggregationColumns, measureColumns, metricsAggrFuncs, null);
GTAggregateScanner scanner = (GTAggregateScanner) gridTable.scan(req);
// for child cuboid, some measures don't need aggregation.
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index d748297..53cc387 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -25,8 +25,6 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Comparator;
@@ -38,7 +36,6 @@ import java.util.SortedMap;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
import org.apache.kylin.common.util.ImmutableBitSet;
import org.apache.kylin.common.util.MemoryBudgetController;
import org.apache.kylin.common.util.MemoryBudgetController.MemoryWaterLevel;
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
index 4f641e9..7173815 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRangePlanner.java
@@ -162,11 +162,11 @@ public class GTScanRangePlanner {
}
- public GTScanRequest planScanRequest(boolean allowPreAggregate) {
+ public GTScanRequest planScanRequest() {
GTScanRequest scanRequest;
List<GTScanRange> scanRanges = this.planScanRanges();
if (scanRanges != null && scanRanges.size() != 0) {
- scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter, allowPreAggregate, cubeSegment.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+ scanRequest = new GTScanRequest(gtInfo, scanRanges, gtDimensions, gtAggrGroups, gtAggrMetrics, gtAggrFuncs, gtFilter);
} else {
scanRequest = null;
}
@@ -177,7 +177,7 @@ public class GTScanRangePlanner {
* Overwrite this method to provide smarter storage visit plans
* @return
*/
- public List<GTScanRange> planScanRanges() {
+ List<GTScanRange> planScanRanges() {
TupleFilter flatFilter = flattenToOrAndFilter(gtFilter);
List<Collection<ColumnRange>> orAndDimRanges = translateToOrAndDimRanges(flatFilter);
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
index 55d84e6..593469a 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTScanRequest.java
@@ -52,21 +52,18 @@ public class GTScanRequest {
// hint to storage behavior
private boolean allowPreAggregation = true;
- private double aggrCacheGB = 0; // no limit
+ private double aggrCacheGB = 0; // 0 means no row/memory limit; positive means memory limit in GB; negative means row limit
public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet columns, TupleFilter filterPushDown) {
- this.info = info;
- if (ranges == null) {
- this.ranges = Lists.newArrayList(new GTScanRange(new GTRecord(info), new GTRecord(info)));
- } else {
- this.ranges = ranges;
- }
- this.columns = columns;
- this.filterPushDown = filterPushDown;
- validate(info);
+ this(info, ranges, columns, null, null, null, filterPushDown, true, 0);
}
public GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
+ ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown) {
+ this(info, ranges, dimensions, aggrGroupBy, aggrMetrics, aggrMetricsFuncs, filterPushDown, true, 0);
+ }
+
+ private GTScanRequest(GTInfo info, List<GTScanRange> ranges, ImmutableBitSet dimensions, ImmutableBitSet aggrGroupBy, //
ImmutableBitSet aggrMetrics, String[] aggrMetricsFuncs, TupleFilter filterPushDown, boolean allowPreAggregation, double aggrCacheGB) {
this.info = info;
if (ranges == null) {
@@ -238,14 +235,36 @@ public class GTScanRequest {
return aggrMetricsFuncs;
}
+ public boolean isAllowPreAggregation() {
+ return allowPreAggregation;
+ }
+
+ public void setAllowPreAggregation(boolean allowPreAggregation) {
+ this.allowPreAggregation = allowPreAggregation;
+ }
+
public double getAggrCacheGB() {
- return aggrCacheGB;
+ if (aggrCacheGB < 0)
+ return 0;
+ else
+ return aggrCacheGB;
}
public void setAggrCacheGB(double gb) {
this.aggrCacheGB = gb;
}
+ public int getRowLimit() {
+ if (aggrCacheGB < 0)
+ return (int) -aggrCacheGB;
+ else
+ return 0;
+ }
+
+ public void setRowLimit(int limit) {
+ aggrCacheGB = -limit;
+ }
+
@Override
public String toString() {
return "GTScanRequest [range=" + ranges + ", columns=" + columns + ", filterPushDown=" + filterPushDown + ", aggrGroupBy=" + aggrGroupBy + ", aggrMetrics=" + aggrMetrics + ", aggrMetricsFuncs=" + Arrays.toString(aggrMetricsFuncs) + "]";
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
index 19c4bea..0d96ac0 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark.java
@@ -110,7 +110,7 @@ public class GTScannerBenchmark {
@SuppressWarnings("unused")
private void testAggregate(ImmutableBitSet groupBy) throws IOException {
long t = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null, true, 10);
+ GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
IGTScanner scanner = req.decorateScanner(gen.generate(N));
long count = 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
index e1e3881..8a376ba 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/benchmark/GTScannerBenchmark2.java
@@ -132,7 +132,7 @@ public class GTScannerBenchmark2 {
@SuppressWarnings("unused")
private void testAggregate(ImmutableBitSet groupBy) throws IOException {
long t = System.currentTimeMillis();
- GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null, true, 10);
+ GTScanRequest req = new GTScanRequest(info, null, dimensions, groupBy, metrics, aggrFuncs, null);
IGTScanner scanner = req.decorateScanner(gen.generate(N));
long count = 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
index ff67c9c..fd891f5 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/AggregationCacheSpillTest.java
@@ -83,7 +83,8 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
}
};
- GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true, 0.5);
+ GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(0, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
+ scanRequest.setAggrCacheGB(0.5);
GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
@@ -125,7 +126,8 @@ public class AggregationCacheSpillTest extends LocalFileMetadataTestCase {
};
// all-in-mem testcase
- GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null, true, 0.5);
+ GTScanRequest scanRequest = new GTScanRequest(INFO, null, new ImmutableBitSet(0, 3), new ImmutableBitSet(1, 3), new ImmutableBitSet(3, 6), new String[] { "SUM", "SUM", "COUNT_DISTINCT" }, null);
+ scanRequest.setAggrCacheGB(0.5);
GTAggregateScanner scanner = new GTAggregateScanner(inputScanner, scanRequest);
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
index a328216..af39e21 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/DictGridTableTest.java
@@ -289,7 +289,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
LogicalTupleFilter fNotPlusUnevaluatable = not(unevaluatable(info.colRef(1)));
LogicalTupleFilter filter = and(fComp, fUnevaluatable, fNotPlusUnevaluatable);
- GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0);
+ GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
// note the unEvaluatable column 1 in filter is added to group by
assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], [null], [null]], aggrGroupBy={0, 1}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
@@ -305,7 +305,7 @@ public class DictGridTableTest extends LocalFileMetadataTestCase {
CompareTupleFilter fComp2 = compare(info.colRef(1), FilterOperatorEnum.GT, enc(info, 1, "10"));
LogicalTupleFilter filter = and(fComp1, fComp2);
- GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter, true, 0);
+ GTScanRequest req = new GTScanRequest(info, null, null, setOf(0), setOf(3), new String[] { "sum" }, filter);
// note the evaluatable column 1 in filter is added to returned columns but not in group by
assertEquals("GTScanRequest [range=[[null, null]-[null, null]], columns={0, 1, 3}, filterPushDown=AND [NULL.GT_MOCKUP_TABLE.0 GT [\\x00\\x00\\x01J\\xE5\\xBD\\x5C\\x00], NULL.GT_MOCKUP_TABLE.1 GT [\\x00]], aggrGroupBy={0}, aggrMetrics={3}, aggrMetricsFuncs=[sum]]", req.toString());
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
index 1b8529f..d300787 100644
--- a/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/gridtable/SimpleGridTableTest.java
@@ -104,7 +104,7 @@ public class SimpleGridTableTest extends LocalFileMetadataTestCase {
}
private IGTScanner scanAndAggregate(GridTable table) throws IOException {
- GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null, true, 0);
+ GTScanRequest req = new GTScanRequest(table.getInfo(), null, null, setOf(0, 2), setOf(3, 4), new String[] { "count", "sum" }, null);
IGTScanner scanner = table.scan(req);
int i = 0;
for (GTRecord r : scanner) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
index 2b55ace..9890ae9 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeSegmentScanner.java
@@ -37,6 +37,7 @@ import org.apache.kylin.metadata.filter.ITupleFilterTransformer;
import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.storage.StorageContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,7 +52,7 @@ public class CubeSegmentScanner implements IGTScanner {
final GTScanRequest scanRequest;
public CubeSegmentScanner(CubeSegment cubeSeg, Cuboid cuboid, Set<TblColRef> dimensions, Set<TblColRef> groups, //
- Collection<FunctionDesc> metrics, TupleFilter filter, boolean allowPreAggregate) {
+ Collection<FunctionDesc> metrics, TupleFilter filter, StorageContext context) {
this.cuboid = cuboid;
this.cubeSeg = cubeSeg;
@@ -66,7 +67,13 @@ public class CubeSegmentScanner implements IGTScanner {
} catch (Exception e) {
throw new RuntimeException(e);
}
- scanRequest = scanRangePlanner.planScanRequest(allowPreAggregate);
+ scanRequest = scanRangePlanner.planScanRequest();
+ if (scanRequest != null) {
+ scanRequest.setAllowPreAggregation(!context.isExactAggregation());
+ scanRequest.setAggrCacheGB(cubeSeg.getCubeInstance().getConfig().getQueryCoprocessorMemGB());
+ if (context.isLimitEnabled())
+ scanRequest.setRowLimit(context.getLimit());
+ }
scanner = new ScannerWorker(cubeSeg, cuboid, scanRequest);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index c8a5412..cec4e2f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -40,6 +40,7 @@ import org.apache.kylin.metadata.filter.TupleFilter;
import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
import org.apache.kylin.metadata.model.FunctionDesc;
import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.metadata.realization.SQLDigest;
@@ -93,7 +94,7 @@ public class CubeStorageQuery implements IStorageQuery {
Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
dimensionsD.addAll(groupsD);
dimensionsD.addAll(otherDimsD);
- Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc,dimensionsD, metrics);
+ Cuboid cuboid = Cuboid.identifyCuboid(cubeDesc, dimensionsD, metrics);
context.setCuboid(cuboid);
// isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
@@ -104,20 +105,16 @@ public class CubeStorageQuery implements IStorageQuery {
// replace derived columns in filter with host columns; columns on loosened condition must be added to group by
TupleFilter filterD = translateDerived(filter, groupsD);
- //actually even if the threshold is set, it will not be used in this query engine
setThreshold(dimensionsD, metrics, context); // set cautious threshold to prevent out of memory
-
setLimit(filter, context);
List<CubeSegmentScanner> scanners = Lists.newArrayList();
for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
CubeSegmentScanner scanner;
if (cubeSeg.getInputRecords() == 0) {
- logger.warn("cube segment {} input record is 0, " +
- "it may caused by kylin failed to the job counter " +
- "as the hadoop history server wasn't running", cubeSeg);
+ logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg);
}
- scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, !isExactAggregation);
+ scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context);
scanners.add(scanner);
}
@@ -170,8 +167,6 @@ public class CubeStorageQuery implements IStorageQuery {
return expanded;
}
-
-
@SuppressWarnings("unchecked")
private Set<TblColRef> findSingleValueColumns(TupleFilter filter) {
Collection<? extends TupleFilter> toCheck;
@@ -236,6 +231,16 @@ public class CubeStorageQuery implements IStorageQuery {
+ " (single value column: " + singleValuesD + ")");
}
+ // for partitioned cube, the partition column must belong to group by or has single value
+ PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc();
+ if (partDesc.isPartitioned()) {
+ TblColRef col = partDesc.getPartitionDateColumnRef();
+ if (!groups.contains(col) && !singleValuesD.contains(col)) {
+ exact = false;
+ logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by");
+ }
+ }
+
if (exact) {
logger.info("exactAggregation is true");
}
@@ -343,7 +348,7 @@ public class CubeStorageQuery implements IStorageQuery {
long rowEst = this.cubeInstance.getConfig().getQueryMemBudget() / rowSizeEst;
if (rowEst > 0) {
- logger.info("Memory budget is set to: " + rowEst);
+ logger.info("Memory budget is set to " + rowEst + " rows");
context.setThreshold((int) rowEst);
} else {
logger.info("Memory budget is not set.");
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
index bbc7370..f8b055c 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/SequentialCubeTupleIterator.java
@@ -34,6 +34,7 @@ import org.apache.kylin.metadata.tuple.ITupleIterator;
import org.apache.kylin.metadata.tuple.Tuple;
import org.apache.kylin.metadata.tuple.TupleInfo;
import org.apache.kylin.storage.StorageContext;
+import org.apache.kylin.storage.exception.ScanOutOfLimitException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,6 +78,9 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
if (next != null)
return true;
+ if (hitLimitAndThreshold())
+ return false;
+
// consume any left rows from advanced measure filler
if (advMeasureRowsRemaining > 0) {
for (IAdvMeasureFiller filler : advMeasureFillers) {
@@ -137,6 +141,18 @@ public class SequentialCubeTupleIterator implements ITupleIterator {
}
+ private boolean hitLimitAndThreshold() {
+ // check limit
+ if (context.isLimitEnabled() && scanCount >= context.getLimit() + context.getOffset()) {
+ return true;
+ }
+ // check threshold
+ if (scanCount >= context.getThreshold()) {
+ throw new ScanOutOfLimitException("Scan row count exceeded threshold: " + context.getThreshold() + ", please add filter condition to narrow down backend scan range, like where clause.");
+ }
+ return false;
+ }
+
@Override
public ITuple next() {
// fetch next record
http://git-wip-us.apache.org/repos/asf/kylin/blob/48e16492/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
index 3827aa9..d320dc5 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/coprocessor/endpoint/CubeVisitService.java
@@ -243,6 +243,7 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
final MutableBoolean scanNormalComplete = new MutableBoolean(true);
final long startTime = this.serviceStartTime;
final long timeout = request.getTimeout();
+ final int rowLimit = scanReq.getRowLimit();
final CellListIterator cellListIterator = new CellListIterator() {
@@ -257,6 +258,9 @@ public class CubeVisitService extends CubeVisitProtos.CubeVisitService implement
@Override
public boolean hasNext() {
+ if (rowLimit > 0 && rowLimit <= counter)
+ return false;
+
if (counter % 1000 == 1) {
if (System.currentTimeMillis() - startTime > timeout) {
scanNormalComplete.setValue(false);