You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ga...@apache.org on 2016/11/14 13:18:39 UTC

kylin git commit: KYLIN-2173 push down limit leads to wrong answer when filter is loosened

Repository: kylin
Updated Branches:
  refs/heads/master 8822e78af -> 13e1e75f8


KYLIN-2173 push down limit leads to wrong answer when filter is loosened


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/13e1e75f
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/13e1e75f
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/13e1e75f

Branch: refs/heads/master
Commit: 13e1e75f86b685fc5c30879f13efbd4a2c71edf0
Parents: 8822e78
Author: gaodayue <ga...@meituan.com>
Authored: Mon Nov 14 21:17:44 2016 +0800
Committer: gaodayue <ga...@meituan.com>
Committed: Mon Nov 14 21:17:44 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/storage/StorageContext.java    |  1 +
 .../gtrecord/GTCubeStorageQueryBase.java        | 97 ++++----------------
 2 files changed, 18 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/13e1e75f/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index cc39918..b338b3c 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -135,6 +135,7 @@ public class StorageContext {
                     tempPushDownLimit, pushDownLimitMax);
         } else {
             this.finalPushDownLimit = tempPushDownLimit;
+            logger.info("Enable limit: " + tempPushDownLimit);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/13e1e75f/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
----------------------------------------------------------------------
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index b51af59..23ebf33 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -109,33 +109,23 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         logger.info("Cuboid identified: cube={}, cuboidId={}, groupsD={}, otherDimsD={}", cubeInstance.getName(), cuboid.getId(), groupsD, otherDimsD);
         context.setCuboid(cuboid);
 
-        // isExactAggregation? meaning: tuples returned from storage requires no further aggregation in query engine
+        // set whether to aggr at storage
         Set<TblColRef> singleValuesD = findSingleValueColumns(filter);
-        boolean exactAggregation = isExactAggregation(cuboid, groups, otherDimsD, singleValuesD, derivedPostAggregation);
-        context.setExactAggregation(exactAggregation);
+        context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD));
 
         // replace derived columns in filter with host columns; columns on loosened condition must be added to group by
-        TupleFilter filterD = translateDerived(filter, groupsD);
+        Set<TblColRef> loosenedColumnD = Sets.newHashSet();
+        TupleFilter filterD = translateDerived(filter, loosenedColumnD);
+        groupsD.addAll(loosenedColumnD);
 
-        //set whether to aggr at storage
-        context.setNeedStorageAggregation(isNeedStorageAggregation(cuboid, groupsD, singleValuesD));
         // set limit push down
-        enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, sqlDigest.aggregations, context);
-        context.setFinalPushDownLimit(cubeInstance);
+        enableStorageLimitIfPossible(cuboid, groups, derivedPostAggregation, groupsD, filter, loosenedColumnD, sqlDigest.aggregations, context);
         // set cautious threshold to prevent out of memory
         setThresholdIfNecessary(dimensionsD, metrics, context);
 
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
         for (CubeSegment cubeSeg : cubeInstance.getSegments(SegmentStatusEnum.READY)) {
             CubeSegmentScanner scanner;
-            if (cubeSeg.getInputRecords() == 0) {
-                if (!skipZeroInputSegment(cubeSeg)) {
-                    logger.warn("cube segment {} input record is 0, " + "it may caused by kylin failed to the job counter " + "as the hadoop history server wasn't running", cubeSeg);
-                } else {
-                    logger.warn("cube segment {} input record is 0, skip it ", cubeSeg);
-                    continue;
-                }
-            }
             try {
                 scanner = new CubeSegmentScanner(cubeSeg, cuboid, dimensionsD, groupsD, metrics, filterD, context, getGTStorage());
             } catch (IllegalArgumentException ex) {
@@ -155,10 +145,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
     }
 
-    protected boolean skipZeroInputSegment(CubeSegment cubeSegment) {
-        return false;
-    }
-
     protected abstract String getGTStorage();
 
     private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
@@ -265,45 +251,6 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
     }
 
-    //exact aggregation was introduced back when we had some measures (like holistic distinct count) that is sensitive
-    //to post aggregation. Now that we don't have such measure any more, isExactAggregation should be useless (at least in v2 storage and above)
-    public boolean isExactAggregation(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> othersD, Set<TblColRef> singleValuesD, Set<TblColRef> derivedPostAggregation) {
-        boolean exact = true;
-
-        if (cuboid.requirePostAggregation()) {
-            exact = false;
-            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
-        }
-
-        // derived aggregation is bad, unless expanded columns are already in group by
-        if (groups.containsAll(derivedPostAggregation) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because derived column require post aggregation: " + derivedPostAggregation);
-        }
-
-        // other columns (from filter) is bad, unless they are ensured to have single value
-        if (singleValuesD.containsAll(othersD) == false) {
-            exact = false;
-            logger.info("exactAggregation is false because some column not on group by: " + othersD //
-                    + " (single value column: " + singleValuesD + ")");
-        }
-
-        // for partitioned cube, the partition column must belong to group by or has single value
-        PartitionDesc partDesc = cuboid.getCubeDesc().getModel().getPartitionDesc();
-        if (partDesc.isPartitioned()) {
-            TblColRef col = partDesc.getPartitionDateColumnRef();
-            if (!groups.contains(col) && !singleValuesD.contains(col)) {
-                exact = false;
-                logger.info("exactAggregation is false because cube is partitioned and " + col + " is not on group by");
-            }
-        }
-
-        if (exact) {
-            logger.info("exactAggregation is true, cuboid id is " + cuboid.getId());
-        }
-        return exact;
-    }
-
     @SuppressWarnings("unchecked")
     private TupleFilter translateDerived(TupleFilter filter, Set<TblColRef> collector) {
         if (filter == null)
@@ -366,26 +313,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             return;
 
         if (filter instanceof ColumnTupleFilter) {
-            collectColumns(((ColumnTupleFilter) filter).getColumn(), collector);
+            collector.add(((ColumnTupleFilter) filter).getColumn());
         }
         for (TupleFilter child : filter.getChildren()) {
             collectColumnsRecursively(child, collector);
         }
     }
 
-    private void collectColumns(TblColRef col, Set<TblColRef> collector) {
-        if (cubeDesc.isExtendedColumn(col)) {
-            throw new CubeDesc.CannotFilterExtendedColumnException(col);
-        }
-        if (cubeDesc.isDerived(col)) {
-            DeriveInfo hostInfo = cubeDesc.getHostInfo(col);
-            for (TblColRef h : hostInfo.columns)
-                collector.add(h);
-        } else {
-            collector.add(col);
-        }
-    }
-
     private void setThresholdIfNecessary(Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics, StorageContext context) {
         boolean hasMemHungryMeasure = false;
         for (FunctionDesc func : metrics) {
@@ -412,17 +346,20 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
     }
 
-    private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Collection<FunctionDesc> functionDescs, StorageContext context) {
+    private void enableStorageLimitIfPossible(Cuboid cuboid, Collection<TblColRef> groups, Set<TblColRef> derivedPostAggregation, Collection<TblColRef> groupsD, TupleFilter filter, Set<TblColRef> loosenedColumnD, Collection<FunctionDesc> functionDescs, StorageContext context) {
         boolean possible = true;
 
-        boolean goodFilter = filter == null || TupleFilter.isEvaluableRecursively(filter);
-        if (!goodFilter) {
+        if (!TupleFilter.isEvaluableRecursively(filter)) {
+            possible = false;
+            logger.info("Storage limit push down is impossible because the filter isn't evaluable");
+        }
+
+        if (!loosenedColumnD.isEmpty()) { // KYLIN-2173
             possible = false;
-            logger.info("Storage limit push down is impossible because the filter is unevaluatable");
+            logger.info("Storage limit push down is impossible because filter is loosened: " + loosenedColumnD);
         }
 
-        boolean goodSort = !context.hasSort();
-        if (!goodSort) {
+        if (context.hasSort()) {
             possible = false;
             logger.info("Storage limit push down is impossible because the query has order by");
         }
@@ -450,8 +387,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
 
         if (possible) {
-            logger.info("Enable limit " + context.getLimit());
             context.enableLimit();
+            context.setFinalPushDownLimit(cubeInstance);
         }
     }