You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2018/11/26 16:04:08 UTC

[drill] 14/15: DRILL-6865: Filter is not removed from the plan when parquet table fully matches the filter

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 99a3d76551d1a08958c7cd7670df189963fbc943
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Fri Nov 23 11:34:13 2018 +0200

    DRILL-6865: Filter is not removed from the plan when parquet table fully matches the filter
    
    closes #1552
---
 .../store/parquet/AbstractParquetGroupScan.java    | 44 ++++++++++++-----
 .../exec/store/parquet/ParquetPushDownFilter.java  | 57 +++++++++++++---------
 .../drill/exec/store/parquet/RowGroupInfo.java     |  6 ---
 .../store/parquet/TestParquetFilterPushDown.java   |  9 ++++
 4 files changed, 74 insertions(+), 42 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
index 1bbf63b..a366339 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/AbstractParquetGroupScan.java
@@ -85,6 +85,8 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
 
   private List<EndpointAffinity> endpointAffinities;
   private ParquetGroupScanStatistics parquetGroupScanStatistics;
+  // whether all row groups of this group scan fully match the filter
+  private boolean matchAllRowGroups = false;
 
   protected AbstractParquetGroupScan(String userName,
                                      List<SchemaPath> columns,
@@ -111,6 +113,7 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
     this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
     this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
     this.readerConfig = that.readerConfig;
+    this.matchAllRowGroups = that.matchAllRowGroups;
   }
 
   @JsonProperty
@@ -136,6 +139,11 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   }
 
   @JsonIgnore
+  public boolean isMatchAllRowGroups() {
+    return matchAllRowGroups;
+  }
+
+  @JsonIgnore
   @Override
   public Collection<String> getFiles() {
     return fileSet;
@@ -229,15 +237,12 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
   }
 
   @Override
-  public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
-                               FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
-
-    if (rowGroupInfos.size() == 1 ||
-        ! (parquetTableMetadata.isRowGroupPrunable()) ||
-        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)
-        ) {
-      // Stop pruning for 3 cases:
-      //    -  1 single parquet file,
+  public AbstractParquetGroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
+      FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
+
+    if (!parquetTableMetadata.isRowGroupPrunable() ||
+        rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)) {
+      // Stop pruning for 2 cases:
       //    -  metadata does not have proper format to support row group level filter pruning,
       //    -  # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
       return null;
@@ -253,6 +258,8 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
       return null;
     }
 
+    boolean matchAllRowGroupsLocal = true;
+
     for (RowGroupInfo rowGroup : rowGroupInfos) {
       final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
       List<String> partitionValues = getPartitionValues(rowGroup);
@@ -270,16 +277,27 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
       if (match == ParquetFilterPredicate.RowsMatch.NONE) {
         continue; // No row comply to the filter => drop the row group
       }
-      rowGroup.setRowsMatch(match);
+      // for the case when any of row groups partially matches the filter,
+      // matchAllRowGroupsLocal should be set to false
+      if (matchAllRowGroupsLocal) {
+        matchAllRowGroupsLocal = match == ParquetFilterPredicate.RowsMatch.ALL;
+      }
 
       qualifiedRGs.add(rowGroup);
     }
 
-    if (qualifiedRGs.size() == rowGroupInfos.size() ) {
+    if (qualifiedRGs.size() == rowGroupInfos.size()) {
       // There is no reduction of rowGroups. Return the original groupScan.
       logger.debug("applyFilter() does not have any pruning!");
+      matchAllRowGroups = matchAllRowGroupsLocal;
       return null;
     } else if (qualifiedRGs.size() == 0) {
+      if (rowGroupInfos.size() == 1) {
+        // For the case when group scan has single row group and it was filtered,
+        // no need to create new group scan with the same row group.
+        return null;
+      }
+      matchAllRowGroupsLocal = false;
       logger.debug("All row groups have been filtered out. Add back one to get schema from scanner.");
       RowGroupInfo rg = rowGroupInfos.iterator().next();
       qualifiedRGs.add(rg);
@@ -289,7 +307,9 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {
       ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());
 
     try {
-      return cloneWithRowGroupInfos(qualifiedRGs);
+      AbstractParquetGroupScan cloneGroupScan = cloneWithRowGroupInfos(qualifiedRGs);
+      cloneGroupScan.matchAllRowGroups = matchAllRowGroupsLocal;
+      return cloneGroupScan;
     } catch (IOException e) {
       logger.warn("Could not apply filter prune due to Exception : {}", e);
       return null;
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
index 95a0534..c12ea73 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetPushDownFilter.java
@@ -28,9 +28,7 @@ import org.apache.calcite.rex.RexUtil;
 import org.apache.drill.common.expression.LogicalExpression;
 import org.apache.drill.common.expression.ValueExpressions;
 import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
-import org.apache.drill.exec.expr.stat.ParquetFilterPredicate.RowsMatch;
 import org.apache.drill.exec.ops.OptimizerRulesContext;
-import org.apache.drill.exec.physical.base.GroupScan;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.logical.DrillOptiq;
 import org.apache.drill.exec.planner.logical.DrillParseContext;
@@ -174,14 +172,35 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
 
 
     Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
-    final GroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
+    AbstractParquetGroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
         optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions());
     if (timer != null) {
       logger.debug("Took {} ms to apply filter on parquet row groups. ", timer.elapsed(TimeUnit.MILLISECONDS));
       timer.stop();
     }
 
-    if (newGroupScan == null ) {
+    // For the case when newGroupScan wasn't created, the old one may
+    // fully match the filter for the case when row group pruning did not happen.
+    if (newGroupScan == null) {
+      if (groupScan.isMatchAllRowGroups()) {
+        RelNode child = project == null ? scan : project;
+        // If current row group fully matches filter,
+        // but row group pruning did not happen, remove the filter.
+        if (nonConvertedPredList.size() == 0) {
+          call.transformTo(child);
+        } else if (nonConvertedPredList.size() == predList.size()) {
+          // None of the predicates participated in filter pushdown.
+          return;
+        } else {
+          // If some of the predicates weren't used in the filter, creates new filter with them
+          // on top of current scan. Excludes the case when all predicates weren't used in the filter.
+          call.transformTo(filter.copy(filter.getTraitSet(), child,
+              RexUtil.composeConjunction(
+                  filter.getCluster().getRexBuilder(),
+                  nonConvertedPredList,
+                  true)));
+        }
+      }
       return;
     }
 
@@ -191,27 +210,17 @@ public abstract class ParquetPushDownFilter extends StoragePluginOptimizerRule {
       newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode));
     }
 
-    if (newGroupScan instanceof AbstractParquetGroupScan) {
-      RowsMatch matchAll = RowsMatch.ALL;
-      List<RowGroupInfo> rowGroupInfos = ((AbstractParquetGroupScan) newGroupScan).rowGroupInfos;
-      for (RowGroupInfo rowGroup : rowGroupInfos) {
-        if (rowGroup.getRowsMatch() != RowsMatch.ALL) {
-          matchAll = RowsMatch.SOME;
-          break;
-        }
-      }
-      if (matchAll == ParquetFilterPredicate.RowsMatch.ALL) {
-        // creates filter from the expressions which can't be pushed to the scan
-        if (nonConvertedPredList.size() > 0) {
-          newNode = filter.copy(filter.getTraitSet(), newNode,
-              RexUtil.composeConjunction(
-                  filter.getCluster().getRexBuilder(),
-                  nonConvertedPredList,
-                  true));
-        }
-        call.transformTo(newNode);
-        return;
+    if (newGroupScan.isMatchAllRowGroups()) {
+      // creates filter from the expressions which can't be pushed to the scan
+      if (nonConvertedPredList.size() > 0) {
+        newNode = filter.copy(filter.getTraitSet(), newNode,
+            RexUtil.composeConjunction(
+                filter.getCluster().getRexBuilder(),
+                nonConvertedPredList,
+                true));
       }
+      call.transformTo(newNode);
+      return;
     }
 
     final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newNode));
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
index 7d2143c..1c9ce10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/RowGroupInfo.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.store.parquet;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.drill.exec.expr.stat.ParquetFilterPredicate.RowsMatch;
 import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
 import org.apache.drill.exec.store.dfs.easy.FileWork;
 import org.apache.drill.exec.store.schedule.CompleteWork;
@@ -36,7 +35,6 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil
   private List<? extends ColumnMetadata> columns;
   private long rowCount;  // rowCount = -1 indicates to include all rows.
   private long numRecordsToRead;
-  private RowsMatch rowsMatch = RowsMatch.SOME;
 
   @JsonCreator
   public RowGroupInfo(@JsonProperty("path") String path,
@@ -96,8 +94,4 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil
   public void setColumns(List<? extends ColumnMetadata> columns) {
     this.columns = columns;
   }
-
-  public RowsMatch getRowsMatch() { return rowsMatch; }
-
-  public void setRowsMatch(RowsMatch rowsMatch) { this.rowsMatch = rowsMatch; }
 }
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
index ccc1480..80b06d9 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetFilterPushDown.java
@@ -649,6 +649,15 @@ public class TestParquetFilterPushDown extends PlanTestBase {
     assertEquals(RowsMatch.ALL, isNotFalse.matches(re));
   }
 
+  @Test
+  public void testParquetSingleRowGroupFilterRemoving() throws Exception {
+    test("create table dfs.tmp.`singleRowGroupTable` as select * from cp.`tpch/nation.parquet`");
+
+    String query = "select * from dfs.tmp.`singleRowGroupTable` where n_nationkey > -1";
+
+    testParquetFilterPruning(query, 25, 1, new String[]{"Filter\\("});
+  }
+
   //////////////////////////////////////////////////////////////////////////////////////////////////
   // Some test helper functions.
   //////////////////////////////////////////////////////////////////////////////////////////////////