You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2018/11/26 16:04:07 UTC
[drill] 13/15: DRILL-6865: Query returns wrong result when filter
pruning happens
This is an automated email from the ASF dual-hosted git repository.
volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit d1a082cd11c79497449fda06189cd00d3510b2e9
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Wed Nov 21 14:08:17 2018 +0200
DRILL-6865: Query returns wrong result when filter pruning happens
---
.../store/parquet/AbstractParquetGroupScan.java | 76 +++++++++++++++-------
.../exec/store/parquet/ParquetFilterBuilder.java | 21 ++++--
.../exec/store/parquet/ParquetPushDownFilter.java | 41 ++++++++++--
.../store/parquet/ParquetRGFilterEvaluator.java | 4 +-
.../store/parquet/TestParquetFilterPushDown.java | 5 ++
5 files changed, 112 insertions(+), 35 deletions(-)
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 0d35ddb..1bbf63b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -247,7 +247,11 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
final List<RowGroupInfo> qualifiedRGs = new ArrayList<>(rowGroupInfos.size());
- ParquetFilterPredicate filterPredicate = null;
+ ParquetFilterPredicate filterPredicate = getParquetFilterPredicate(filterExpr, udfUtilities, functionImplementationRegistry, optionManager, true);
+
+ if (filterPredicate == null) {
+ return null;
+ }
for (RowGroupInfo rowGroup : rowGroupInfos) {
final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
@@ -261,27 +265,8 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
- if (filterPredicate == null) {
- ErrorCollector errorCollector = new ErrorCollectorImpl();
- LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
- filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
-
- if (errorCollector.hasErrors()) {
- logger.error("{} error(s) encountered when materialize filter expression : {}",
- errorCollector.getErrorCount(), errorCollector.toErrorString());
- return null;
- }
- logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
-
- Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
- filterPredicate = ParquetFilterBuilder.buildParquetFilterPredicate(materializedFilter, constantBoundaries, udfUtilities);
-
- if (filterPredicate == null) {
- return null;
- }
- }
-
- ParquetFilterPredicate.RowsMatch match = ParquetRGFilterEvaluator.matches(filterPredicate, columnStatisticsMap, rowGroup.getRowCount(), parquetTableMetadata, rowGroup.getColumns(), schemaPathsInExpr);
+ ParquetFilterPredicate.RowsMatch match = ParquetRGFilterEvaluator.matches(filterPredicate,
+ columnStatisticsMap, rowGroup.getRowCount(), parquetTableMetadata, rowGroup.getColumns(), schemaPathsInExpr);
if (match == ParquetFilterPredicate.RowsMatch.NONE) {
continue; // No row comply to the filter => drop the row group
}
@@ -310,6 +295,53 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
return null;
}
}
+
+ /**
+ * Returns parquet filter predicate built from specified {@code filterExpr}.
+ *
+ * @param filterExpr filter expression to build
+ * @param udfUtilities udf utilities
+ * @param functionImplementationRegistry context to find drill function holder
+ * @param optionManager option manager
+ * @param omitUnsupportedExprs whether expressions which cannot be converted
+ * may be omitted from the resulting expression
+ * @return parquet filter predicate
+ */
+ public ParquetFilterPredicate getParquetFilterPredicate(LogicalExpression filterExpr,
+ UdfUtilities udfUtilities, FunctionImplementationRegistry functionImplementationRegistry,
+ OptionManager optionManager, boolean omitUnsupportedExprs) {
+ // uses the first row group to obtain the list of fields
+ assert rowGroupInfos.size() > 0 : "row groups count cannot be 0";
+ RowGroupInfo rowGroup = rowGroupInfos.iterator().next();
+ ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
+
+ Map<String, String> implicitColValues = columnExplorer.populateImplicitColumns(
+ rowGroup.getPath(),
+ getPartitionValues(rowGroup),
+ supportsFileImplicitColumns());
+
+ ParquetMetaStatCollector statCollector = new ParquetMetaStatCollector(
+ parquetTableMetadata,
+ rowGroup.getColumns(),
+ implicitColValues);
+
+ Set<SchemaPath> schemaPathsInExpr = filterExpr.accept(new ParquetRGFilterEvaluator.FieldReferenceFinder(), null);
+ Map<SchemaPath, ColumnStatistics> columnStatisticsMap = statCollector.collectColStat(schemaPathsInExpr);
+
+ ErrorCollector errorCollector = new ErrorCollectorImpl();
+ LogicalExpression materializedFilter = ExpressionTreeMaterializer.materializeFilterExpr(
+ filterExpr, columnStatisticsMap, errorCollector, functionImplementationRegistry);
+
+ if (errorCollector.hasErrors()) {
+ logger.error("{} error(s) encountered when materialize filter expression : {}",
+ errorCollector.getErrorCount(), errorCollector.toErrorString());
+ return null;
+ }
+ logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));
+
+ Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
+ return ParquetFilterBuilder.buildParquetFilterPredicate(materializedFilter, constantBoundaries, udfUtilities, omitUnsupportedExprs);
+ }
// filter push down methods block end
// limit push down methods start
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
index f0f1029..86e207f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetFilterBuilder.java
@@ -63,6 +63,12 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
static final Logger logger = LoggerFactory.getLogger(ParquetFilterBuilder.class);
private final UdfUtilities udfUtilities;
+ // Flag that controls whether parts of the predicate which cannot be
+ // converted to a parquet filter predicate may be omitted.
+ // Set it to false to verify that the predicate is fully convertible
+ // to a parquet filter predicate; in that case null is returned
+ // instead of a partially converted expression.
+ private final boolean omitUnsupportedExprs;
/**
* @param expr materialized filter expression
@@ -71,18 +77,24 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
*
* @return parquet filter predicate
*/
- public static ParquetFilterPredicate buildParquetFilterPredicate(LogicalExpression expr, final Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities) {
- LogicalExpression logicalExpression = expr.accept(new ParquetFilterBuilder(udfUtilities), constantBoundaries);
+ public static ParquetFilterPredicate buildParquetFilterPredicate(LogicalExpression expr,
+ Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities, boolean omitUnsupportedExprs) {
+ LogicalExpression logicalExpression =
+ expr.accept(new ParquetFilterBuilder(udfUtilities, omitUnsupportedExprs), constantBoundaries);
if (logicalExpression instanceof ParquetFilterPredicate) {
return (ParquetFilterPredicate) logicalExpression;
+ } else if (logicalExpression instanceof TypedFieldExpr) {
+ // Calcite simplifies `= true` expression to field name, wrap it with is true predicate
+ return (ParquetFilterPredicate) ParquetIsPredicate.createIsPredicate(FunctionGenerationHelper.IS_TRUE, logicalExpression);
}
logger.debug("Logical expression {} was not qualified for filter push down", logicalExpression);
return null;
}
- private ParquetFilterBuilder(UdfUtilities udfUtilities) {
+ private ParquetFilterBuilder(UdfUtilities udfUtilities, boolean omitUnsupportedExprs) {
this.udfUtilities = udfUtilities;
+ this.omitUnsupportedExprs = omitUnsupportedExprs;
}
@Override
@@ -159,8 +171,9 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
for (LogicalExpression arg : op.args) {
LogicalExpression childPredicate = arg.accept(this, value);
if (childPredicate == null) {
- if (functionName.equals("booleanOr")) {
+ if (functionName.equals("booleanOr") || !omitUnsupportedExprs) {
// we can't include any leg of the OR if any of the predicates cannot be converted
+ // or when omitting unconverted operands is prohibited
return null;
}
} else {
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index c59cdce..95a0534 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -134,13 +134,32 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
// get a conjunctions of the filter condition. For each conjunction, if it refers to ITEM or FLATTEN expression
// then we could not pushed down. Otherwise, it's qualified to be pushed down.
- final List<RexNode> predList = RelOptUtil.conjunctions(condition);
+ final List<RexNode> predList = RelOptUtil.conjunctions(RexUtil.toCnf(filter.getCluster().getRexBuilder(), condition));
final List<RexNode> qualifiedPredList = new ArrayList<>();
- for (final RexNode pred : predList) {
+ // list of predicates which cannot be converted to parquet filter predicate
+ List<RexNode> nonConvertedPredList = new ArrayList<>();
+
+ for (RexNode pred : predList) {
if (DrillRelOptUtil.findOperators(pred, Collections.emptyList(), BANNED_OPERATORS) == null) {
+ LogicalExpression drillPredicate = DrillOptiq.toDrill(
+ new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, pred);
+
+ // checks whether predicate may be used for filter pushdown
+ ParquetFilterPredicate parquetFilterPredicate =
+ groupScan.getParquetFilterPredicate(drillPredicate,
+ optimizerContext,
+ optimizerContext.getFunctionRegistry(),
+ optimizerContext.getPlannerSettings().getOptions(), false);
+ // collects predicates that contain expressions unsupported for filter pushdown,
+ // so that a filter can be built from them later
+ if (parquetFilterPredicate == null) {
+ nonConvertedPredList.add(pred);
+ }
qualifiedPredList.add(pred);
+ } else {
+ nonConvertedPredList.add(pred);
}
}
@@ -155,7 +174,7 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
- final GroupScan newGroupScan = groupScan.applyFilter(conditionExp,optimizerContext,
+ final GroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions());
if (timer != null) {
logger.debug("Took {} ms to apply filter on parquet row groups. ", timer.elapsed(TimeUnit.MILLISECONDS));
@@ -166,10 +185,10 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
return;
}
- RelNode newScan = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
+ RelNode newNode = new ScanPrel(scan.getCluster(), scan.getTraitSet(), newGroupScan, scan.getRowType(), scan.getTable());
if (project != null) {
- newScan = project.copy(project.getTraitSet(), Collections.singletonList(newScan));
+ newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode));
}
if (newGroupScan instanceof AbstractParquetGroupScan) {
@@ -182,12 +201,20 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
}
}
if (matchAll == ParquetFilterPredicate.RowsMatch.ALL) {
- call.transformTo(newScan);
+ // creates filter from the expressions which can't be pushed to the scan
+ if (nonConvertedPredList.size() > 0) {
+ newNode = filter.copy(filter.getTraitSet(), newNode,
+ RexUtil.composeConjunction(
+ filter.getCluster().getRexBuilder(),
+ nonConvertedPredList,
+ true));
+ }
+ call.transformTo(newNode);
return;
}
}
- final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newScan));
+ final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newNode));
call.transformTo(newFilter);
}
}
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
index 281e865..0125149 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetRGFilterEvaluator.java
@@ -87,8 +87,8 @@ public class ParquetRGFilterEvaluator {
}
Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
- ParquetFilterPredicate parquetPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
- materializedFilter, constantBoundaries, udfUtilities);
+ ParquetFilterPredicate parquetPredicate = ParquetFilterBuilder.buildParquetFilterPredicate(
+ materializedFilter, constantBoundaries, udfUtilities, true);
return matches(parquetPredicate, columnStatisticsMap, rowCount);
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index ea12f40..ccc1480 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -70,6 +70,7 @@ public class TestParquetFilterPushDown extends PlanTestBase {
@AfterClass
public static void teardown() throws IOException {
+ fragContext.close();
fs.close();
}
@@ -294,6 +295,10 @@ public class TestParquetFilterPushDown extends PlanTestBase {
PlanTestBase.testPlanMatchingPatterns(sql + "a < 1 or a > 1", new String[]{"numRowGroups=3"}); // No filter pruning
PlanTestBase.testPlanMatchingPatterns(sql + "a < 1 or a > 2", new String[]{"numRowGroups=2"}, new String[]{"Filter\\("}); //Filter pruning
+
+ // Partial filter pruning
+ testParquetFilterPruning(sql + "a >=1 and cast(a as varchar) like '%3%'", 1, 2, new String[]{">\\($1, 1\\)"});
+ testParquetFilterPruning(sql + "a >=1 and a/3>=1", 2, 2, new String[]{">\\($1, 1\\)"});
}
@Test