You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by vo...@apache.org on 2019/01/25 16:49:06 UTC

[drill] 02/08: DRILL-6910: Allow applying DrillPushProjectIntoScanRule at physical phase

This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 03f4677667ffed11c3c3ac0e80eb354436bf311e
Author: Volodymyr Vysotskyi <vv...@gmail.com>
AuthorDate: Wed Jan 23 01:21:13 2019 +0200

    DRILL-6910: Allow applying DrillPushProjectIntoScanRule at physical phase
    
    closes #1619
---
 .../apache/drill/exec/planner/PlannerPhase.java    |   3 +-
 .../logical/DrillPushProjectIntoScanRule.java      | 111 +++++++++++++++++----
 .../java/org/apache/drill/TestProjectPushDown.java |  20 +++-
 3 files changed, 111 insertions(+), 23 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
index 91d9d43..2d2b073 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/PlannerPhase.java
@@ -447,7 +447,8 @@ public enum PlannerPhase {
             // estimation of filter operator, after filter is pushed down to scan.
 
             ParquetPushDownFilter.getFilterOnProject(optimizerRulesContext),
-            ParquetPushDownFilter.getFilterOnScan(optimizerRulesContext)
+            ParquetPushDownFilter.getFilterOnScan(optimizerRulesContext),
+            DrillPushProjectIntoScanRule.DRILL_PHYSICAL_INSTANCE
         )
         .build();
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java
index db20cb7..8d0ac84 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/logical/DrillPushProjectIntoScanRule.java
@@ -25,10 +25,16 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.TableScan;
 import org.apache.calcite.rel.logical.LogicalProject;
 import org.apache.calcite.rel.rules.ProjectRemoveRule;
+import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexNode;
 import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.planner.common.DrillProjectRelBase;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil;
 import org.apache.drill.exec.planner.common.DrillRelOptUtil.ProjectPushInfo;
+import org.apache.drill.exec.planner.common.DrillScanRelBase;
+import org.apache.drill.exec.planner.physical.Prel;
+import org.apache.drill.exec.planner.physical.ProjectPrel;
+import org.apache.drill.exec.planner.physical.ScanPrel;
 import org.apache.drill.exec.util.Utilities;
 
 import java.io.IOException;
@@ -43,51 +49,76 @@ public class DrillPushProjectIntoScanRule extends RelOptRule {
   public static final RelOptRule INSTANCE =
       new DrillPushProjectIntoScanRule(LogicalProject.class,
           EnumerableTableScan.class,
-          "DrillPushProjIntoEnumerableScan");
+          "DrillPushProjIntoEnumerableScan") {
+
+        @Override
+        protected boolean skipScanConversion(RelDataType projectRelDataType, TableScan scan) {
+          // do not allow skipping conversion of EnumerableTableScan to DrillScanRel if rule is applicable
+          return false;
+        }
+      };
 
   public static final RelOptRule DRILL_LOGICAL_INSTANCE =
       new DrillPushProjectIntoScanRule(LogicalProject.class,
           DrillScanRel.class,
           "DrillPushProjIntoDrillRelScan");
 
+  public static final RelOptRule DRILL_PHYSICAL_INSTANCE =
+      new DrillPushProjectIntoScanRule(ProjectPrel.class,
+          ScanPrel.class,
+          "DrillPushProjIntoScanPrel") {
+
+        @Override
+        protected ScanPrel createScan(TableScan scan, ProjectPushInfo projectPushInfo) {
+          ScanPrel drillScan = (ScanPrel) scan;
+
+          return new ScanPrel(drillScan.getCluster(),
+              drillScan.getTraitSet().plus(Prel.DRILL_PHYSICAL),
+              drillScan.getGroupScan().clone(projectPushInfo.getFields()),
+              projectPushInfo.createNewRowType(drillScan.getCluster().getTypeFactory()),
+              drillScan.getTable());
+        }
+
+        @Override
+        protected ProjectPrel createProject(Project project, TableScan newScan, List<RexNode> newProjects) {
+          return new ProjectPrel(project.getCluster(),
+              project.getTraitSet().plus(Prel.DRILL_PHYSICAL),
+              newScan,
+              newProjects,
+              project.getRowType());
+        }
+      };
+
   private DrillPushProjectIntoScanRule(Class<? extends Project> projectClass, Class<? extends TableScan> scanClass, String description) {
     super(RelOptHelper.some(projectClass, RelOptHelper.any(scanClass)), description);
   }
 
   @Override
   public void onMatch(RelOptRuleCall call) {
-    final Project project = call.rel(0);
-    final TableScan scan = call.rel(1);
+    Project project = call.rel(0);
+    TableScan scan = call.rel(1);
 
     try {
-
       if (scan.getRowType().getFieldList().isEmpty()) {
         return;
       }
 
       ProjectPushInfo projectPushInfo = DrillRelOptUtil.getFieldsInformation(scan.getRowType(), project.getProjects());
-      if (!canPushProjectIntoScan(scan.getTable(), projectPushInfo)) {
+      if (!canPushProjectIntoScan(scan.getTable(), projectPushInfo)
+          || skipScanConversion(projectPushInfo.createNewRowType(project.getCluster().getTypeFactory()), scan)) {
+        // project above scan may be removed in ProjectRemoveRule for the case when it is trivial
         return;
       }
 
-      final DrillScanRel newScan =
-          new DrillScanRel(scan.getCluster(),
-              scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-              scan.getTable(),
-              projectPushInfo.createNewRowType(project.getInput().getCluster().getTypeFactory()),
-              projectPushInfo.getFields());
+      DrillScanRelBase newScan = createScan(scan, projectPushInfo);
 
       List<RexNode> newProjects = new ArrayList<>();
       for (RexNode n : project.getChildExps()) {
         newProjects.add(n.accept(projectPushInfo.getInputReWriter()));
       }
 
-      final DrillProjectRel newProject =
-          new DrillProjectRel(project.getCluster(),
-              project.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
-              newScan,
-              newProjects,
-              project.getRowType());
+      DrillProjectRelBase newProject =
+          createProject(project, newScan, newProjects);
 
       if (ProjectRemoveRule.isTrivial(newProject)) {
         call.transformTo(newScan);
@@ -100,6 +131,52 @@ public class DrillPushProjectIntoScanRule extends RelOptRule {
   }
 
   /**
+   * Checks whether conversion of input {@code TableScan} instance to target
+   * {@code TableScan} may be omitted.
+   *
+   * @param projectRelDataType project rel data type
+   * @param scan               TableScan to be checked
+   * @return true if the specified {@code TableScan} has the same row type as the specified project row type.
+   */
+  protected boolean skipScanConversion(RelDataType projectRelDataType, TableScan scan) {
+    return projectRelDataType.equals(scan.getRowType());
+  }
+
+  /**
+   * Creates new {@code DrillProjectRelBase} instance with specified {@code TableScan newScan} child
+   * and {@code List<RexNode> newProjects} expressions using specified {@code Project project} as prototype.
+   *
+   * @param project     the prototype of resulting project
+   * @param newScan     new project child
+   * @param newProjects new project expressions
+   * @return new project instance
+   */
+  protected DrillProjectRelBase createProject(Project project, TableScan newScan, List<RexNode> newProjects) {
+    return new DrillProjectRel(project.getCluster(),
+        project.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+        newScan,
+        newProjects,
+        project.getRowType());
+  }
+
+  /**
+   * Creates new {@code DrillScanRelBase} instance with row type and fields list
+   * obtained from specified {@code ProjectPushInfo projectPushInfo}
+   * using specified {@code TableScan scan} as prototype.
+   *
+   * @param scan            the prototype of resulting scan
+   * @param projectPushInfo the source of row type and fields list
+   * @return new scan instance
+   */
+  protected DrillScanRelBase createScan(TableScan scan, ProjectPushInfo projectPushInfo) {
+    return new DrillScanRel(scan.getCluster(),
+        scan.getTraitSet().plus(DrillRel.DRILL_LOGICAL),
+        scan.getTable(),
+        projectPushInfo.createNewRowType(scan.getCluster().getTypeFactory()),
+        projectPushInfo.getFields());
+  }
+
+  /**
    * Push project into scan can be done only if this is not a star query and
    * table supports project push down.
    *
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
index ced105c..f152ba3 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestProjectPushDown.java
@@ -39,7 +39,6 @@ public class TestProjectPushDown extends PlanTestBase {
   }
 
   @Test
-  @Ignore
   public void testGroupBy() throws Exception {
     String expectedColNames = " \"columns\" : [ \"`marital_status`\" ]";
     testPhysicalPlan(
@@ -48,7 +47,6 @@ public class TestProjectPushDown extends PlanTestBase {
   }
 
   @Test
-  @Ignore
   public void testOrderBy() throws Exception {
     String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
     testPhysicalPlan("select employee_id , full_name, first_name , last_name "
@@ -57,7 +55,6 @@ public class TestProjectPushDown extends PlanTestBase {
   }
 
   @Test
-  @Ignore
   public void testExprInSelect() throws Exception {
     String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
     testPhysicalPlan(
@@ -67,7 +64,6 @@ public class TestProjectPushDown extends PlanTestBase {
   }
 
   @Test
-  @Ignore
   public void testExprInWhere() throws Exception {
     String expectedColNames = "\"columns\" : [ \"`employee_id`\", \"`full_name`\", \"`first_name`\", \"`last_name`\" ]";
     testPhysicalPlan(
@@ -291,12 +287,26 @@ public class TestProjectPushDown extends PlanTestBase {
     final String query = "SELECT L.L_QUANTITY FROM cp.`tpch/lineitem.parquet` L, cp.`tpch/orders.parquet` O" +
         " WHERE cast(L.L_ORDERKEY as int) = cast(O.O_ORDERKEY as int)";
     final String[] expectedPatterns = {
-        ".*HashJoin.*", "Project.*\\(L_QUANTITY.*CAST\\(\\$0\\)\\:INTEGER.*", "Project.*CAST\\(\\$0\\)\\:INTEGER.*"};
+        ".*HashJoin.*", "Project.*\\(L_QUANTITY\\=\\[\\$0\\].*CAST\\(\\$1\\)\\:INTEGER.*", "Project.*CAST\\(\\$0\\)\\:INTEGER.*"};
     // L_ORDERKEY, O_ORDERKEY should not be present in the projects below the join
     final String[] excludedPatterns = {".*Project\\(L_ORDERKEY=.*", ".*Project\\(O_ORDERKEY=.*"};
     PlanTestBase.testPlanMatchingPatterns(query, expectedPatterns, excludedPatterns);
   }
 
+  @Test
+  public void testProjectPushdownAfterFilterRemoving() throws Exception {
+    test("create table dfs.tmp.`nation` as\n" +
+        "select * from cp.`tpch/nation.parquet` where n_regionkey < 10");
+    try {
+      // filter will be removed from the plan
+      String query = "select n_nationkey from dfs.tmp.`nation` where n_regionkey < 10";
+      PlanTestBase.testPlanMatchingPatterns(query,
+          new String[]{"columns\\=\\[`n_nationkey`\\]"}, new String[]{"n_regionkey"});
+    } finally {
+      test("drop table if exists dfs.tmp.`nation`");
+    }
+  }
+
   protected void testPushDown(PushDownTestInstance test) throws Exception {
     testPhysicalPlan(test.getSql(), test.getExpected());
   }