You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ga...@apache.org on 2016/11/14 13:18:39 UTC
kylin git commit: KYLIN-2173 push down limit leads to wrong answer when filter is loosened
Repository: kylin
Updated Branches:
refs/heads/master 8822e78af -> 13e1e75f8
KYLIN-2173 push down limit leads to wrong answer when filter is loosened
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/13e1e75f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/13e1e75f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/13e1e75f
Branch: refs/heads/master
Commit: 13e1e75f86b685fc5c30879f13efbd4a2c71edf0
Parents: 8822e78
Author: gaodayue <ga...@meituan.com>
Authored: Mon Nov 14 21:17:44 2016 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Mon Nov 14 21:17:44 2016 +0800
----------------------------------------------------------------------
.../apache/kylin/storage/StorageContext.java | 1 +
.../gtrecord/GTCubeStorageQueryBase.java | 97 ++++----------------
2 files changed, 18 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/13e1e75f/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index cc39918..b338b3c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -135,6 +135,7 @@ public class StorageContext {
tempPushDownLimit, pushDownLimitMax);
} else {
this.finalPushDownLimit = tempPushDownLimit;
+ logger.info("Enable limit: " + tempPushDownLimit);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/13e1e75f/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index b51af59..23ebf33 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -109,33 +109,23 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
logger.info("Cuboid identified: cube={}, cuboidId={}, groupsD={}, otherDimsD={}", cubeInstance.getName(), cuboid.getId(), groupsD, otherDimsD);
context.setCuboid(cuboid);
- // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
+ // set whether to aggr at storage
Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
- boolean exactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
- context.setExactAggregation(exactAggregation);
+ context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD));
// replace derived columns in filter with host columns; columns on loosened condition must be added to group by
- TupleFilter filterD = translateDerived(filter, groupsD);
+ Set<TblColRef> loosenedColumnD = Sets.newHashSet();
+ TupleFilter filterD = translateDerived(filter, loosenedColumnD);
+ groupsD.addAll(loosenedColumnD);
- //set whether to aggr at storage
- context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD));
// set limit push down
- enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context);
- context.setFinalPushDownLimit(cubeInstance);
+ enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context);
// set cautious threshold to prevent out of memory
setThresholdIfNecessary(dimensionsD, metrics, context);
List<CubeSegmentScanner> scanners = Lists.newArrayList();
for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
CubeSegmentScanner scanner;
- if (cubeSeg.getInputRecords() == 0) {
- if (!skipZeroInputSegment(cubeSeg)) {
- logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg);
- } else {
- logger.warn("cube segment {} input record is 0, skip it ", cubeSeg);
- continue;
- }
- }
try {
scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context, getGTStorage());
} catch (IllegalArgumentException ex) {
@@ -155,10 +145,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
}
- protected boolean skipZeroInputSegment(CubeSegment cubeSegment) {
- return false;
- }
-
protected abstract String getGTStorage();
private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
@@ -265,45 +251,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
- //exact aggregation was introduced back when we had some measures (like holistic distinct count) that is sensitive
- //to post aggregation. Now that we don't have such measure any more, isExactAggregation should be useless (at least in v2 storage and above)
- public boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
- boolean exact = true;
-
- if (cuboid.requirePostAggregation()) {
- exact = false;
- logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
- }
-
- // derived aggregation is bad, unless expanded columns are already in group by
- if (groups.containsAll(derivedPostAggregation) == false) {
- exact = false;
- logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
- }
-
- // other columns (from filter) is bad, unless they are ensured to have single value
- if (singleValuesD.containsAll(othersD) == false) {
- exact = false;
- logger.info("exactAggregation is false because some column not on group by: " + othersD //
- + " (single value column: " + singleValuesD + ")");
- }
-
- // for partitioned cube, the partition column must belong to group by or has single value
- PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc();
- if (partDesc.isPartitioned()) {
- TblColRef col = partDesc.getPartitionDateColumnRef();
- if (!groups.contains(col) && !singleValuesD.contains(col)) {
- exact = false;
- logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by");
- }
- }
-
- if (exact) {
- logger.info("exactAggregation is true, cuboid id is " + cuboid.getId());
- }
- return exact;
- }
-
@SuppressWarnings("unchecked")
private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
if (filter == null)
@@ -366,26 +313,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
return;
if (filter instanceof ColumnTupleFilter) {
- collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
+ collector.add(((ColumnTupleFilter) filter).getColumn());
}
for (TupleFilter child : filter.getChildren()) {
collectColumnsRecursively(child, collector);
}
}
- private void collectColumns(TblColRef col, Set<TblColRef> collector) {
- if (cubeDesc.isExtendedColumn(col)) {
- throw new CubeDesc.CannotFilterExtendedColumnException(col);
- }
- if (cubeDesc.isDerived(col)) {
- DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
- for (TblColRef h : hostInfo.columns)
- collector.add(h);
- } else {
- collector.add(col);
- }
- }
-
private void setThresholdIfNecessary(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
boolean hasMemHungryMeasure = false;
for (FunctionDesc func : metrics) {
@@ -412,17 +346,20 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
- private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) {
+ private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Set<TblColRef> loosenedColumnD, Collection<FunctionDesc> functionDescs, StorageContext context) {
boolean possible = true;
- boolean goodFilter = filter == null || TupleFilter.isEvaluableRecursively(filter);
- if (!goodFilter) {
+ if (!TupleFilter.isEvaluableRecursively(filter)) {
+ possible = false;
+ logger.info("Storage limit push down is impossible because the filter isn't evaluable");
+ }
+
+ if (!loosenedColumnD.isEmpty()) { // KYLIN-2173
possible = false;
- logger.info("Storage limit push down is impossible because the filter is unevaluatable");
+ logger.info("Storage limit push down is impossible because filter is loosened: " + loosenedColumnD);
}
- boolean goodSort = !context.hasSort();
- if (!goodSort) {
+ if (context.hasSort()) {
possible = false;
logger.info("Storage limit push down is impossible because the query has order by");
}
@@ -450,8 +387,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
if (possible) {
- logger.info("Enable limit " + context.getLimit());
context.enableLimit();
+ context.setFinalPushDownLimit(cubeInstance);
}
}