You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2018/11/26 16:04:07 UTC

[drill] 13/15: DRILL-6865: Query returns wrong result when filter pruning happens

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit d1a082cd11c79497449fda06189cd00d3510b2e9
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Wed Nov 21 14:08:17 2018 +0200

    DRILL-6865: Query returns wrong result when filter pruning happens
---
 .../store/parquet/AbstractParquetGroupScan.java    | 76 +++++++++++++++-------
 .../exec/store/parquet/ParquetFilterBuilder.java   | 21 ++++--
 .../exec/store/parquet/ParquetPushDownFilter.java  | 41 ++++++++++--
 .../store/parquet/ParquetRGFilterEvaluator.java    |  4 +-
 .../store/parquet/TestParquetFilterPushDown.java   |  5 ++
 5 files changed, 112 insertions(+), 35 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 0d35ddb..1bbf63b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -247,7 +247,11 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
 
     final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
 
-    ParquetFilterPredicate filterPredicate = null;
+    ParquetFilterPredicate filterPredicate = getParquetFilterPredicate(filterExpr, udfUtilities, functionImplementationRegistry, optionManager, true);
+
+    if (filterPredicate == null) {
+      return null;
+    }
 
     for (RowGroupInfo rowGroup : rowGroupInfos) {
       final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
@@ -261,27 +265,8 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
 
       Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
 
-      if (filterPredicate == null) {
-        ErrorCollector errorCollector = new ErrorCollectorImpl();
-        LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
-            filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
-
-        if (errorCollector.hasErrors()) {
-          logger.error("{} error(s) encountered when materialize filter expression : {}",
-              errorCollector.getErrorCount(), errorCollector.toErrorString());
-          return null;
-        }
-        logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
-
-        Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
-        filterPredicate = ParquetFilterBuilder.buildParquetFilterPredicate(materializedFilter, constantBoundaries, udfUtilities);
-
-        if (filterPredicate == null) {
-          return null;
-        }
-      }
-
-      ParquetFilterPredicate.RowsMatch match = ParquetRGFilterEvaluator.matches(filterPredicate, columnStatisticsMap, rowGroup.getRowCount(), parquetTableMetadata, rowGroup.getColumns(), schemaPathsInExpr);
+      ParquetFilterPredicate.RowsMatch match = ParquetRGFilterEvaluator.matches(filterPredicate,
+          columnStatisticsMap, rowGroup.getRowCount(), parquetTableMetadata, rowGroup.getColumns(), schemaPathsInExpr);
       if (match == ParquetFilterPredicate.RowsMatch.NONE) {
         continue; // No row comply to the filter => drop the row group
       }
@@ -310,6 +295,53 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
       return null;
     }
   }
+
+  /**
+   * Returns parquet filter predicate built from specified {@code filterExpr}.
+   *
+   * @param filterExpr                     filter expression to build
+   * @param udfUtilities                   udf utilities
+   * @param functionImplementationRegistry context to find drill function holder
+   * @param optionManager                  option manager
+   * @param omitUnsupportedExprs           whether expressions which cannot be converted
+   *                                       may be omitted from the resulting expression
+   * @return parquet filter predicate, or {@code null} if the filter cannot be materialized or converted
+   */
+  public ParquetFilterPredicate getParquetFilterPredicate(LogicalExpression filterExpr,
+      UdfUtilities udfUtilities, FunctionImplementationRegistry functionImplementationRegistry,
+      OptionManager optionManager, boolean omitUnsupportedExprs) {
+    // the first row group is used to obtain the list of fields
+    assert rowGroupInfos.size() > 0 : "row groups count cannot be 0";
+    RowGroupInfo rowGroup = rowGroupInfos.iterator().next();
+    ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
+
+    Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(
+        rowGroup.getPath(),
+        getPartitionValues(rowGroup),
+        supportsFileImplicitColumns());
+
+    ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
+        parquetTableMetadata,
+        rowGroup.getColumns(),
+        implicitColValues);
+
+    Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
+    Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
+
+    ErrorCollector errorCollector = new ErrorCollectorImpl();
+    LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
+        filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
+
+    if (errorCollector.hasErrors()) {
+      logger.error("{} error(s) encountered when materialize filter expression : {}",
+          errorCollector.getErrorCount(), errorCollector.toErrorString());
+      return null;
+    }
+    logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
+
+    Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
+    return ParquetFilterBuilder.buildParquetFilterPredicate(materializedFilter, constantBoundaries, udfUtilities, omitUnsupportedExprs);
+  }
   // filter push down methods block end
 
   // limit push down methods start
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
index f0f1029..86e207f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
@@ -63,6 +63,12 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
   static final Logger logger = LoggerFactory.getLogger(ParquetFilterBuilder.class);
 
   private final UdfUtilities udfUtilities;
+  // Flag which indicates whether parts of the predicate that cannot be
+  // converted to parquet filter predicate may be omitted from the result.
+  // It should be set to false for the case when we want to
+  // verify that predicate is fully convertible to parquet filter predicate:
+  // in that case null is returned instead of a partially converted expression.
+  private final boolean omitUnsupportedExprs;
 
   /**
    * @param expr materialized filter expression
@@ -71,18 +77,24 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
    *
    * @return parquet filter predicate
    */
-  public static ParquetFilterPredicate buildParquetFilterPredicate(LogicalExpression expr, final Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities) {
-    LogicalExpression logicalExpression = expr.accept(new ParquetFilterBuilder(udfUtilities), constantBoundaries);
+  public static ParquetFilterPredicate buildParquetFilterPredicate(LogicalExpression expr,
+      Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities, boolean omitUnsupportedExprs) {
+    LogicalExpression logicalExpression =
+        expr.accept(new ParquetFilterBuilder(udfUtilities, omitUnsupportedExprs), constantBoundaries);
     if (logicalExpression instanceof ParquetFilterPredicate) {
       return (ParquetFilterPredicate) logicalExpression;
+    } else if (logicalExpression instanceof TypedFieldExpr) {
+      // Calcite simplifies `= true` expression to the field name, so wrap it with an "is true" predicate
+      return (ParquetFilterPredicate) ParquetIsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, logicalExpression);
     }
     logger.debug("Logical expression {} was not qualified for filter push down", logicalExpression);
     return null;
   }
 
 
-  private ParquetFilterBuilder(UdfUtilities udfUtilities) {
+  private ParquetFilterBuilder(UdfUtilities udfUtilities, boolean omitUnsupportedExprs) {
     this.udfUtilities = udfUtilities;
+    this.omitUnsupportedExprs = omitUnsupportedExprs;
   }
 
   @Override
@@ -159,8 +171,9 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
     for (LogicalExpression arg : op.args) {
       LogicalExpression childPredicate = arg.accept(this, value);
       if (childPredicate == null) {
-        if (functionName.equals("booleanOr")) {
+        if (functionName.equals("booleanOr") || !omitUnsupportedExprs) {
           // we can't include any leg of the OR if any of the predicates cannot be converted
+          // or when omitting of unconverted operands is prohibited
           return null;
         }
       } else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index c59cdce..95a0534 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -134,13 +134,32 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
 
     // get a conjunctions of the filter condition. For each conjunction, if it refers to ITEM or FLATTEN expression
     // then we could not pushed down. Otherwise, it's qualified to be pushed down.
-    final List<RexNode> predList = RelOptUtil.conjunctions(condition);
+    final List<RexNode> predList = RelOptUtil.conjunctions(RexUtil.toCnf(filter.getCluster().getRexBuilder(), condition));
 
     final List<RexNode> qualifiedPredList = new ArrayList<>();
 
-    for (final RexNode pred : predList) {
+    // list of predicates which cannot be converted to parquet filter predicate
+    List<RexNode> nonConvertedPredList = new ArrayList<>();
+
+    for (RexNode pred : predList) {
       if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) == null) {
+        LogicalExpression drillPredicate = DrillOptiq.toDrill(
+            new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, pred);
+
+        // checks whether predicate may be used for filter pushdown
+        ParquetFilterPredicate parquetFilterPredicate =
+            groupScan.getParquetFilterPredicate(drillPredicate,
+                optimizerContext,
+                optimizerContext.getFunctionRegistry(),
+                optimizerContext.getPlannerSettings().getOptions(), false);
+        // collects predicates that contain expressions unsupported for filter pushdown
+        // to build a filter with them later
+        if (parquetFilterPredicate == null) {
+          nonConvertedPredList.add(pred);
+        }
         qualifiedPredList.add(pred);
+      } else {
+        nonConvertedPredList.add(pred);
       }
     }
 
@@ -155,7 +174,7 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
 
 
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    final GroupScan newGroupScan = groupScan.applyFilter(conditionExp,optimizerContext,
+    final GroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
         optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions());
     if (timer != null) {
       logger.debug("Took {} ms to apply filter on parquet row groups. ", timer.elapsed(TimeUnit.MILLISECONDS));
@@ -166,10 +185,10 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
       return;
     }
 
-    RelNode newScan = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
+    RelNode newNode = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
 
     if (project != null) {
-      newScan = project.copy(project.getTraitSet(), Collections.singletonList(newScan));
+      newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode));
     }
 
     if (newGroupScan instanceof AbstractParquetGroupScan) {
@@ -182,12 +201,20 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
         }
       }
       if (matchAll == ParquetFilterPredicate.RowsMatch.ALL) {
-        call.transformTo(newScan);
+        // creates filter from the expressions which can't be pushed to the scan
+        if (nonConvertedPredList.size() > 0) {
+          newNode = filter.copy(filter.getTraitSet(), newNode,
+              RexUtil.composeConjunction(
+                  filter.getCluster().getRexBuilder(),
+                  nonConvertedPredList,
+                  true));
+        }
+        call.transformTo(newNode);
         return;
       }
     }
 
-    final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newScan));
+    final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newNode));
     call.transformTo(newFilter);
   }
 }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
index 281e865..0125149 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
@@ -87,8 +87,8 @@ public class ParquetRGFilterEvaluator {
     }
 
     Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
-    ParquetFilterPredicate parquetPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
-        materializedFilter, constantBoundaries, udfUtilities);
+    ParquetFilterPredicate parquetPredicate = ParquetFilterBuilder.buildParquetFilterPredicate(
+        materializedFilter, constantBoundaries, udfUtilities, true);
 
     return matches(parquetPredicate, columnStatisticsMap, rowCount);
   }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index ea12f40..ccc1480 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -70,6 +70,7 @@ public class TestParquetFilterPushDown extends PlanTestBase {
 
   @AfterClass
   public static void teardown() throws IOException {
+    fragContext.close();
     fs.close();
   }
 
@@ -294,6 +295,10 @@ public class TestParquetFilterPushDown extends PlanTestBase {
 
     PlanTestBase.testPlanMatchingPatterns(sql + "a < 1 or a > 1", new String[]{"numRowGroups=3"}); // No filter pruning
     PlanTestBase.testPlanMatchingPatterns(sql + "a < 1 or a > 2", new String[]{"numRowGroups=2"}, new String[]{"Filter\\("}); //Filter pruning
+
+    // Partial filter pruning
+    testParquetFilterPruning(sql + "a >=1 and cast(a as varchar) like '%3%'", 1, 2, new String[]{">\\($1, 1\\)"});
+    testParquetFilterPruning(sql + "a >=1 and a/3>=1", 2, 2, new String[]{">\\($1, 1\\)"});
   }
 
   @Test